~ubuntu-branches/debian/sid/nordugrid-arc/sid

« back to all changes in this revision

Viewing changes to src/hed/libs/data/DataMover.cpp

  • Committer: Package Import Robot
  • Author(s): Mattias Ellert
  • Date: 2012-12-13 16:41:31 UTC
  • mfrom: (1.1.5)
  • Revision ID: package-import@ubuntu.com-20121213164131-0fumka0jar8mxm07
Tags: 2.0.1-1
* 2.0.1 Release
* Drop patches accepted upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
200
200
                                 min_average_speed, time_t max_inactivity_time,
201
201
                                 DataMover::callback cb, void *arg,
202
202
                                 const char *prefix) {
 
203
    // Helper declared locally inside the transfer routine.
    // It keeps a reference to a DataPoint and, when the helper object is
    // destroyed, unconditionally calls StopReading(), FinishReading(),
    // StopWriting() and FinishWriting() on that DataPoint, in that order.
    // The constructor only stores the reference; no check is made here of
    // whether a read or write was ever started on the point.
    // NOTE(review): the referenced DataPoint must outlive this object,
    // since only a reference (not a copy) is held.
    class DataPointStopper {
 
204
     private:
 
205
      DataPoint& point_;
 
206
     public:
 
207
      DataPointStopper(DataPoint& p):point_(p) {};
 
208
      ~DataPointStopper(void) {
 
209
        point_.StopReading();
 
210
        point_.FinishReading();
 
211
        point_.StopWriting();
 
212
        point_.FinishWriting();
 
213
      };
 
214
    };
203
215
 
204
216
    if (cb != NULL) {
205
217
      logger.msg(VERBOSE, "DataMover::Transfer : starting new thread");
365
377
            break;
366
378
          if (!destination.IsIndex()) {
367
379
            // pfn has chance to be overwritten directly
368
 
            logger.msg(ERROR, "Failed to delete %s but will still try to upload", del_url.str());
 
380
            logger.msg(WARNING, "Failed to delete %s but will still try to copy", del_url.str());
369
381
            break;
370
382
          }
371
383
          logger.msg(INFO, "Failed to delete %s", del_url.str());
419
431
        /* out of tries */
420
432
        return res;
421
433
      }
422
 
      // By putting DataBuffer here, one makes sure it will be always
423
 
      // destroyed AFTER all DataHandle. This allows for not bothering
424
 
      // to call stop_reading/stop_writing because they are called in
425
 
      // destructor of DataHandle.
426
434
      DataBuffer buffer;
 
435
      // Make sure any transfer is stopped before buffer is destroyed
 
436
      DataPointStopper source_stop(source);
 
437
      DataPointStopper destination_stop(destination);
427
438
      logger.msg(INFO, "Real transfer from %s to %s", source.CurrentLocation().str(), destination.CurrentLocation().str());
428
439
      /* creating handler for transfer */
429
440
      source.SetSecure(force_secure);
500
511
 
501
512
      /* create buffer and tune speed control */
502
513
      buffer.set(&crc, bufsize, bufnum);
503
 
      if (!buffer)
504
 
        logger.msg(INFO, "Buffer creation failed !");
 
514
      if (!buffer) logger.msg(INFO, "Buffer creation failed !");
505
515
      buffer.speed.set_min_speed(min_speed, min_speed_time);
506
516
      buffer.speed.set_min_average_speed(min_average_speed);
507
517
      buffer.speed.set_max_inactivity_time(max_inactivity_time);
962
972
          if (destination.NextLocation())
963
973
            logger.msg(VERBOSE, "(Re)Trying next destination");
964
974
          // check for error from callbacks etc
965
 
          if(destination.GetFailureReason() != DataStatus::UnknownError)
 
975
          if(destination.GetFailureReason() != DataStatus::UnknownError) {
966
976
            res=destination.GetFailureReason();
967
 
          else
 
977
          } else {
968
978
            res=DataStatus::WriteError;
 
979
          }
969
980
        }
970
981
        else if (buffer.error_transfer()) {
971
982
          // Here is more complicated case - operation timeout
974
985
          if (!buffer.for_read()) {
975
986
            // No free buffers for 'read' side. Buffer must be full.
976
987
            res.SetDesc(destination.GetFailureReason().GetDesc());
977
 
            if (destination.NextLocation())
 
988
            if (destination.NextLocation()) {
978
989
              logger.msg(VERBOSE, "(Re)Trying next destination");
 
990
            }
979
991
          }
980
992
          else if (!buffer.for_write()) {
981
993
            // Buffer is empty
982
994
            res.SetDesc(source.GetFailureReason().GetDesc());
983
 
            if (source.NextLocation())
 
995
            if (source.NextLocation()) {
984
996
              logger.msg(VERBOSE, "(Re)Trying next source");
 
997
            }
985
998
          }
986
999
          else {
987
1000
            // Both endpoints were very slow? Choose randomly.
989
1002
            Glib::Rand r;
990
1003
            if (r.get_int() < (RAND_MAX / 2)) {
991
1004
              res.SetDesc(source.GetFailureReason().GetDesc());
992
 
              if (source.NextLocation())
 
1005
              if (source.NextLocation()) {
993
1006
                logger.msg(VERBOSE, "(Re)Trying next source");
 
1007
              }
994
1008
            }
995
1009
            else {
996
1010
              res.SetDesc(destination.GetFailureReason().GetDesc());
997
 
              if (destination.NextLocation())
 
1011
              if (destination.NextLocation()) {
998
1012
                logger.msg(VERBOSE, "(Re)Trying next destination");
 
1013
              }
999
1014
            }
1000
1015
          }
1001
1016
        }
1029
1044
          logger.msg(ERROR, "Checksum mismatch between checksum given as meta option (%s:%s) and calculated checksum (%s)",
1030
1045
              destination.GetURL().MetaDataOption("checksumtype"), destination.GetURL().MetaDataOption("checksumvalue"), calc_csum);
1031
1046
#ifndef WIN32
1032
 
          if (cacheable)
 
1047
          if (cacheable) {
1033
1048
            cache.StopAndDelete(canonic_url);
 
1049
          }
1034
1050
#endif
1035
 
          if (!destination.Unregister(replication || destination_meta_initially_stored))
 
1051
          if (!destination.Unregister(replication || destination_meta_initially_stored)) {
1036
1052
            logger.msg(WARNING, "Failed to unregister preregistered lfn, You may need to unregister it manually");
 
1053
          }
1037
1054
          res = DataStatus(DataStatus::TransferError, "Checksum mismatch");
1038
 
          if (!Delete(destination, true))
 
1055
          if (!Delete(destination, true)) {
1039
1056
            logger.msg(WARNING, "Failed to delete destination, retry may fail");
1040
 
          if (destination.NextLocation())
 
1057
          }
 
1058
          if (destination.NextLocation()) {
1041
1059
            logger.msg(VERBOSE, "(Re)Trying next destination");
 
1060
          }
1042
1061
          continue;
1043
1062
        }
1044
1063
        if (source.CheckCheckSum()) {
1045
1064
          std::string src_csum_s(source.GetCheckSum());
1046
 
          if (src_csum_s.find(':') == src_csum_s.length() -1)
 
1065
          if (src_csum_s.find(':') == src_csum_s.length() -1) {
1047
1066
            logger.msg(VERBOSE, "Cannot compare empty checksum");
1048
 
          else if (calc_csum.substr(0, calc_csum.find(":")) != src_csum_s.substr(0, src_csum_s.find(":")))
 
1067
          } else if (calc_csum.substr(0, calc_csum.find(":")) != src_csum_s.substr(0, src_csum_s.find(":"))) {
1049
1068
            logger.msg(VERBOSE, "Checksum type of source and calculated checksum differ, cannot compare");
1050
 
          else if (calc_csum.substr(calc_csum.find(":")) != src_csum_s.substr(src_csum_s.find(":"))) {
 
1069
          } else if (calc_csum.substr(calc_csum.find(":")) != src_csum_s.substr(src_csum_s.find(":"))) {
1051
1070
            logger.msg(ERROR, "Checksum mismatch between calcuated checksum %s and source checksum %s", calc_csum, source.GetCheckSum());
1052
1071
#ifndef WIN32
1053
 
            if(cacheable)
 
1072
            if(cacheable) {
1054
1073
              cache.StopAndDelete(canonic_url);
 
1074
            }
1055
1075
#endif
1056
1076
            res = DataStatus::TransferError;
1057
 
            if (source.NextLocation())
 
1077
            if (source.NextLocation()) {
1058
1078
              logger.msg(VERBOSE, "(Re)Trying next source");
 
1079
            }
1059
1080
            continue;
1060
1081
          }
1061
 
          else
 
1082
          else {
1062
1083
            logger.msg(VERBOSE, "Calculated transfer checksum %s matches source checksum", calc_csum);
 
1084
          }
1063
1085
        }
1064
1086
        // set the destination checksum to be what we calculated
1065
1087
        destination.SetCheckSum(calc_csum.c_str());