~ubuntu-branches/debian/jessie/lftp/jessie

« back to all changes in this revision

Viewing changes to src/FileCopy.cc

  • Committer: Bazaar Package Importer
  • Author(s): Noèl Köthe
  • Date: 2004-06-01 04:01:39 UTC
  • Revision ID: james.westby@ubuntu.com-20040601040139-negt39q17jhkv3i4
Tags: upstream-3.0.5
Import upstream version 3.0.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * lftp - file transfer program
 
3
 *
 
4
 * Copyright (c) 1999-2000 by Alexander V. Lukyanov (lav@yars.free.net)
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 
19
 */
 
20
 
 
21
/* $Id: FileCopy.cc,v 1.102 2004/03/19 14:20:55 lav Exp $ */
 
22
 
 
23
/* FileCopyPeer behaviour:
 
24
    1) when suspended, does nothing
 
25
    2) tries to read some data at seek_pos, sets pos to position of Get (get).
 
26
    2.5) tries to position to seek_pos and gets ready to write (put).
 
27
    3) if it cannot seek to seek_pos, changes pos to what it can seek.
 
28
    4) if it knows that it cannot seek to pos>0, CanSeek()==false
 
29
    5) if it knows that it cannot seek to pos==0, CanSeek0()==false
 
30
    6) it tries to get date/size if told to. (get)
 
31
    7) it sets date on the file if eof is reached and date is known (put).
 
32
    8) if put needs size/date before it writes data, NeedSizeDateBeforehand()==true.
 
33
 */
 
34
 
 
35
#include <config.h>
 
36
#include <assert.h>
 
37
#include <unistd.h>
 
38
#include <errno.h>
 
39
#include <sys/types.h>
 
40
#include <sys/stat.h>
 
41
#include <fcntl.h>
 
42
#include <math.h>
 
43
#include "FileCopy.h"
 
44
#include "url.h"
 
45
#include "log.h"
 
46
#include "misc.h"
 
47
#include "LsCache.h"
 
48
#include "plural.h"
 
49
#include "OutputJob.h"
 
50
 
 
51
#define skip_threshold 0x1000
 
52
#define debug(a) Log::global->Format a
 
53
 
 
54
ResDecl rate_period  ("xfer:rate-period","15", ResMgr::UNumberValidate,ResMgr::NoClosure);
 
55
ResDecl eta_period   ("xfer:eta-period", "120",ResMgr::UNumberValidate,ResMgr::NoClosure);
 
56
ResDecl max_redir    ("xfer:max-redirections", "0",ResMgr::UNumberValidate,ResMgr::NoClosure);
 
57
 
 
58
// FileCopy
 
59
#define super SMTask
 
60
 
 
61
int FileCopy::Do()
 
62
{
 
63
   int m=STALL;
 
64
   const char *b;
 
65
   int s;
 
66
   int rate_add;
 
67
 
 
68
   if(Error() || Done())
 
69
      return m;
 
70
   switch(state)
 
71
   {
 
72
   pre_INITIAL:
 
73
      state=INITIAL;
 
74
      m=MOVED;
 
75
   case(INITIAL):
 
76
      if(remove_target_first && !put->FileRemoved())
 
77
         return m;
 
78
      remove_target_first=false;
 
79
      if(put->NeedSizeDateBeforehand() || (cont && put->CanSeek()))
 
80
      {
 
81
         if(get->GetSize()==NO_SIZE_YET || get->GetDate()==NO_DATE_YET)
 
82
         {
 
83
            put->Suspend();
 
84
            get->DontStartTransferYet();
 
85
            get->Resume();
 
86
            get->WantSize();
 
87
            if(put->NeedDate())
 
88
               get->WantDate();
 
89
            goto pre_GET_INFO_WAIT;
 
90
         }
 
91
      }
 
92
      if(get->GetSize()!=NO_SIZE && get->GetSize()!=NO_SIZE_YET)
 
93
         put->SetEntitySize(get->GetSize());
 
94
      else if(get->GetSize()==NO_SIZE_YET)
 
95
         get->WantSize();
 
96
      if(get->GetDate()!=NO_DATE && get->GetDate()!=NO_DATE_YET)
 
97
         put->SetDate(get->GetDate());
 
98
      else if(get->GetDate()==NO_DATE_YET)
 
99
      {
 
100
         if(put->NeedDate())
 
101
            get->WantDate();
 
102
      }
 
103
 
 
104
      if(cont && put->CanSeek())
 
105
         put->Seek(FILE_END);
 
106
      else
 
107
      {
 
108
         if(put->range_start>0 && put->CanSeek())
 
109
            put->Seek(put->range_start);
 
110
         if(get->range_start>0 && get->CanSeek())
 
111
            get->Seek(get->range_start);
 
112
         goto pre_DO_COPY;
 
113
      }
 
114
 
 
115
      get->Suspend();
 
116
      put->Resume();
 
117
      state=PUT_WAIT;
 
118
      m=MOVED;
 
119
      /* fallthrough */
 
120
   case(PUT_WAIT):
 
121
      if(put->Error())
 
122
         goto put_error;
 
123
      if(put->GetSeekPos()!=FILE_END && get->GetSize()>=0
 
124
      && put->GetSeekPos()>=get->GetSize())
 
125
      {
 
126
         debug((9,_("copy: destination file is already complete\n")));
 
127
         if(get->GetDate()!=NO_DATE)
 
128
            goto pre_CONFIRM_WAIT;  // have to set the date.
 
129
         goto pre_GET_DONE_WAIT;
 
130
      }
 
131
      if(!put->IOReady())
 
132
         return m;
 
133
      /* now we know if put's seek failed. Seek get accordingly. */
 
134
      if(get->CanSeek())
 
135
         get->Seek(put->GetRealPos());
 
136
   pre_DO_COPY:
 
137
      get->Resume();
 
138
      get->StartTransfer();
 
139
      RateReset();
 
140
      state=DO_COPY;
 
141
      m=MOVED;
 
142
      /* fallthrough */
 
143
   case(DO_COPY): {
 
144
      if(put->Error())
 
145
      {
 
146
      put_error:
 
147
         SetError(put->ErrorText());
 
148
         return MOVED;
 
149
      }
 
150
      if(get->Error())
 
151
      {
 
152
      get_error:
 
153
         SetError(get->ErrorText());
 
154
         return MOVED;
 
155
      }
 
156
      if(put->Broken())
 
157
      {
 
158
         get->Suspend();
 
159
         if(!put->Done())
 
160
            return m;
 
161
         debug((9,_("copy: put is broken\n")));
 
162
         if(fail_if_broken)
 
163
         {
 
164
            SetError(strerror(EPIPE));
 
165
            return MOVED;
 
166
         }
 
167
         goto pre_GET_DONE_WAIT;
 
168
      }
 
169
      put->Resume();
 
170
      if(put->GetSeekPos()==FILE_END)   // put position is not known yet.
 
171
      {
 
172
         get->Suspend();
 
173
         return m;
 
174
      }
 
175
      get->Resume();
 
176
      if(fail_if_cannot_seek && (get->GetRealPos()<get->range_start
 
177
                              || put->GetRealPos()<put->range_start
 
178
                              || get->GetRealPos()!=put->GetRealPos()))
 
179
      {
 
180
         SetError(_("seek failed"));
 
181
         return MOVED;
 
182
      }
 
183
      if(get->GetSize()>0 && get->GetRealPos()>get->GetSize())
 
184
      {
 
185
         get->SetSize(NO_SIZE_YET);
 
186
         get->SetDate(NO_DATE_YET);
 
187
      }
 
188
      long lbsize=0;
 
189
      if(line_buffer)
 
190
         lbsize=line_buffer->Size();
 
191
      /* check if positions are correct */
 
192
      off_t get_pos=get->GetRealPos()-get->range_start;
 
193
      off_t put_pos=put->GetRealPos()-put->range_start;
 
194
      if(get_pos-lbsize!=put_pos)
 
195
      {
 
196
         if(line_buffer)
 
197
            line_buffer->Empty();
 
198
         if(get_pos==put_pos)
 
199
         {  // rare case.
 
200
            return MOVED;
 
201
         }
 
202
         if(put_pos<get_pos)
 
203
         {
 
204
            if(!get->CanSeek(put->GetRealPos()))
 
205
            {
 
206
               // we lose... How about a large buffer?
 
207
               SetError(_("cannot seek on data source"));
 
208
               return MOVED;
 
209
            }
 
210
            debug((9,_("copy: put rolled back to %lld, seeking get accordingly\n"),
 
211
                     (long long)put->GetRealPos()));
 
212
            debug((10,"copy: get position was %lld\n",
 
213
                     (long long)get->GetRealPos()));
 
214
            get->Seek(put->GetRealPos());
 
215
            return MOVED;
 
216
         }
 
217
         else // put_pos > get_pos
 
218
         {
 
219
            off_t size=get->GetSize();
 
220
            if(size>=0 && put->GetRealPos()>=size)
 
221
            {
 
222
               // simulate eof, as we have already have the whole file.
 
223
               debug((9,_("copy: all data received, but get rolled back\n")));
 
224
               goto eof;
 
225
            }
 
226
            off_t skip=put->GetRealPos()-get->GetRealPos();
 
227
            if(!put->CanSeek(get->GetRealPos()) || skip<skip_threshold)
 
228
            {
 
229
               // we have to skip some data
 
230
               get->Get(&b,&s);
 
231
               if(skip>s)
 
232
                  skip=s;
 
233
               if(skip==0)
 
234
                  return m;
 
235
               get->Skip(skip);
 
236
               bytes_count+=skip;
 
237
               return MOVED;
 
238
            }
 
239
            debug((9,_("copy: get rolled back to %lld, seeking put accordingly\n"),
 
240
                     (long long)get->GetRealPos()));
 
241
            put->Seek(get->GetRealPos());
 
242
            return MOVED;
 
243
         }
 
244
      }
 
245
      if(put->Size()>max_buf)
 
246
         get->Suspend(); // stall the get.
 
247
      get->Get(&b,&s);
 
248
      if(b==0) // eof
 
249
      {
 
250
         debug((10,"copy: get hit eof\n"));
 
251
         goto eof;
 
252
      }
 
253
 
 
254
      rate_add=put_buf;
 
255
 
 
256
      if(s==0)
 
257
      {
 
258
         put_buf=put->Buffered();
 
259
         rate_add-=put_buf;
 
260
         RateAdd(rate_add);
 
261
 
 
262
         if(put->Size()==0)
 
263
            put->Suspend();
 
264
         return m;
 
265
      }
 
266
      m=MOVED;
 
267
 
 
268
      if(get->range_limit!=FILE_END && get->range_limit<get->GetRealPos()+s)
 
269
         s=get->range_limit-get->GetRealPos();
 
270
 
 
271
      if(line_buffer)
 
272
      {
 
273
         const char *lb;
 
274
         int ls;
 
275
         if(line_buffer->Size()>line_buffer_max)
 
276
         {
 
277
            line_buffer->Get(&lb,&ls);
 
278
            put->Put(lb,ls);
 
279
            line_buffer->Skip(ls);
 
280
         }
 
281
         line_buffer->Put(b,s);
 
282
         get->Skip(s);
 
283
         bytes_count+=s;
 
284
 
 
285
         // now find eol in line_buffer.
 
286
         line_buffer->Get(&lb,&ls);
 
287
         while(ls>0)
 
288
         {
 
289
            const char *eol=(const char *)memchr(lb,'\n',ls);
 
290
            if(!eol)
 
291
               break;
 
292
            put->Put(lb,eol-lb+1);
 
293
            line_buffer->Skip(eol-lb+1);
 
294
            line_buffer->Get(&lb,&ls);
 
295
         }
 
296
      }
 
297
      else
 
298
      {
 
299
         put->Put(b,s);
 
300
         get->Skip(s);
 
301
         bytes_count+=s;
 
302
      }
 
303
 
 
304
      put_buf=put->Buffered();
 
305
      rate_add-=put_buf-s;
 
306
      RateAdd(rate_add);
 
307
 
 
308
      if(get->range_limit!=FILE_END && get->range_limit<=get->GetRealPos())
 
309
      {
 
310
         debug((10,"copy: get reached range limit\n"));
 
311
         goto eof;
 
312
      }
 
313
      return m;
 
314
   }
 
315
 
 
316
   eof:
 
317
      if(line_buffer)
 
318
      {
 
319
         line_buffer->Get(&b,&s);
 
320
         put->Put(b,s);
 
321
         line_buffer->Skip(s);
 
322
      }
 
323
   pre_CONFIRM_WAIT:
 
324
      put->SetDate(get->GetDate());
 
325
      if(get->GetSize()!=NO_SIZE && get->GetSize()!=NO_SIZE_YET)
 
326
         put->SetEntitySize(get->GetSize());
 
327
      put->PutEOF();
 
328
      get->Suspend();
 
329
      put->Resume();
 
330
      put_eof_pos=put->GetRealPos();
 
331
      debug((10,"copy: waiting for put confirmation\n"));
 
332
      state=CONFIRM_WAIT;
 
333
      m=MOVED;
 
334
   case(CONFIRM_WAIT):
 
335
      if(put->Error())
 
336
         goto put_error;
 
337
      /* check if put position is correct */
 
338
      if(put_eof_pos!=put->GetRealPos() || put->GetSeekPos()==FILE_END)
 
339
      {
 
340
         state=DO_COPY;
 
341
         return MOVED;
 
342
      }
 
343
 
 
344
      rate_add=put_buf;
 
345
      put_buf=put->Buffered();
 
346
      rate_add-=put_buf;
 
347
      RateAdd(rate_add);
 
348
 
 
349
      if(!put->Done())
 
350
         return m;
 
351
      debug((10,"copy: put confirmed store\n"));
 
352
 
 
353
   pre_GET_DONE_WAIT:
 
354
      get->Empty();
 
355
      get->PutEOF();
 
356
      get->Resume();
 
357
      state=GET_DONE_WAIT;
 
358
      m=MOVED;
 
359
      end_time=now;
 
360
      Delete(put); put=0;
 
361
      /* fallthrough */
 
362
   case(GET_DONE_WAIT):
 
363
      if(get->Error())
 
364
         goto get_error;
 
365
      if(remove_source_later)
 
366
      {
 
367
         get->RemoveFile();
 
368
         remove_source_later=false;
 
369
      }
 
370
      if(!get->Done())
 
371
         return m;
 
372
      debug((10,"copy: get is finished - all done\n"));
 
373
      state=ALL_DONE;
 
374
      Delete(get); get=0;
 
375
      return MOVED;
 
376
 
 
377
   pre_GET_INFO_WAIT:
 
378
      state=GET_INFO_WAIT;
 
379
      m=MOVED;
 
380
   case(GET_INFO_WAIT):
 
381
      if(get->Error())
 
382
         goto get_error;
 
383
      if(get->GetSize()==NO_SIZE_YET || get->GetDate()==NO_DATE_YET)
 
384
         return m;
 
385
      goto pre_INITIAL;
 
386
 
 
387
   case(ALL_DONE):
 
388
      return m;
 
389
   }
 
390
   return m;
 
391
}
 
392
 
 
393
void FileCopy::Init()
 
394
{
 
395
   get=0;
 
396
   put=0;
 
397
   state=INITIAL;
 
398
   max_buf=0x10000;
 
399
   cont=false;
 
400
   error_text=0;
 
401
   rate        =new Speedometer("xfer:rate-period");
 
402
   rate_for_eta=new Speedometer("xfer:eta-period");
 
403
   put_buf=0;
 
404
   put_eof_pos=0;
 
405
   bytes_count=0;
 
406
   fail_if_cannot_seek=false;
 
407
   fail_if_broken=true;
 
408
   remove_source_later=false;
 
409
   remove_target_first=false;
 
410
   line_buffer=0;
 
411
   line_buffer_max=0;
 
412
}
 
413
 
 
414
FileCopy::FileCopy(FileCopyPeer *s,FileCopyPeer *d,bool c)
 
415
{
 
416
   Init();
 
417
   get=s;
 
418
   put=d;
 
419
   cont=c;
 
420
}
 
421
FileCopy::~FileCopy()
 
422
{
 
423
   Delete(get);
 
424
   Delete(put);
 
425
   delete line_buffer;
 
426
   xfree(error_text);
 
427
   Delete(rate);
 
428
   Delete(rate_for_eta);
 
429
}
 
430
FileCopy *FileCopy::New(FileCopyPeer *s,FileCopyPeer *d,bool c)
 
431
{
 
432
   FileCopy *res=0;
 
433
   if(fxp_create)
 
434
      res=fxp_create(s,d,c);
 
435
   if(res)
 
436
      return res;
 
437
   return new FileCopy(s,d,c);
 
438
}
 
439
void FileCopy::Suspend()
 
440
{
 
441
   if(get) get->Suspend();
 
442
   if(put) put->Suspend();
 
443
   super::Suspend();
 
444
}
 
445
void FileCopy::Resume()
 
446
{
 
447
   super::Resume();
 
448
   if(state!=PUT_WAIT)
 
449
   {
 
450
      if(get && !(put && put->Size()>=max_buf))
 
451
         get->Resume();
 
452
   }
 
453
   if(state!=GET_INFO_WAIT)
 
454
   {
 
455
      if(put)
 
456
         put->Resume();
 
457
   }
 
458
}
 
459
void FileCopy::Fg()
 
460
{
 
461
   if(get) get->Fg();
 
462
   if(put) put->Fg();
 
463
}
 
464
void FileCopy::Bg()
 
465
{
 
466
   if(get) get->Bg();
 
467
   if(put) put->Bg();
 
468
}
 
469
 
 
470
void FileCopy::Reconfig(const char *s)
 
471
{
 
472
}
 
473
 
 
474
void FileCopy::SetError(const char *str)
 
475
{
 
476
   xfree(error_text);
 
477
   error_text=xstrdup(str);
 
478
   Delete(get); get=0;
 
479
   Delete(put); put=0;
 
480
}
 
481
 
 
482
void FileCopy::LineBuffered(int s)
 
483
{
 
484
   if(!line_buffer)
 
485
      line_buffer=new Buffer();
 
486
   line_buffer_max=s;
 
487
}
 
488
 
 
489
off_t FileCopy::GetPos()
 
490
{
 
491
   if(put)
 
492
      return put->GetRealPos() - put->Buffered();
 
493
   if(get)
 
494
      return get->GetRealPos();
 
495
   return 0;
 
496
}
 
497
 
 
498
off_t FileCopy::GetSize()
 
499
{
 
500
   if(get)
 
501
      return get->GetSize();
 
502
   return NO_SIZE;
 
503
}
 
504
 
 
505
int FileCopy::GetPercentDone()
 
506
{
 
507
   if(!get || !put)
 
508
      return 100;
 
509
   off_t size=get->GetSize();
 
510
   if(size==NO_SIZE || size==NO_SIZE_YET)
 
511
      return -1;
 
512
   if(size==0)
 
513
      return 0;
 
514
   off_t ppos=put->GetRealPos() - put->Buffered() - put->range_start;
 
515
   if(ppos<0)
 
516
      return 0;
 
517
   off_t psize=size-put->range_start;
 
518
   if(put->range_limit!=FILE_END)
 
519
      psize=put->range_limit-put->range_start;
 
520
   if(psize<0)
 
521
      return 100;
 
522
   if(ppos>psize)
 
523
      return -1;
 
524
   return percent(ppos,psize);
 
525
}
 
526
const char *FileCopy::GetPercentDoneStr()
 
527
{
 
528
   int pct=GetPercentDone();
 
529
   if(pct==-1)
 
530
      return "";
 
531
   static char buf[6];
 
532
   sprintf(buf,"(%d%%) ",pct);
 
533
   return buf;
 
534
}
 
535
float FileCopy::GetRate()
 
536
{
 
537
   if(!rate->Valid() || !put)
 
538
      return 0;
 
539
   return rate->Get();
 
540
}
 
541
const char *FileCopy::GetRateStr()
 
542
{
 
543
   if(!rate->Valid() || !put)
 
544
      return "";
 
545
   return rate->GetStrS();
 
546
}
 
547
off_t FileCopy::GetBytesRemaining()
 
548
{
 
549
   if(!get)
 
550
      return 0;
 
551
   if(get->range_limit==FILE_END)
 
552
   {
 
553
      off_t size=get->GetSize();
 
554
      if(size<=0 || size<get->GetRealPos() || !rate_for_eta->Valid())
 
555
         return -1;
 
556
      return(size-GetPos());
 
557
   }
 
558
   return get->range_limit-GetPos();
 
559
}
 
560
const char *FileCopy::GetETAStr()
 
561
{
 
562
   off_t b=GetBytesRemaining();
 
563
   if(b<0 || !put)
 
564
      return "";
 
565
   return rate_for_eta->GetETAStrSFromSize(b);
 
566
}
 
567
long FileCopy::GetETA(off_t b)
 
568
{
 
569
   if(b<0 || !rate_for_eta->Valid())
 
570
      return -1;
 
571
   return (long)(double(b) / rate_for_eta->Get() + 0.5);
 
572
}
 
573
const char *FileCopy::GetStatus()
 
574
{
 
575
   static char *buf;
 
576
   xfree(buf); buf=0;
 
577
   const char *get_st=0;
 
578
   if(get)
 
579
      get_st=get->GetStatus();
 
580
   const char *put_st=0;
 
581
   if(put)
 
582
      put_st=put->GetStatus();
 
583
   if(get_st && get_st[0] && put_st && put_st[0])
 
584
      buf=xasprintf("[%s->%s]",get_st,put_st);
 
585
   else if(get_st && get_st[0])
 
586
      buf=xasprintf("[%s]",get_st);
 
587
   else if(put_st && put_st[0])
 
588
      buf=xasprintf("[%s]",put_st);
 
589
   else
 
590
      return "";
 
591
   return buf;
 
592
}
 
593
 
 
594
double FileCopy::GetTimeSpent()
 
595
{
 
596
   if(end_time<start_time)
 
597
      return 0;
 
598
   return TimeDiff(end_time,start_time);
 
599
}
 
600
 
 
601
FgData *FileCopy::GetFgData(bool fg)
 
602
{
 
603
   // NOTE: only one of get/put can have FgData in this implementation.
 
604
   FgData *f=0;
 
605
   if(get) f=get->GetFgData(fg);
 
606
   if(f) return f;
 
607
   if(put) f=put->GetFgData(fg);
 
608
   return f;
 
609
}
 
610
 
 
611
pid_t FileCopy::GetProcGroup()
 
612
{
 
613
   pid_t p=0;
 
614
   if(get) p=get->GetProcGroup();
 
615
   if(p) return p;
 
616
   if(put) p=put->GetProcGroup();
 
617
   return p;
 
618
}
 
619
 
 
620
void FileCopy::Kill(int sig)
 
621
{
 
622
   if(get) get->Kill(sig);
 
623
   if(put) put->Kill(sig);
 
624
}
 
625
 
 
626
// FileCopyPeer implementation
 
627
#undef super
 
628
#define super Buffer
 
629
void FileCopyPeer::SetSize(off_t s)
 
630
{
 
631
   size=s;
 
632
   if(seek_pos==FILE_END)
 
633
   {
 
634
      if(size!=NO_SIZE && size!=NO_SIZE_YET)
 
635
         seek_pos=size;
 
636
      else
 
637
         seek_pos=0;
 
638
   }
 
639
}
 
640
void FileCopyPeer::SetDate(time_t d)
 
641
{
 
642
   date=d;
 
643
   if(date==NO_DATE || date==NO_DATE_YET)
 
644
      date_set=true;
 
645
   else
 
646
      date_set=false;
 
647
}
 
648
 
 
649
bool FileCopyPeer::Done()
 
650
{
 
651
   if(Error())
 
652
      return true;
 
653
   if(eof && in_buffer==0)
 
654
   {
 
655
      if(removing)
 
656
         return false;
 
657
      if(mode==PUT)
 
658
         return done;
 
659
      return true;
 
660
   }
 
661
   if(broken)
 
662
      return true;
 
663
   return false;
 
664
}
 
665
 
 
666
FileCopyPeer::FileCopyPeer(dir_t m) : IOBuffer(m)
 
667
{
 
668
   want_size=false;
 
669
   want_date=false;
 
670
   start_transfer=true;
 
671
   size=NO_SIZE_YET;
 
672
   e_size=NO_SIZE;
 
673
   date=NO_DATE_YET;
 
674
   seek_pos=0;
 
675
   can_seek=false;
 
676
   can_seek0=false;
 
677
   date_set=false;
 
678
   do_set_date=true;
 
679
   ascii=false;
 
680
   range_start=0;
 
681
   range_limit=FILE_END;
 
682
   removing=false;
 
683
   file_removed=false;
 
684
   use_cache=true;
 
685
   write_allowed=true;
 
686
   done=false;
 
687
   Suspend();  // don't do anything too early
 
688
}
 
689
FileCopyPeer::~FileCopyPeer()
 
690
{
 
691
}
 
692
 
 
693
// FileCopyPeerFA implementation
 
694
#undef super
 
695
#define super FileCopyPeer
 
696
int FileCopyPeerFA::Do()
 
697
{
 
698
   int m=STALL;
 
699
   int res;
 
700
 
 
701
   if(removing)
 
702
   {
 
703
      res=session->Done();
 
704
      if(res<=0)
 
705
      {
 
706
         removing=false;
 
707
         file_removed=true;
 
708
         session->Close();
 
709
         Suspend();
 
710
         return MOVED;
 
711
      }
 
712
      else
 
713
         return m;
 
714
   }
 
715
 
 
716
   if(Done() || Error())
 
717
      return m;
 
718
   if(want_size && size==NO_SIZE_YET && (mode==PUT || !start_transfer))
 
719
   {
 
720
      if(session->IsClosed())
 
721
      {
 
722
         info.file=file;
 
723
         info.get_size=true;
 
724
         info.get_time=want_date;
 
725
         session->GetInfoArray(&info,1);
 
726
         m=MOVED;
 
727
      }
 
728
      res=session->Done();
 
729
      if(res==FA::IN_PROGRESS)
 
730
         return m;
 
731
      if(res<0)
 
732
      {
 
733
         session->Close();
 
734
         SetSize(NO_SIZE);
 
735
         return MOVED;
 
736
      }
 
737
      SetSize(info.size);
 
738
      SetDate(info.time);
 
739
      session->Close();
 
740
      return MOVED;
 
741
   }
 
742
   switch(mode)
 
743
   {
 
744
   case PUT:
 
745
      if(fxp)
 
746
      {
 
747
         if(eof)
 
748
            goto fxp_eof;
 
749
         return m;
 
750
      }
 
751
      res=Put_LL(buffer+buffer_ptr,in_buffer);
 
752
      if(res>0)
 
753
      {
 
754
         in_buffer-=res;
 
755
         buffer_ptr+=res;
 
756
         m=MOVED;
 
757
      }
 
758
      else if(res<0)
 
759
         return MOVED;
 
760
      if(in_buffer==0)
 
761
      {
 
762
         if(eof)
 
763
         {
 
764
            if(date!=NO_DATE && date!=NO_DATE_YET)
 
765
               session->SetDate(date);
 
766
            if(e_size!=NO_SIZE && e_size!=NO_SIZE_YET)
 
767
               session->SetSize(e_size);
 
768
            res=session->StoreStatus();
 
769
            if(res==FA::OK)
 
770
            {
 
771
               session->Close();
 
772
            fxp_eof:
 
773
               // FIXME: set date for real.
 
774
               date_set=true;
 
775
               done=true;
 
776
               m=MOVED;
 
777
            }
 
778
            else if(res==FA::IN_PROGRESS)
 
779
               return m;
 
780
            else
 
781
            {
 
782
               if(res==FA::DO_AGAIN)
 
783
                  return m;
 
784
               if(res==FA::STORE_FAILED)
 
785
               {
 
786
                  try_time=session->GetTryTime();
 
787
                  retries=session->GetRetries();
 
788
                  Log::global->Format(10,"try_time=%ld, retries=%d\n",try_time,retries);
 
789
                  session->Close();
 
790
                  if(can_seek && seek_pos>0)
 
791
                     Seek(FILE_END);
 
792
                  else
 
793
                     Seek(0);
 
794
                  return MOVED;
 
795
               }
 
796
               SetError(session->StrError(res));
 
797
               return MOVED;
 
798
            }
 
799
            return m;
 
800
         }
 
801
      }
 
802
      break;
 
803
 
 
804
   case GET:
 
805
      if(eof)
 
806
         return m;
 
807
      if(fxp)
 
808
         return m;
 
809
      res=Get_LL(GET_BUFSIZE);
 
810
      if(res>0)
 
811
      {
 
812
         in_buffer+=res;
 
813
         SaveMaxCheck(0);
 
814
         return MOVED;
 
815
      }
 
816
      if(res<0)
 
817
         return MOVED;
 
818
      if(eof)
 
819
      {
 
820
         session->Close();
 
821
         return MOVED;
 
822
      }
 
823
      break;
 
824
   }
 
825
   return m;
 
826
}
 
827
 
 
828
bool FileCopyPeerFA::IOReady()
 
829
{
 
830
   if(seek_pos==0)
 
831
      return true;
 
832
   if(seek_pos==FILE_END && size==NO_SIZE_YET)
 
833
      return false;
 
834
   return session->IOReady();
 
835
}
 
836
 
 
837
void FileCopyPeerFA::Suspend()
 
838
{
 
839
   if(fxp && mode==PUT)
 
840
      return;
 
841
   session->Suspend();
 
842
   super::Suspend();
 
843
}
 
844
void FileCopyPeerFA::Resume()
 
845
{
 
846
   super::Resume();
 
847
   session->Resume();
 
848
}
 
849
 
 
850
const char *FileCopyPeerFA::GetStatus()
 
851
{
 
852
   if(!session->IsOpen())
 
853
      return 0;
 
854
   return session->CurrentStatus();
 
855
}
 
856
 
 
857
void FileCopyPeerFA::Seek(off_t new_pos)
 
858
{
 
859
   if(pos==new_pos)
 
860
      return;
 
861
   super::Seek(new_pos);
 
862
   session->Close();
 
863
   if(seek_pos==FILE_END)
 
864
      WantSize();
 
865
   else
 
866
      pos=new_pos;
 
867
}
 
868
 
 
869
void FileCopyPeerFA::OpenSession()
 
870
{
 
871
   current->Timeout(0); // mark it MOVED.
 
872
   if(mode==GET)
 
873
   {
 
874
      if(size!=NO_SIZE && size!=NO_SIZE_YET && seek_pos>=size && !ascii)
 
875
      {
 
876
      past_eof:
 
877
         debug((10,"copy src: seek past eof (seek_pos=%lld, size=%lld)\n",
 
878
                  (long long)seek_pos,(long long)size));
 
879
         pos=seek_pos;
 
880
         eof=true;
 
881
         return;
 
882
      }
 
883
      const char *b;
 
884
      int s;
 
885
      if(use_cache && LsCache::Find(session,file,FAmode,&b,&s))
 
886
      {
 
887
         size=s;
 
888
         if(seek_pos>=s)
 
889
            goto past_eof;
 
890
         b+=seek_pos;
 
891
         s-=seek_pos;
 
892
         Save(0);
 
893
         Allocate(s);
 
894
         memmove(buffer+buffer_ptr,b,s);
 
895
         in_buffer=s;
 
896
         pos=seek_pos;
 
897
         eof=true;
 
898
         return;
 
899
      }
 
900
   }
 
901
   else // mode==PUT
 
902
   {
 
903
      if(e_size>=0 && size>=0 && seek_pos>=e_size)
 
904
      {
 
905
         debug((10,"copy dst: seek past eof (seek_pos=%lld, size=%lld)\n",
 
906
                  (long long)seek_pos,(long long)e_size));
 
907
         eof=true;
 
908
         if(date==NO_DATE || date==NO_DATE_YET)
 
909
            return;
 
910
      }
 
911
   }
 
912
   session->Open(file,FAmode,seek_pos);
 
913
   session->SetFileURL(orig_url);
 
914
   if(mode==PUT)
 
915
   {
 
916
      if(try_time!=0)
 
917
         session->SetTryTime(try_time);
 
918
      if(retries!=0)
 
919
         session->SetRetries(retries);
 
920
      if(e_size!=NO_SIZE && e_size!=NO_SIZE_YET)
 
921
         session->SetSize(e_size);
 
922
      if(date!=NO_DATE && date!=NO_DATE_YET)
 
923
         session->SetDate(date);
 
924
   }
 
925
   session->RereadManual();
 
926
   if(ascii)
 
927
      session->AsciiTransfer();
 
928
   if(want_size && size==NO_SIZE_YET)
 
929
      session->WantSize(&size);
 
930
   if(want_date && date==NO_DATE_YET)
 
931
      session->WantDate(&date);
 
932
   if(mode==GET)
 
933
   {
 
934
      SaveRollback(seek_pos);
 
935
      pos=seek_pos;
 
936
   }
 
937
   else
 
938
   {
 
939
      pos=seek_pos+in_buffer;
 
940
   }
 
941
}
 
942
 
 
943
void FileCopyPeerFA::RemoveFile()
 
944
{
 
945
   session->Open(file,FA::REMOVE);
 
946
   removing=true;
 
947
}
 
948
 
 
949
int FileCopyPeerFA::Get_LL(int len)
 
950
{
 
951
   int res=0;
 
952
 
 
953
   if(session->IsClosed())
 
954
      OpenSession();
 
955
 
 
956
   if(eof)  // OpenSession can set eof=true.
 
957
      return 0;
 
958
 
 
959
   off_t io_at=pos;
 
960
   if(GetRealPos()!=io_at) // GetRealPos can alter pos.
 
961
      return 0;
 
962
 
 
963
   Allocate(len);
 
964
 
 
965
   res=session->Read(buffer+buffer_ptr+in_buffer,len);
 
966
   if(res<0)
 
967
   {
 
968
      if(res==FA::DO_AGAIN)
 
969
         return 0;
 
970
      if(res==FA::FILE_MOVED)
 
971
      {
 
972
         // handle redirection.
 
973
         assert(!fxp);
 
974
         const char *loc_c=session->GetNewLocation();
 
975
         int max_redirections=max_redir.Query(0);
 
976
         if(loc_c && loc_c[0] && max_redirections>0)
 
977
         {
 
978
            Log::global->Format(3,_("copy: received redirection to `%s'\n"),loc_c);
 
979
            if(++redirections>max_redirections)
 
980
            {
 
981
               SetError(_("Too many redirections"));
 
982
               return -1;
 
983
            }
 
984
            if(FAmode==FA::QUOTE_CMD)
 
985
               FAmode=FA::RETRIEVE;
 
986
 
 
987
            char *loc=alloca_strdup(loc_c);
 
988
            session->Close(); // loc_c is no longer valid.
 
989
 
 
990
            ParsedURL u(loc,true);
 
991
 
 
992
            if(u.proto)
 
993
            {
 
994
               if(reuse_later)
 
995
                  SessionPool::Reuse(session);
 
996
 
 
997
               session=FileAccess::New(&u);
 
998
               reuse_later=true;
 
999
 
 
1000
               xfree(file);
 
1001
               file=xstrdup(u.path?u.path:"");
 
1002
               xfree(orig_url);
 
1003
               orig_url=xstrdup(loc);
 
1004
            }
 
1005
            else // !proto
 
1006
            {
 
1007
               if(orig_url)
 
1008
               {
 
1009
                  int p_ind=url::path_index(orig_url);
 
1010
                  char *s=strrchr(orig_url,'/');
 
1011
                  int s_ind=s?s-orig_url:-1;
 
1012
                  if(p_ind==-1 || s_ind==-1 || s_ind<p_ind)
 
1013
                     s_ind=p_ind=strlen(orig_url);
 
1014
                  if(loc[0]=='/')
 
1015
                  {
 
1016
                     orig_url=(char*)xrealloc(orig_url,p_ind+strlen(loc)+1);
 
1017
                     strcpy(orig_url+p_ind,loc);
 
1018
                  }
 
1019
                  else
 
1020
                  {
 
1021
                     orig_url=(char*)xrealloc(orig_url,s_ind+1+strlen(loc)+1);
 
1022
                     strcpy(orig_url+s_ind,"/");
 
1023
                     strcpy(orig_url+s_ind+1,loc);
 
1024
                  }
 
1025
               }
 
1026
 
 
1027
               url::decode_string(loc);
 
1028
               char *slash=strrchr(file,'/');
 
1029
               char *new_file;
 
1030
               if(loc[0]!='/' && slash)
 
1031
               {
 
1032
                  *slash=0;
 
1033
                  new_file=xstrdup(dir_file(file,loc));
 
1034
               }
 
1035
               else
 
1036
               {
 
1037
                  new_file=xstrdup(loc);
 
1038
               }
 
1039
               xfree(file);
 
1040
               file=new_file;
 
1041
            }
 
1042
 
 
1043
            size=NO_SIZE_YET;
 
1044
            date=NO_DATE_YET;
 
1045
 
 
1046
            try_time=0;
 
1047
            retries=0;
 
1048
            current->Timeout(0); // retry with new location.
 
1049
            return 0;
 
1050
         }
 
1051
      }
 
1052
      SetError(session->StrError(res));
 
1053
      return -1;
 
1054
   }
 
1055
   if(res==0)
 
1056
   {
 
1057
      eof=true;
 
1058
      LsCache::Add(session,file,FAmode,this);
 
1059
   }
 
1060
   return res;
 
1061
}
 
1062
 
 
1063
int FileCopyPeerFA::Put_LL(const char *buf,int len)
 
1064
{
 
1065
   if(session->IsClosed())
 
1066
      OpenSession();
 
1067
 
 
1068
   off_t io_at=pos; // GetRealPos can alter pos, save it.
 
1069
   if(GetRealPos()!=io_at)
 
1070
      return 0;
 
1071
 
 
1072
   if(len==0 && eof)
 
1073
      return 0;
 
1074
 
 
1075
   int res=session->Write(buf,len);
 
1076
   if(res<0)
 
1077
   {
 
1078
      if(res==FA::DO_AGAIN)
 
1079
         return 0;
 
1080
      if(res==FA::STORE_FAILED)
 
1081
      {
 
1082
         try_time=session->GetTryTime();
 
1083
         retries=session->GetRetries();
 
1084
         Log::global->Format(10,"try_time=%ld, retries=%d\n",try_time,retries);
 
1085
         session->Close();
 
1086
         if(can_seek && seek_pos>0)
 
1087
            Seek(FILE_END);
 
1088
         else
 
1089
            Seek(0);
 
1090
         return 0;
 
1091
      }
 
1092
      SetError(session->StrError(res));
 
1093
      return -1;
 
1094
   }
 
1095
   seek_pos+=res; // mainly to indicate that there was some output.
 
1096
   return res;
 
1097
}
 
1098
 
 
1099
off_t FileCopyPeerFA::GetRealPos()
 
1100
{
 
1101
   if(session->OpenMode()!=FAmode || fxp)
 
1102
      return pos;
 
1103
   if(mode==PUT)
 
1104
   {
 
1105
      if(pos-in_buffer!=session->GetPos())
 
1106
      {
 
1107
         Empty();
 
1108
         can_seek=false;
 
1109
         pos=session->GetPos();
 
1110
      }
 
1111
   }
 
1112
   else
 
1113
   {
 
1114
      if(eof)
 
1115
         return pos;
 
1116
      if(session->GetRealPos()==0 && session->GetPos()>0)
 
1117
      {
 
1118
         can_seek=false;
 
1119
         session->SeekReal();
 
1120
      }
 
1121
      if(pos+in_buffer!=session->GetPos())
 
1122
      {
 
1123
         SaveRollback(session->GetPos());
 
1124
         pos=session->GetPos();
 
1125
      }
 
1126
   }
 
1127
   return pos;
 
1128
}
 
1129
 
 
1130
void FileCopyPeerFA::Init()
 
1131
{
 
1132
   FAmode=FA::RETRIEVE;
 
1133
   file=0;
 
1134
   session=0;
 
1135
   reuse_later=false;
 
1136
   orig_url=0;
 
1137
   fxp=false;
 
1138
   try_time=0;
 
1139
   retries=0;
 
1140
   redirections=0;
 
1141
   can_seek=true;
 
1142
   can_seek0=true;
 
1143
}
 
1144
 
 
1145
FileCopyPeerFA::FileCopyPeerFA(FileAccess *s,const char *f,int m)
 
1146
   : FileCopyPeer(m==FA::STORE ? PUT : GET)
 
1147
{
 
1148
   Init();
 
1149
   FAmode=m;
 
1150
   file=xstrdup(f);
 
1151
   session=s;
 
1152
   reuse_later=true;
 
1153
   if(FAmode==FA::LIST || FAmode==FA::LONG_LIST)
 
1154
      Save(LsCache::SizeLimit());
 
1155
}
 
1156
FileCopyPeerFA::~FileCopyPeerFA()
 
1157
{
 
1158
   if(session)
 
1159
   {
 
1160
      session->Close();
 
1161
      if(reuse_later)
 
1162
         SessionPool::Reuse(session);
 
1163
   }
 
1164
   xfree(file);
 
1165
   xfree(orig_url);
 
1166
}
 
1167
 
 
1168
FileCopyPeerFA::FileCopyPeerFA(ParsedURL *u,int m)
 
1169
   : FileCopyPeer(m==FA::STORE ? PUT : GET)
 
1170
{
 
1171
   Init();
 
1172
   FAmode=m;
 
1173
   file=xstrdup(u->path);
 
1174
   session=FileAccess::New(u);
 
1175
   reuse_later=true;
 
1176
   orig_url=u->orig_url;
 
1177
   u->orig_url=0;
 
1178
   if(!file)
 
1179
   {
 
1180
      SetError(_("file name missed in URL"));
 
1181
   }
 
1182
}
 
1183
 
 
1184
FileCopyPeerFA *FileCopyPeerFA::New(FileAccess *s,const char *url,int m,bool reuse)
 
1185
{
 
1186
   ParsedURL u(url,true);
 
1187
   if(u.proto)
 
1188
   {
 
1189
      if(reuse)
 
1190
         SessionPool::Reuse(s);
 
1191
      return new FileCopyPeerFA(&u,m);
 
1192
   }
 
1193
   FileCopyPeerFA *peer=new FileCopyPeerFA(s,url,m);
 
1194
   if(!reuse)
 
1195
      peer->DontReuseSession();
 
1196
   return peer;
 
1197
}
 
1198
 
 
1199
// FileCopyPeerFDStream
 
1200
#undef super
 
1201
#define super FileCopyPeer
 
1202
FileCopyPeerFDStream::FileCopyPeerFDStream(FDStream *o,dir_t m)
 
1203
   : FileCopyPeer(m)
 
1204
{
 
1205
   if(o==0 && m==PUT)
 
1206
      o=new FDStream(1,"<stdout>");
 
1207
   stream=o;
 
1208
   seek_base=0;
 
1209
   delete_stream=true;
 
1210
   create_fg_data=true;
 
1211
   need_seek=false;
 
1212
   can_seek = can_seek0 = stream->can_seek();
 
1213
   if(can_seek && stream->fd!=-1)
 
1214
   {
 
1215
      seek_base=lseek(stream->fd,0,SEEK_CUR);
 
1216
      if(seek_base==-1)
 
1217
      {
 
1218
         can_seek=false;
 
1219
         can_seek0=false;
 
1220
         seek_base=0;
 
1221
      }
 
1222
   }
 
1223
   if(stream->usesfd(1))
 
1224
      write_allowed=false;
 
1225
   put_ll_timer=0;
 
1226
   if(m==PUT)
 
1227
      put_ll_timer=new Timer(TimeDiff(0,200));
 
1228
}
 
1229
FileCopyPeerFDStream::~FileCopyPeerFDStream()
 
1230
{
 
1231
   if(delete_stream)
 
1232
      delete stream;
 
1233
   delete put_ll_timer;
 
1234
}
 
1235
 
 
1236
void FileCopyPeerFDStream::Seek_LL()
 
1237
{
 
1238
   int fd=stream->fd;
 
1239
   assert(fd!=-1);
 
1240
   if(CanSeek(seek_pos))
 
1241
   {
 
1242
      if(seek_pos==FILE_END)
 
1243
      {
 
1244
         seek_pos=lseek(fd,0,SEEK_END);
 
1245
         if(seek_pos==-1)
 
1246
         {
 
1247
            can_seek=false;
 
1248
            can_seek0=false;
 
1249
            seek_pos=0;
 
1250
         }
 
1251
         else
 
1252
         {
 
1253
            SetSize(seek_pos);
 
1254
            if(seek_pos>seek_base)
 
1255
               seek_pos-=seek_base;
 
1256
            else
 
1257
               seek_pos=0;
 
1258
         }
 
1259
         pos=seek_pos;
 
1260
      }
 
1261
      else
 
1262
      {
 
1263
         if(lseek(fd,seek_pos+seek_base,SEEK_SET)==-1)
 
1264
         {
 
1265
            can_seek=false;
 
1266
            can_seek0=false;
 
1267
            seek_pos=0;
 
1268
         }
 
1269
         pos=seek_pos;
 
1270
      }
 
1271
      if(mode==PUT)
 
1272
         pos+=in_buffer;
 
1273
   }
 
1274
   else
 
1275
   {
 
1276
      seek_pos=pos;
 
1277
   }
 
1278
}
 
1279
 
 
1280
int FileCopyPeerFDStream::getfd()
 
1281
{
 
1282
   if(stream->fd!=-1)
 
1283
      return stream->fd;
 
1284
   int fd=stream->getfd();
 
1285
   if(fd==-1)
 
1286
   {
 
1287
      if(stream->error())
 
1288
      {
 
1289
         SetError(stream->error_text);
 
1290
         current->Timeout(0);
 
1291
      }
 
1292
      else
 
1293
      {
 
1294
         current->TimeoutS(1);
 
1295
      }
 
1296
      return -1;
 
1297
   }
 
1298
   stream->clear_status();
 
1299
   pos=0;
 
1300
   if(mode==PUT)
 
1301
      pos+=in_buffer;
 
1302
   Seek_LL();
 
1303
   return fd;
 
1304
}
 
1305
int FileCopyPeerFDStream::Do()
 
1306
{
 
1307
   int m=STALL;
 
1308
   if(Done() || Error())
 
1309
      return m;
 
1310
   switch(mode)
 
1311
   {
 
1312
   case PUT:
 
1313
      if(in_buffer==0)
 
1314
      {
 
1315
         if(eof)
 
1316
         {
 
1317
            if(!date_set && date!=NO_DATE && do_set_date)
 
1318
            {
 
1319
               if(date==NO_DATE_YET)
 
1320
                  return m;
 
1321
               if(getfd()==-1)
 
1322
                  return m;
 
1323
               stream->setmtime(date);
 
1324
               date_set=true;
 
1325
               m=MOVED;
 
1326
            }
 
1327
            if(stream && delete_stream && !stream->Done())
 
1328
               return m;
 
1329
            done=true;
 
1330
            return MOVED;
 
1331
         }
 
1332
         if(seek_pos==0)
 
1333
            return m;
 
1334
      }
 
1335
      if(!write_allowed)
 
1336
         return m;
 
1337
      while(in_buffer>0)
 
1338
      {
 
1339
         if(!eof && in_buffer<PUT_LL_MIN
 
1340
         && put_ll_timer && !put_ll_timer->Stopped())
 
1341
            break;
 
1342
         int res=Put_LL(buffer+buffer_ptr,in_buffer);
 
1343
         if(res>0)
 
1344
         {
 
1345
            in_buffer-=res;
 
1346
            buffer_ptr+=res;
 
1347
            m=MOVED;
 
1348
         }
 
1349
         if(res<0)
 
1350
            return MOVED;
 
1351
         if(res==0)
 
1352
            break;
 
1353
      }
 
1354
      break;
 
1355
 
 
1356
   case GET:
 
1357
      if(eof)
 
1358
         return m;
 
1359
      while(in_buffer<GET_BUFSIZE)
 
1360
      {
 
1361
         int res=Get_LL(GET_BUFSIZE);
 
1362
         if(res>0)
 
1363
         {
 
1364
            in_buffer+=res;
 
1365
            SaveMaxCheck(0);
 
1366
            m=MOVED;
 
1367
         }
 
1368
         if(res<0)
 
1369
            return MOVED;
 
1370
         if(eof)
 
1371
            return MOVED;
 
1372
         if(res==0)
 
1373
            break;
 
1374
      }
 
1375
      break;
 
1376
   }
 
1377
   return m;
 
1378
}
 
1379
 
 
1380
bool FileCopyPeerFDStream::IOReady()
 
1381
{
 
1382
   return seek_pos==pos || stream->fd!=-1;
 
1383
}
 
1384
 
 
1385
void FileCopyPeerFDStream::Seek(off_t new_pos)
 
1386
{
 
1387
   if(pos==new_pos)
 
1388
      return;
 
1389
#ifndef NATIVE_CRLF
 
1390
   if(ascii && new_pos!=0)
 
1391
   {
 
1392
      // it is possible to read file to determine right position,
 
1393
      // but it is costly.
 
1394
      can_seek=false;
 
1395
      // can_seek0 is still true.
 
1396
      return;
 
1397
   }
 
1398
#endif
 
1399
   super::Seek(new_pos);
 
1400
   int fd=stream->fd;
 
1401
   if(fd==-1)
 
1402
   {
 
1403
      if(seek_pos!=FILE_END)
 
1404
      {
 
1405
         pos=seek_pos+(mode==PUT)*in_buffer;
 
1406
         return;
 
1407
      }
 
1408
      else
 
1409
      {
 
1410
         off_t s=stream->get_size();
 
1411
         if(s!=-1)
 
1412
         {
 
1413
            SetSize(s);
 
1414
            pos=seek_pos+(mode==PUT)*in_buffer;
 
1415
            return;
 
1416
         }
 
1417
         else
 
1418
         {
 
1419
            // ok, have to try getfd.
 
1420
            fd=getfd();
 
1421
         }
 
1422
      }
 
1423
      if(fd==-1)
 
1424
         return;
 
1425
   }
 
1426
   Seek_LL();
 
1427
}
 
1428
 
 
1429
int FileCopyPeerFDStream::Get_LL(int len)
 
1430
{
 
1431
   int res=0;
 
1432
 
 
1433
   int fd=getfd();
 
1434
   if(fd==-1)
 
1435
      return 0;
 
1436
 
 
1437
   if((want_date && date==NO_DATE_YET)
 
1438
   || (want_size && size==NO_SIZE_YET))
 
1439
   {
 
1440
      struct stat st;
 
1441
      if(fstat(fd,&st)==-1)
 
1442
      {
 
1443
         SetDate(NO_DATE);
 
1444
         SetSize(NO_SIZE);
 
1445
      }
 
1446
      else
 
1447
      {
 
1448
         SetDate(st.st_mtime);
 
1449
         SetSize(st.st_size);
 
1450
      }
 
1451
   }
 
1452
 
 
1453
#ifndef NATIVE_CRLF
 
1454
   if(ascii)
 
1455
      Allocate(len*2);
 
1456
   else
 
1457
#endif
 
1458
      Allocate(len);
 
1459
 
 
1460
   if(need_seek)  // this does not combine with ascii.
 
1461
      lseek(fd,seek_base+pos,SEEK_SET);
 
1462
 
 
1463
   res=read(fd,buffer+buffer_ptr+in_buffer,len);
 
1464
   if(res==-1)
 
1465
   {
 
1466
      if(E_RETRY(errno))
 
1467
      {
 
1468
         Block(fd,POLLIN);
 
1469
         return 0;
 
1470
      }
 
1471
      if(stream->NonFatalError(errno))
 
1472
         return 0;
 
1473
      stream->MakeErrorText();
 
1474
      SetError(stream->error_text);
 
1475
      return -1;
 
1476
   }
 
1477
   stream->clear_status();
 
1478
 
 
1479
#ifndef NATIVE_CRLF
 
1480
   if(ascii)
 
1481
   {
 
1482
      char *p=buffer+buffer_ptr+in_buffer;
 
1483
      for(int i=res; i>0; i--)
 
1484
      {
 
1485
         if(*p=='\n')
 
1486
         {
 
1487
            memmove(p+1,p,i);
 
1488
            *p++='\r';
 
1489
            res++;
 
1490
         }
 
1491
         p++;
 
1492
      }
 
1493
   }
 
1494
#endif
 
1495
 
 
1496
   if(res==0)
 
1497
      eof=true;
 
1498
   return res;
 
1499
}
 
1500
 
 
1501
int FileCopyPeerFDStream::Put_LL(const char *buf,int len)
 
1502
{
 
1503
   if(len==0)
 
1504
      return 0;
 
1505
 
 
1506
   int fd=getfd();
 
1507
   if(fd==-1)
 
1508
      return 0;
 
1509
 
 
1510
   int skip_cr=0;
 
1511
 
 
1512
#ifndef NATIVE_CRLF
 
1513
   if(ascii)
 
1514
   {
 
1515
      // find where line ends.
 
1516
      const char *cr=buf;
 
1517
      for(;;)
 
1518
      {
 
1519
         cr=(const char *)memchr(cr,'\r',len-(cr-buf));
 
1520
         if(!cr)
 
1521
            break;
 
1522
         if(cr-buf<len-1 && cr[1]=='\n')
 
1523
         {
 
1524
            skip_cr=1;
 
1525
            len=cr-buf;
 
1526
            break;
 
1527
         }
 
1528
         if(cr-buf==len-1)
 
1529
         {
 
1530
            if(eof)
 
1531
               break;
 
1532
            len--;
 
1533
            break;
 
1534
         }
 
1535
         cr++;
 
1536
      }
 
1537
   }
 
1538
#endif   // NATIVE_CRLF
 
1539
 
 
1540
   if(len==0)
 
1541
      return skip_cr;
 
1542
 
 
1543
   if(need_seek)  // this does not combine with ascii.
 
1544
      lseek(fd,seek_base+pos-in_buffer,SEEK_SET);
 
1545
 
 
1546
   int res=write(fd,buf,len);
 
1547
   if(res<0)
 
1548
   {
 
1549
      if(E_RETRY(errno))
 
1550
      {
 
1551
         Block(fd,POLLOUT);
 
1552
         return 0;
 
1553
      }
 
1554
      if(errno==EPIPE)
 
1555
      {
 
1556
         broken=true;
 
1557
         in_buffer=0;
 
1558
         eof=true;
 
1559
         return -1;
 
1560
      }
 
1561
      if(stream->NonFatalError(errno))
 
1562
      {
 
1563
         // in case of full disk, check file correctness.
 
1564
         if(errno==ENOSPC && can_seek)
 
1565
         {
 
1566
            struct stat st;
 
1567
            if(fstat(fd,&st)!=-1)
 
1568
            {
 
1569
               if(st.st_size<seek_base+pos-in_buffer)
 
1570
               {
 
1571
                  // workaround solaris nfs bug. It can lose data if disk is full.
 
1572
                  if(buffer_ptr>=seek_base+pos-in_buffer-buffer_ptr-st.st_size)
 
1573
                     UnSkip(seek_base+pos-in_buffer-st.st_size);
 
1574
                  else
 
1575
                  {
 
1576
                     Empty();
 
1577
                     pos=st.st_size;
 
1578
                  }
 
1579
               }
 
1580
            }
 
1581
         }
 
1582
         return 0;
 
1583
      }
 
1584
      stream->MakeErrorText();
 
1585
      SetError(stream->error_text);
 
1586
      return -1;
 
1587
   }
 
1588
   stream->clear_status();
 
1589
   if(res==len && skip_cr)
 
1590
   {
 
1591
      res+=skip_cr;
 
1592
      // performance gets worse because of writing a single char,
 
1593
      // but leaving uncomplete line on screen allows mixing it with debug text.
 
1594
      if(write(fd,"\n",1)==1)
 
1595
         res+=1;
 
1596
   }
 
1597
   if(put_ll_timer)
 
1598
      put_ll_timer->Reset();
 
1599
   return res;
 
1600
}
 
1601
FgData *FileCopyPeerFDStream::GetFgData(bool fg)
 
1602
{
 
1603
   if(!delete_stream || !create_fg_data)
 
1604
      return 0;   // if we don't own the stream, don't create FgData.
 
1605
   if(stream->GetProcGroup())
 
1606
      return new FgData(stream->GetProcGroup(),fg);
 
1607
   return 0;
 
1608
}
 
1609
 
 
1610
void FileCopyPeerFDStream::RemoveFile()
 
1611
{
 
1612
   stream->remove();
 
1613
   removing=false;   // it is instant.
 
1614
   file_removed=true;
 
1615
   Suspend();
 
1616
   current->Timeout(0);
 
1617
}
 
1618
 
 
1619
const char *FileCopyPeerFDStream::GetStatus()
 
1620
{
 
1621
   return stream->status;
 
1622
}
 
1623
void FileCopyPeerFDStream::Kill(int sig)
 
1624
{
 
1625
   stream->Kill(sig);
 
1626
}
 
1627
 
 
1628
 
 
1629
FileCopyPeerFDStream *FileCopyPeerFDStream::NewPut(const char *file,bool cont)
 
1630
{
 
1631
   return new FileCopyPeerFDStream(new FileStream(file,O_WRONLY|O_CREAT
 
1632
                                    |(cont?0:O_TRUNC)),FileCopyPeer::PUT);
 
1633
}
 
1634
FileCopyPeerFDStream *FileCopyPeerFDStream::NewGet(const char *file)
 
1635
{
 
1636
   return new FileCopyPeerFDStream(new FileStream(file,O_RDONLY),
 
1637
                                    FileCopyPeer::GET);
 
1638
}
 
1639
 
 
1640
 
 
1641
// FileCopyPeerDirList
 
1642
FileCopyPeerDirList::FileCopyPeerDirList(FA *s,ArgV *v)
 
1643
   : FileCopyPeer(GET)
 
1644
{
 
1645
   session=s;
 
1646
   dl=session->MakeDirList(v);
 
1647
   if(dl==0)
 
1648
      eof=true;
 
1649
   can_seek=false;
 
1650
   can_seek0=false;
 
1651
}
 
1652
 
 
1653
FileCopyPeerDirList::~FileCopyPeerDirList()
 
1654
{
 
1655
   Delete(dl);
 
1656
   SessionPool::Reuse(session);
 
1657
}
 
1658
 
 
1659
int FileCopyPeerDirList::Do()
 
1660
{
 
1661
   if(Done())
 
1662
      return STALL;
 
1663
   if(dl->Error())
 
1664
   {
 
1665
      SetError(dl->ErrorText());
 
1666
      return MOVED;
 
1667
   }
 
1668
 
 
1669
   const char *b;
 
1670
   int s;
 
1671
   dl->Get(&b,&s);
 
1672
   if(b==0) // eof
 
1673
   {
 
1674
      eof=true;
 
1675
      return MOVED;
 
1676
   }
 
1677
   if(s==0)
 
1678
      return STALL;
 
1679
   Allocate(s);
 
1680
   memmove(buffer+buffer_ptr+in_buffer,b,s);
 
1681
   in_buffer+=s;
 
1682
   dl->Skip(s);
 
1683
   return MOVED;
 
1684
}
 
1685
 
 
1686
FileCopyPeerOutputJob::FileCopyPeerOutputJob(OutputJob *new_o)
 
1687
   : FileCopyPeer(PUT)
 
1688
{
 
1689
   o=new_o;
 
1690
   DontCopyDate();
 
1691
}
 
1692
 
 
1693
int FileCopyPeerOutputJob::Put_LL(const char *buf,int len)
 
1694
{
 
1695
   off_t io_at=pos;
 
1696
   if(GetRealPos()!=io_at) // GetRealPos can alter pos.
 
1697
      return 0;
 
1698
 
 
1699
   if(len==0 && eof)
 
1700
      return 0;
 
1701
 
 
1702
   if(o->Full())
 
1703
      return 0;
 
1704
 
 
1705
   o->Put(buf,len);
 
1706
 
 
1707
   seek_pos+=len; // mainly to indicate that there was some output.
 
1708
   return len;
 
1709
}
 
1710
 
 
1711
int FileCopyPeerOutputJob::Do()
 
1712
{
 
1713
   if(o->Error())
 
1714
   {
 
1715
      broken=true;
 
1716
      return MOVED;
 
1717
   }
 
1718
 
 
1719
   if(eof && !in_buffer)
 
1720
   {
 
1721
      done=true;
 
1722
      return MOVED;
 
1723
   }
 
1724
 
 
1725
   int m=STALL;
 
1726
 
 
1727
   if(!write_allowed)
 
1728
      return m;
 
1729
 
 
1730
   while(in_buffer>0)
 
1731
   {
 
1732
      int res=Put_LL(buffer+buffer_ptr,in_buffer);
 
1733
      if(res>0)
 
1734
      {
 
1735
         in_buffer-=res;
 
1736
         buffer_ptr+=res;
 
1737
         m=MOVED;
 
1738
      }
 
1739
      if(res<0)
 
1740
         return MOVED;
 
1741
      if(res==0)
 
1742
         break;
 
1743
   }
 
1744
   return m;
 
1745
}
 
1746
 
 
1747
void FileCopyPeerOutputJob::Fg()
 
1748
{
 
1749
   o->Fg();
 
1750
   FileCopyPeer::Fg();
 
1751
}
 
1752
void FileCopyPeerOutputJob::Bg()
 
1753
{
 
1754
   o->Bg();
 
1755
   FileCopyPeer::Bg();
 
1756
}
 
1757
 
 
1758
// special pointer to creator of ftp/ftp copier. It is init'ed in Ftp class.
 
1759
FileCopy *(*FileCopy::fxp_create)(FileCopyPeer *src,FileCopyPeer *dst,bool cont);