200
200
min_average_speed, time_t max_inactivity_time,
201
201
DataMover::callback cb, void *arg,
202
202
const char *prefix) {
203
class DataPointStopper {
207
DataPointStopper(DataPoint& p):point_(p) {};
208
~DataPointStopper(void) {
209
point_.StopReading();
210
point_.FinishReading();
211
point_.StopWriting();
212
point_.FinishWriting();
204
216
if (cb != NULL) {
205
217
logger.msg(VERBOSE, "DataMover::Transfer : starting new thread");
366
378
if (!destination.IsIndex()) {
367
379
// pfn has chance to be overwritten directly
368
logger.msg(ERROR, "Failed to delete %s but will still try to upload", del_url.str());
380
logger.msg(WARNING, "Failed to delete %s but will still try to copy", del_url.str());
371
383
logger.msg(INFO, "Failed to delete %s", del_url.str());
419
431
/* out of tries */
422
// By putting DataBuffer here, one makes sure it will be always
423
// destroyed AFTER all DataHandle. This allows for not bothering
424
// to call stop_reading/stop_writing because they are called in
425
// destructor of DataHandle.
426
434
DataBuffer buffer;
435
// Make sure any transfer is stopped before buffer is destroyed
436
DataPointStopper source_stop(source);
437
DataPointStopper destination_stop(destination);
427
438
logger.msg(INFO, "Real transfer from %s to %s", source.CurrentLocation().str(), destination.CurrentLocation().str());
428
439
/* creating handler for transfer */
429
440
source.SetSecure(force_secure);
501
512
/* create buffer and tune speed control */
502
513
buffer.set(&crc, bufsize, bufnum);
504
logger.msg(INFO, "Buffer creation failed !");
514
if (!buffer) logger.msg(INFO, "Buffer creation failed !");
505
515
buffer.speed.set_min_speed(min_speed, min_speed_time);
506
516
buffer.speed.set_min_average_speed(min_average_speed);
507
517
buffer.speed.set_max_inactivity_time(max_inactivity_time);
962
972
if (destination.NextLocation())
963
973
logger.msg(VERBOSE, "(Re)Trying next destination");
964
974
// check for error from callbacks etc
965
if(destination.GetFailureReason() != DataStatus::UnknownError)
975
if(destination.GetFailureReason() != DataStatus::UnknownError) {
966
976
res=destination.GetFailureReason();
968
978
res=DataStatus::WriteError;
970
981
else if (buffer.error_transfer()) {
971
982
// Here is more complicated case - operation timeout
974
985
if (!buffer.for_read()) {
975
986
// No free buffers for 'read' side. Buffer must be full.
976
987
res.SetDesc(destination.GetFailureReason().GetDesc());
977
if (destination.NextLocation())
988
if (destination.NextLocation()) {
978
989
logger.msg(VERBOSE, "(Re)Trying next destination");
980
992
else if (!buffer.for_write()) {
981
993
// Buffer is empty
982
994
res.SetDesc(source.GetFailureReason().GetDesc());
983
if (source.NextLocation())
995
if (source.NextLocation()) {
984
996
logger.msg(VERBOSE, "(Re)Trying next source");
987
1000
// Both endpoints were very slow? Choose randomly.
990
1003
if (r.get_int() < (RAND_MAX / 2)) {
991
1004
res.SetDesc(source.GetFailureReason().GetDesc());
992
if (source.NextLocation())
1005
if (source.NextLocation()) {
993
1006
logger.msg(VERBOSE, "(Re)Trying next source");
996
1010
res.SetDesc(destination.GetFailureReason().GetDesc());
997
if (destination.NextLocation())
1011
if (destination.NextLocation()) {
998
1012
logger.msg(VERBOSE, "(Re)Trying next destination");
1029
1044
logger.msg(ERROR, "Checksum mismatch between checksum given as meta option (%s:%s) and calculated checksum (%s)",
1030
1045
destination.GetURL().MetaDataOption("checksumtype"), destination.GetURL().MetaDataOption("checksumvalue"), calc_csum);
1033
1048
cache.StopAndDelete(canonic_url);
1035
if (!destination.Unregister(replication || destination_meta_initially_stored))
1051
if (!destination.Unregister(replication || destination_meta_initially_stored)) {
1036
1052
logger.msg(WARNING, "Failed to unregister preregistered lfn, You may need to unregister it manually");
1037
1054
res = DataStatus(DataStatus::TransferError, "Checksum mismatch");
1038
if (!Delete(destination, true))
1055
if (!Delete(destination, true)) {
1039
1056
logger.msg(WARNING, "Failed to delete destination, retry may fail");
1040
if (destination.NextLocation())
1058
if (destination.NextLocation()) {
1041
1059
logger.msg(VERBOSE, "(Re)Trying next destination");
1044
1063
if (source.CheckCheckSum()) {
1045
1064
std::string src_csum_s(source.GetCheckSum());
1046
if (src_csum_s.find(':') == src_csum_s.length() -1)
1065
if (src_csum_s.find(':') == src_csum_s.length() -1) {
1047
1066
logger.msg(VERBOSE, "Cannot compare empty checksum");
1048
else if (calc_csum.substr(0, calc_csum.find(":")) != src_csum_s.substr(0, src_csum_s.find(":")))
1067
} else if (calc_csum.substr(0, calc_csum.find(":")) != src_csum_s.substr(0, src_csum_s.find(":"))) {
1049
1068
logger.msg(VERBOSE, "Checksum type of source and calculated checksum differ, cannot compare");
1050
else if (calc_csum.substr(calc_csum.find(":")) != src_csum_s.substr(src_csum_s.find(":"))) {
1069
} else if (calc_csum.substr(calc_csum.find(":")) != src_csum_s.substr(src_csum_s.find(":"))) {
1051
1070
logger.msg(ERROR, "Checksum mismatch between calcuated checksum %s and source checksum %s", calc_csum, source.GetCheckSum());
1054
1073
cache.StopAndDelete(canonic_url);
1056
1076
res = DataStatus::TransferError;
1057
if (source.NextLocation())
1077
if (source.NextLocation()) {
1058
1078
logger.msg(VERBOSE, "(Re)Trying next source");
1062
1083
logger.msg(VERBOSE, "Calculated transfer checksum %s matches source checksum", calc_csum);
1064
1086
// set the destination checksum to be what we calculated
1065
1087
destination.SetCheckSum(calc_csum.c_str());