~ubuntu-branches/debian/sid/nordugrid-arc/sid

« back to all changes in this revision

Viewing changes to src/hed/dmc/gfal/DataPointGFAL.cpp

  • Committer: Package Import Robot
  • Author(s): Mattias Ellert
  • Date: 2012-12-13 16:41:31 UTC
  • mfrom: (1.1.5)
  • Revision ID: package-import@ubuntu.com-20121213164131-0fumka0jar8mxm07
Tags: 2.0.1-1
* 2.0.1 Release
* Drop patches accepted upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
33
33
 
34
34
namespace Arc {
35
35
 
36
 
 
37
 
  static char const * const stdfds[] = {
38
 
    "stdin",
39
 
    "stdout",
40
 
    "stderr"
 
36
  /// Class for locking environment while calling gfal functions.
 
37
  class GFALEnvLocker: public CertEnvLocker {
 
38
  public:
 
39
    static Logger logger;
 
40
    GFALEnvLocker(const UserConfig& usercfg, const std::string& lfc_host): CertEnvLocker(usercfg) {
 
41
      EnvLockUnwrap(false);
 
42
      // if root, we have to set X509_USER_CERT and X509_USER_KEY to
 
43
      // X509_USER_PROXY to force GFAL to use the proxy. If they are undefined
 
44
      // it uses the host cert and key.
 
45
      if (getuid() == 0 && !GetEnv("X509_USER_PROXY").empty()) {
 
46
        SetEnv("X509_USER_KEY", GetEnv("X509_USER_PROXY"), true);
 
47
        SetEnv("X509_USER_CERT", GetEnv("X509_USER_PROXY"), true);
 
48
      }
 
49
      logger.msg(DEBUG, "Using proxy %s", GetEnv("X509_USER_PROXY"));
 
50
      logger.msg(DEBUG, "Using key %s", GetEnv("X509_USER_KEY"));
 
51
      logger.msg(DEBUG, "Using cert %s", GetEnv("X509_USER_CERT"));
 
52
 
 
53
      if (!lfc_host.empty()) {
 
54
        // set LFC retry env variables (don't overwrite if set already)
 
55
        // connection timeout
 
56
        SetEnv("LFC_CONNTIMEOUT", "30", false);
 
57
        // number of retries
 
58
        SetEnv("LFC_CONRETRY", "1", false);
 
59
        // interval between retries
 
60
        SetEnv("LFC_CONRETRYINT", "10", false);
 
61
 
 
62
        // set host name env var
 
63
        SetEnv("LFC_HOST", lfc_host);
 
64
      }
 
65
 
 
66
      EnvLockWrap(false);
 
67
    }
41
68
  };
42
69
 
 
70
  Logger GFALEnvLocker::logger(Logger::getRootLogger(), "GFALEnvLocker");
 
71
 
43
72
  Logger DataPointGFAL::logger(Logger::getRootLogger(), "DataPoint.GFAL");
44
73
 
45
 
  DataPointGFAL::DataPointGFAL(const URL& url, const UserConfig& usercfg, PluginArgument* parg)
46
 
    : DataPointDirect(url, usercfg, parg), reading(false), writing(false) {
 
74
  DataPointGFAL::DataPointGFAL(const URL& u, const UserConfig& usercfg, PluginArgument* parg)
 
75
    : DataPointDirect(u, usercfg, parg), fd(-1), reading(false), writing(false) {
47
76
      LogLevel loglevel = logger.getThreshold();
48
77
      if (loglevel == DEBUG)
49
 
        gfal_set_verbose (GFAL_VERBOSE_VERBOSE | GFAL_VERBOSE_TRACE);
 
78
        gfal_set_verbose (GFAL_VERBOSE_VERBOSE | GFAL_VERBOSE_DEBUG | GFAL_VERBOSE_TRACE);
50
79
      if (loglevel == VERBOSE)
51
80
        gfal_set_verbose (GFAL_VERBOSE_VERBOSE);
 
81
      // lfc:// needs to be converted to lfn:/path or guid:abcd...
 
82
      if (url.Protocol() == "lfc") {
 
83
        lfc_host = url.Host();
 
84
        url.ChangeHost("");
 
85
        url.ChangePort(-1);
 
86
        if (url.MetaDataOption("guid").empty()) {
 
87
          url.ChangeProtocol("lfn");
 
88
        }
 
89
        else {
 
90
          url.ChangeProtocol("guid");
 
91
          // To fix: this call forces leading / on path
 
92
          url.ChangePath(url.MetaDataOption("guid"));
 
93
        }
 
94
      }
52
95
  }
53
96
 
54
97
  DataPointGFAL::~DataPointGFAL() {
60
103
    DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
61
104
    if (!dmcarg)
62
105
      return NULL;
63
 
    if (((const URL &)(*dmcarg)).Protocol() != "rfio")
 
106
    if (((const URL &)(*dmcarg)).Protocol() != "rfio" &&
 
107
        ((const URL &)(*dmcarg)).Protocol() != "dcap" &&
 
108
        ((const URL &)(*dmcarg)).Protocol() != "gsidcap" &&
 
109
        ((const URL &)(*dmcarg)).Protocol() != "root" &&
 
110
        ((const URL &)(*dmcarg)).Protocol() != "gsiftp" &&
 
111
        ((const URL &)(*dmcarg)).Protocol() != "srm" &&
 
112
        ((const URL &)(*dmcarg)).Protocol() != "lfc")
64
113
      return NULL;
65
114
    return new DataPointGFAL(*dmcarg, *dmcarg, dmcarg);
66
115
  }
71
120
    reading = true;
72
121
    
73
122
    // Open the file
74
 
    if ((fd = gfal_open(url.plainstr().c_str(), O_RDONLY, 0)) < 0) {
 
123
    {
 
124
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
125
      fd = gfal_open(url.plainstr().c_str(), O_RDONLY, 0);
 
126
    }
 
127
    if (fd < 0) {
75
128
      logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
 
129
      log_gfal_err();
76
130
      reading = false;
77
131
      return DataStatus::ReadStartError;
78
132
    }
81
135
    buffer = &buf;
82
136
    // StopReading will wait for this condition,
83
137
    // which will be signalled by the separate reading thread
84
 
    transfer_condition.reset();
85
138
    // Create the separate reading thread
86
 
    if (!CreateThreadFunction(&DataPointGFAL::read_file_start, this)) {
 
139
    if (!CreateThreadFunction(&DataPointGFAL::read_file_start, this, &transfer_condition)) {
87
140
      logger.msg(ERROR, "Failed to create reading thread");
88
141
      if (fd != -1) {
89
142
        if (gfal_close(fd) < 0) {
90
 
          logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
 
143
          logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
91
144
        }
92
145
      }
93
146
      reading = false;
110
163
      if (!buffer->for_read(handle, length, true)) {
111
164
        buffer->error_read(true);
112
165
        break;
113
 
      }      
114
 
      
 
166
      }
 
167
 
115
168
      // Read into the buffer
116
169
      bytes_read = gfal_read(fd, (*(buffer))[handle], length);
117
170
            
118
171
      // If there was an error
119
172
      if (bytes_read < 0) {
120
173
        logger.msg(ERROR, "gfal_read failed: %s", StrError(errno));
 
174
        log_gfal_err();
121
175
        buffer->error_read(true);
122
176
        break;
123
177
      }
139
193
    // Close the file
140
194
    if (fd != -1) {
141
195
      if (gfal_close(fd) < 0) {
142
 
        logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
 
196
        logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
143
197
      }
 
198
      fd = -1;
144
199
    }
145
 
    // Signal that we finished reading (even if there was an error)
146
 
    transfer_condition.signal();
147
200
  }
148
201
  
149
202
  DataStatus DataPointGFAL::StopReading() {
150
203
    if (!reading) return DataStatus::ReadStopError;
151
204
    reading = false;
152
 
    // If the reading is not finished yet
153
 
    if (!buffer->eof_read()) {
154
 
      // Trigger reading error
155
 
      buffer->error_read(true);
156
 
      // Close the file
157
 
      if (fd != -1) {
158
 
        if (gfal_close(fd) < 0) {
159
 
          logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
160
 
        }
161
 
      }
162
 
    }
 
205
    if (!buffer) return DataStatus::ReadStopError;
 
206
    // If the reading is not finished yet trigger reading error
 
207
    if (!buffer->eof_read()) buffer->error_read(true);
 
208
 
163
209
    // Wait for the reading thread to finish
164
210
    logger.msg(DEBUG, "StopReading starts waiting for transfer_condition.");
165
211
    transfer_condition.wait();
166
212
    logger.msg(DEBUG, "StopReading finished waiting for transfer_condition.");
 
213
 
 
214
    // Close the file if not already done
 
215
    if (fd != -1) {
 
216
      if (gfal_close(fd) < 0) {
 
217
        logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
 
218
      }
 
219
      fd = -1;
 
220
    }
167
221
    // If there was an error (maybe we triggered it)
168
 
    if (buffer->error_read())
 
222
    if (buffer->error_read()) {
 
223
      buffer = NULL;
169
224
      return DataStatus::ReadError;
 
225
    }
170
226
    // If there was no error (the reading already finished)
 
227
    buffer = NULL;
171
228
    return DataStatus::Success;
172
229
  }
173
230
  
176
233
    if (writing) return DataStatus::IsWritingError;
177
234
    writing = true;
178
235
 
179
 
    // Create the parent directory
180
 
    URL parent_url = URL(url.plainstr());
181
 
    parent_url.ChangePath(Glib::path_get_dirname(url.Path()));
182
 
    if (gfal_mkdir(parent_url.plainstr().c_str(), 0700) < 0) {
183
 
      logger.msg(DEBUG, "Failed to create parent directory, continuing anyway: %s", StrError(errno));
 
236
    {
 
237
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
238
      // Open the file
 
239
      fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600);
184
240
    }
 
241
    if (fd < 0) {
 
242
      // If no entry try to create parent directories
 
243
      if (errno == ENOENT) {
 
244
        URL parent_url = URL(url.plainstr());
 
245
        // For SRM the path can be given as SFN HTTP option
 
246
        if ((url.Protocol() == "srm" && !url.HTTPOption("SFN").empty())) {
 
247
          parent_url.AddHTTPOption("SFN", Glib::path_get_dirname(url.HTTPOption("SFN")), true);
 
248
        } else {
 
249
          parent_url.ChangePath(Glib::path_get_dirname(url.Path()));
 
250
        }
185
251
 
186
 
    // Open the file
187
 
    if ((fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600)) < 0) {
188
 
      logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
189
 
      writing = false;
190
 
      return DataStatus::WriteStartError;
 
252
        {
 
253
          GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
254
          // gfal_mkdir is always recursive
 
255
          if (gfal_mkdir(parent_url.plainstr().c_str(), 0700) == 0) {
 
256
            fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600);
 
257
          } else {
 
258
            logger.msg(ERROR, "Failed to create parent directories: ", StrError(errno));
 
259
          }
 
260
        }
 
261
      }
 
262
      if (fd < 0) {
 
263
        logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
 
264
        log_gfal_err();
 
265
        writing = false;
 
266
        return DataStatus::WriteStartError;
 
267
      }
191
268
    }
192
269
    
193
270
    // Remember the DataBuffer we got, the separate writing thread will use it
194
271
    buffer = &buf;
195
272
    // StopWriting will wait for this condition,
196
273
    // which will be signalled by the separate writing thread
197
 
    transfer_condition.reset();
198
274
    // Create the separate writing thread
199
 
    if (!CreateThreadFunction(&DataPointGFAL::write_file_start, this)) {
 
275
    if (!CreateThreadFunction(&DataPointGFAL::write_file_start, this, &transfer_condition)) {
200
276
      logger.msg(ERROR, "Failed to create writing thread");
201
277
      if (fd != -1) {
202
278
        if (gfal_close(fd) < 0) {
203
 
          logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
 
279
          logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
204
280
        }
205
281
      }
206
282
      writing = false;
232
308
        }
233
309
        break;
234
310
      }      
235
 
      
 
311
 
236
312
      // if the buffer gives different position than we are currently in the
237
313
      // destination, then we have to seek there
238
314
      if (position != offset) {
260
336
      // if there was an error during writing
261
337
      if (bytes_written < 0) {
262
338
        logger.msg(ERROR, "gfal_write failed: %s", StrError(errno));
 
339
        log_gfal_err();
263
340
        buffer->error_write(true);
264
341
        break;
265
342
      }
268
345
    // Close the file
269
346
    if (fd != -1) {
270
347
      if (gfal_close(fd) < 0) {
271
 
        logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
 
348
        logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
272
349
      }
 
350
      fd = -1;
273
351
    }
274
 
    // Signal that we finished writing (even if there was an error)
275
 
    transfer_condition.signal();
276
352
  }
277
353
    
278
354
  DataStatus DataPointGFAL::StopWriting() {
279
355
    if (!writing) return DataStatus::WriteStopError;
280
356
    writing = false;
 
357
    if (!buffer) return DataStatus::WriteStopError;
281
358
    
282
 
    // If the writing is not finished
283
 
    if (!buffer->eof_write()) {
284
 
      // Trigger error
285
 
      buffer->error_write(true);
286
 
      // Close the file
287
 
      if (fd != -1) {
288
 
        if (gfal_close(fd) < 0) {
289
 
          logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
290
 
        }
291
 
      }
292
 
    }
 
359
    // If the writing is not finished, trigger writing error
 
360
    if (!buffer->eof_write()) buffer->error_write(true);
 
361
 
293
362
    // Wait until the writing thread finishes
294
363
    logger.msg(DEBUG, "StopWriting starts waiting for transfer_condition.");
295
364
    transfer_condition.wait();
296
365
    logger.msg(DEBUG, "StopWriting finished waiting for transfer_condition.");
 
366
 
 
367
    // Close the file if not done already
 
368
    if (fd != -1) {
 
369
      if (gfal_close(fd) < 0) {
 
370
        logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
 
371
      }
 
372
      fd = -1;
 
373
    }
297
374
    // If there was an error (maybe we triggered it)
298
 
    if (buffer->error_write())
 
375
    if (buffer->error_write()) {
 
376
      buffer = NULL;
299
377
      return DataStatus::WriteError;
 
378
    }
 
379
    buffer = NULL;
300
380
    return DataStatus::Success;
301
381
  }  
302
382
  
303
 
  static DataStatus do_stat(URL stat_url, FileInfo& file) {
 
383
  DataStatus DataPointGFAL::do_stat(const URL& stat_url, FileInfo& file) {
304
384
    struct stat st;
305
 
    if (gfal_stat(stat_url.plainstr().c_str(), &st) < 0) {
 
385
    int res;
 
386
 
 
387
    {
 
388
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
389
      res = gfal_stat(stat_url.plainstr().c_str(), &st);
 
390
    }
 
391
    if (res < 0) {
 
392
      logger.msg(ERROR, "gfal_stat failed: %s", StrError(errno));
 
393
      log_gfal_err();
306
394
      return DataStatus::StatError;
307
395
    }
308
396
 
317
405
    }
318
406
 
319
407
    std::string path = stat_url.Path();
 
408
    // For SRM the path can be given as SFN HTTP Option
 
409
    if ((stat_url.Protocol() == "srm" && !stat_url.HTTPOption("SFN").empty())) path = stat_url.HTTPOption("SFN");
 
410
 
320
411
    std::string name = Glib::path_get_basename(path);
321
412
    file.SetMetaData("path", path);
322
413
    file.SetName(name);
367
458
    // Open the directory
368
459
    struct dirent *d;
369
460
    DIR *dir;    
370
 
    if ((dir = gfal_opendir(url.plainstr().c_str())) == NULL) {
 
461
    {
 
462
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
463
      dir = gfal_opendir(url.plainstr().c_str());
 
464
    }
 
465
    if (!dir) {
371
466
      logger.msg(ERROR, "gfal_opendir failed: %s", StrError(errno));
 
467
      log_gfal_err();
372
468
      return DataStatus::ListError;
373
469
    }
374
470
    
376
472
    while ((d = gfal_readdir (dir))) {
377
473
      // Create a new FileInfo object and add it to the list of files
378
474
      std::list<FileInfo>::iterator f = files.insert(files.end(), FileInfo(d->d_name));
379
 
      DataStatus status_from_stat = DataStatus::StatError;
380
 
      // If information about times and access was also requested, do a stat
381
 
      if (verb & (INFO_TYPE_TIMES | INFO_TYPE_ACCESS)) {
 
475
      // If information about times, type or access was also requested, do a stat
 
476
      if (verb & (INFO_TYPE_TIMES | INFO_TYPE_ACCESS | INFO_TYPE_TYPE)) {
382
477
        URL child_url = URL(url.plainstr() + '/' + d->d_name);
383
478
        logger.msg(DEBUG, "List will stat the URL %s", child_url.plainstr());
384
 
        status_from_stat = do_stat(child_url, *f);
385
 
      }
386
 
      // If something was not OK with Stat (or we didn't call it), just get the type
387
 
      if (status_from_stat != DataStatus::Success) {
388
 
        if (d->d_type == DT_DIR) {
389
 
          f->SetType(FileInfo::file_type_dir);
390
 
        } else if (d->d_type == DT_REG) {
391
 
          f->SetType(FileInfo::file_type_file);
392
 
        }
 
479
        do_stat(child_url, *f);
393
480
      }
394
481
    }
395
482
    
396
483
    // Then close the dir
397
484
    if (gfal_closedir (dir) < 0) {
398
 
      logger.msg(ERROR, "gfal_closedir failed: %s", StrError(errno));
 
485
      logger.msg(WARNING, "gfal_closedir failed: %s", StrError(errno));
399
486
      return DataStatus::ListError;
400
487
    }
401
488
    
409
496
    DataStatus status_from_stat = do_stat(url, file);
410
497
    if (status_from_stat != DataStatus::Success)
411
498
      return DataStatus::DeleteError;
412
 
    if (file.GetType() == FileInfo::file_type_dir) {
413
 
      if (gfal_rmdir(url.plainstr().c_str()) < 0) {
414
 
        logger.msg(ERROR, "gfal_rmdir failed: %s", StrError(errno));
415
 
        return DataStatus::DeleteError;
416
 
      }      
417
 
    } else {
418
 
      if (gfal_unlink(url.plainstr().c_str()) < 0) {
419
 
        logger.msg(ERROR, "gfal_unlink failed: %s", StrError(errno));
420
 
        return DataStatus::DeleteError;
 
499
 
 
500
    int res;
 
501
    {
 
502
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
503
 
 
504
      if (file.GetType() == FileInfo::file_type_dir) {
 
505
        res = gfal_rmdir(url.plainstr().c_str());
 
506
      } else {
 
507
        res = gfal_unlink(url.plainstr().c_str());
421
508
      }
422
509
    }
 
510
    if (res < 0) {
 
511
      if (file.GetType() == FileInfo::file_type_dir) logger.msg(ERROR, "gfal_rmdir failed: %s", StrError(errno));
 
512
      else logger.msg(ERROR, "gfal_unlink failed: %s", StrError(errno));
 
513
      log_gfal_err();
 
514
      return DataStatus::DeleteError;
 
515
    }
423
516
    return DataStatus::Success;
424
517
  }
425
518
  
426
519
  DataStatus DataPointGFAL::CreateDirectory(bool with_parents) {
427
 
    if (gfal_mkdir(url.plainstr().c_str(), 0700) < 0) {
 
520
 
 
521
    int res;
 
522
    {
 
523
      GFALEnvLocker gfal_lock(usercfg, lfc_host);
 
524
      // gfal_mkdir is always recursive
 
525
      res = gfal_mkdir(url.plainstr().c_str(), 0700);
 
526
    }
 
527
    if (res < 0) {
428
528
      logger.msg(ERROR, "gfal_mkdir failed: %s", StrError(errno));
 
529
      log_gfal_err();
429
530
      return DataStatus::CreateDirectoryError;
430
531
    }
431
532
    return DataStatus::Success;    
432
533
  }
433
534
  
 
535
  void DataPointGFAL::log_gfal_err() {
 
536
    char errbuf[2048];
 
537
    gfal_posix_strerror_r(errbuf, sizeof(errbuf));
 
538
    logger.msg(ERROR, errbuf);
 
539
    gfal_posix_clear_error();
 
540
  }
434
541
 
435
542
} // namespace Arc
436
543
 
437
544
Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
438
 
  { "rfio", "HED:DMC", "RFIO plugin using the GFAL2 library", 0, &Arc::DataPointGFAL::Instance },
 
545
  { "gfal2", "HED:DMC", "Grid File Access Library 2", 0, &Arc::DataPointGFAL::Instance },
439
546
  { NULL, NULL, NULL, 0, NULL }
440
547
};