37
static char const * const stdfds[] = {
36
/// Class for locking environment while calling gfal functions.
37
class GFALEnvLocker: public CertEnvLocker {
40
GFALEnvLocker(const UserConfig& usercfg, const std::string& lfc_host): CertEnvLocker(usercfg) {
42
// if root, we have to set X509_USER_CERT and X509_USER_KEY to
43
// X509_USER_PROXY to force GFAL to use the proxy. If they are undefined
44
// it uses the host cert and key.
45
if (getuid() == 0 && !GetEnv("X509_USER_PROXY").empty()) {
46
SetEnv("X509_USER_KEY", GetEnv("X509_USER_PROXY"), true);
47
SetEnv("X509_USER_CERT", GetEnv("X509_USER_PROXY"), true);
49
logger.msg(DEBUG, "Using proxy %s", GetEnv("X509_USER_PROXY"));
50
logger.msg(DEBUG, "Using key %s", GetEnv("X509_USER_KEY"));
51
logger.msg(DEBUG, "Using cert %s", GetEnv("X509_USER_CERT"));
53
if (!lfc_host.empty()) {
54
// set LFC retry env variables (don't overwrite if set already)
56
SetEnv("LFC_CONNTIMEOUT", "30", false);
58
SetEnv("LFC_CONRETRY", "1", false);
59
// interval between retries
60
SetEnv("LFC_CONRETRYINT", "10", false);
62
// set host name env var
63
SetEnv("LFC_HOST", lfc_host);
70
Logger GFALEnvLocker::logger(Logger::getRootLogger(), "GFALEnvLocker");
43
72
Logger DataPointGFAL::logger(Logger::getRootLogger(), "DataPoint.GFAL");
45
DataPointGFAL::DataPointGFAL(const URL& url, const UserConfig& usercfg, PluginArgument* parg)
46
: DataPointDirect(url, usercfg, parg), reading(false), writing(false) {
74
DataPointGFAL::DataPointGFAL(const URL& u, const UserConfig& usercfg, PluginArgument* parg)
75
: DataPointDirect(u, usercfg, parg), fd(-1), reading(false), writing(false) {
47
76
LogLevel loglevel = logger.getThreshold();
48
77
if (loglevel == DEBUG)
49
gfal_set_verbose (GFAL_VERBOSE_VERBOSE | GFAL_VERBOSE_TRACE);
78
gfal_set_verbose (GFAL_VERBOSE_VERBOSE | GFAL_VERBOSE_DEBUG | GFAL_VERBOSE_TRACE);
50
79
if (loglevel == VERBOSE)
51
80
gfal_set_verbose (GFAL_VERBOSE_VERBOSE);
81
// lfc:// needs to be converted to lfn:/path or guid:abcd...
82
if (url.Protocol() == "lfc") {
83
lfc_host = url.Host();
86
if (url.MetaDataOption("guid").empty()) {
87
url.ChangeProtocol("lfn");
90
url.ChangeProtocol("guid");
91
// To fix: this call forces leading / on path
92
url.ChangePath(url.MetaDataOption("guid"));
54
97
DataPointGFAL::~DataPointGFAL() {
60
103
DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
63
if (((const URL &)(*dmcarg)).Protocol() != "rfio")
106
if (((const URL &)(*dmcarg)).Protocol() != "rfio" &&
107
((const URL &)(*dmcarg)).Protocol() != "dcap" &&
108
((const URL &)(*dmcarg)).Protocol() != "gsidcap" &&
109
((const URL &)(*dmcarg)).Protocol() != "root" &&
110
((const URL &)(*dmcarg)).Protocol() != "gsiftp" &&
111
((const URL &)(*dmcarg)).Protocol() != "srm" &&
112
((const URL &)(*dmcarg)).Protocol() != "lfc")
65
114
return new DataPointGFAL(*dmcarg, *dmcarg, dmcarg);
74
if ((fd = gfal_open(url.plainstr().c_str(), O_RDONLY, 0)) < 0) {
124
GFALEnvLocker gfal_lock(usercfg, lfc_host);
125
fd = gfal_open(url.plainstr().c_str(), O_RDONLY, 0);
75
128
logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
77
131
return DataStatus::ReadStartError;
82
136
// StopReading will wait for this condition,
83
137
// which will be signalled by the separate reading thread
84
transfer_condition.reset();
85
138
// Create the separate reading thread
86
if (!CreateThreadFunction(&DataPointGFAL::read_file_start, this)) {
139
if (!CreateThreadFunction(&DataPointGFAL::read_file_start, this, &transfer_condition)) {
87
140
logger.msg(ERROR, "Failed to create reading thread");
89
142
if (gfal_close(fd) < 0) {
90
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
143
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
110
163
if (!buffer->for_read(handle, length, true)) {
111
164
buffer->error_read(true);
115
168
// Read into the buffer
116
169
bytes_read = gfal_read(fd, (*(buffer))[handle], length);
118
171
// If there was an error
119
172
if (bytes_read < 0) {
120
173
logger.msg(ERROR, "gfal_read failed: %s", StrError(errno));
121
175
buffer->error_read(true);
139
193
// Close the file
141
195
if (gfal_close(fd) < 0) {
142
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
196
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
145
// Signal that we finished reading (even if there was an error)
146
transfer_condition.signal();
149
202
DataStatus DataPointGFAL::StopReading() {
150
203
if (!reading) return DataStatus::ReadStopError;
152
// If the reading is not finished yet
153
if (!buffer->eof_read()) {
154
// Trigger reading error
155
buffer->error_read(true);
158
if (gfal_close(fd) < 0) {
159
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
205
if (!buffer) return DataStatus::ReadStopError;
206
// If the reading is not finished yet trigger reading error
207
if (!buffer->eof_read()) buffer->error_read(true);
163
209
// Wait for the reading thread to finish
164
210
logger.msg(DEBUG, "StopReading starts waiting for transfer_condition.");
165
211
transfer_condition.wait();
166
212
logger.msg(DEBUG, "StopReading finished waiting for transfer_condition.");
214
// Close the file if not already done
216
if (gfal_close(fd) < 0) {
217
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
167
221
// If there was an error (maybe we triggered it)
168
if (buffer->error_read())
222
if (buffer->error_read()) {
169
224
return DataStatus::ReadError;
170
226
// If there was no error (the reading already finished)
171
228
return DataStatus::Success;
176
233
if (writing) return DataStatus::IsWritingError;
179
// Create the parent directory
180
URL parent_url = URL(url.plainstr());
181
parent_url.ChangePath(Glib::path_get_dirname(url.Path()));
182
if (gfal_mkdir(parent_url.plainstr().c_str(), 0700) < 0) {
183
logger.msg(DEBUG, "Failed to create parent directory, continuing anyway: %s", StrError(errno));
237
GFALEnvLocker gfal_lock(usercfg, lfc_host);
239
fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600);
242
// If no entry try to create parent directories
243
if (errno == ENOENT) {
244
URL parent_url = URL(url.plainstr());
245
// For SRM the path can be given as SFN HTTP option
246
if ((url.Protocol() == "srm" && !url.HTTPOption("SFN").empty())) {
247
parent_url.AddHTTPOption("SFN", Glib::path_get_dirname(url.HTTPOption("SFN")), true);
249
parent_url.ChangePath(Glib::path_get_dirname(url.Path()));
187
if ((fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600)) < 0) {
188
logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
190
return DataStatus::WriteStartError;
253
GFALEnvLocker gfal_lock(usercfg, lfc_host);
254
// gfal_mkdir is always recursive
255
if (gfal_mkdir(parent_url.plainstr().c_str(), 0700) == 0) {
256
fd = gfal_open(url.plainstr().c_str(), O_WRONLY | O_CREAT, 0600);
258
logger.msg(ERROR, "Failed to create parent directories: ", StrError(errno));
263
logger.msg(ERROR, "gfal_open failed: %s", StrError(errno));
266
return DataStatus::WriteStartError;
193
270
// Remember the DataBuffer we got, the separate writing thread will use it
195
272
// StopWriting will wait for this condition,
196
273
// which will be signalled by the separate writing thread
197
transfer_condition.reset();
198
274
// Create the separate writing thread
199
if (!CreateThreadFunction(&DataPointGFAL::write_file_start, this)) {
275
if (!CreateThreadFunction(&DataPointGFAL::write_file_start, this, &transfer_condition)) {
200
276
logger.msg(ERROR, "Failed to create writing thread");
202
278
if (gfal_close(fd) < 0) {
203
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
279
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
268
345
// Close the file
270
347
if (gfal_close(fd) < 0) {
271
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
348
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
274
// Signal that we finished writing (even if there was an error)
275
transfer_condition.signal();
278
354
DataStatus DataPointGFAL::StopWriting() {
279
355
if (!writing) return DataStatus::WriteStopError;
357
if (!buffer) return DataStatus::WriteStopError;
282
// If the writing is not finished
283
if (!buffer->eof_write()) {
285
buffer->error_write(true);
288
if (gfal_close(fd) < 0) {
289
logger.msg(ERROR, "gfal_close failed: %s", StrError(errno));
359
// If the writing is not finished, trigger writing error
360
if (!buffer->eof_write()) buffer->error_write(true);
293
362
// Wait until the writing thread finishes
294
363
logger.msg(DEBUG, "StopWriting starts waiting for transfer_condition.");
295
364
transfer_condition.wait();
296
365
logger.msg(DEBUG, "StopWriting finished waiting for transfer_condition.");
367
// Close the file if not done already
369
if (gfal_close(fd) < 0) {
370
logger.msg(WARNING, "gfal_close failed: %s", StrError(errno));
297
374
// If there was an error (maybe we triggered it)
298
if (buffer->error_write())
375
if (buffer->error_write()) {
299
377
return DataStatus::WriteError;
300
380
return DataStatus::Success;
303
static DataStatus do_stat(URL stat_url, FileInfo& file) {
383
DataStatus DataPointGFAL::do_stat(const URL& stat_url, FileInfo& file) {
305
if (gfal_stat(stat_url.plainstr().c_str(), &st) < 0) {
388
GFALEnvLocker gfal_lock(usercfg, lfc_host);
389
res = gfal_stat(stat_url.plainstr().c_str(), &st);
392
logger.msg(ERROR, "gfal_stat failed: %s", StrError(errno));
306
394
return DataStatus::StatError;
376
472
while ((d = gfal_readdir (dir))) {
377
473
// Create a new FileInfo object and add it to the list of files
378
474
std::list<FileInfo>::iterator f = files.insert(files.end(), FileInfo(d->d_name));
379
DataStatus status_from_stat = DataStatus::StatError;
380
// If information about times and access was also requested, do a stat
381
if (verb & (INFO_TYPE_TIMES | INFO_TYPE_ACCESS)) {
475
// If information about times, type or access was also requested, do a stat
476
if (verb & (INFO_TYPE_TIMES | INFO_TYPE_ACCESS | INFO_TYPE_TYPE)) {
382
477
URL child_url = URL(url.plainstr() + '/' + d->d_name);
383
478
logger.msg(DEBUG, "List will stat the URL %s", child_url.plainstr());
384
status_from_stat = do_stat(child_url, *f);
386
// If something was not OK with Stat (or we didn't call it), just get the type
387
if (status_from_stat != DataStatus::Success) {
388
if (d->d_type == DT_DIR) {
389
f->SetType(FileInfo::file_type_dir);
390
} else if (d->d_type == DT_REG) {
391
f->SetType(FileInfo::file_type_file);
479
do_stat(child_url, *f);
396
483
// Then close the dir
397
484
if (gfal_closedir (dir) < 0) {
398
logger.msg(ERROR, "gfal_closedir failed: %s", StrError(errno));
485
logger.msg(WARNING, "gfal_closedir failed: %s", StrError(errno));
399
486
return DataStatus::ListError;
409
496
DataStatus status_from_stat = do_stat(url, file);
410
497
if (status_from_stat != DataStatus::Success)
411
498
return DataStatus::DeleteError;
412
if (file.GetType() == FileInfo::file_type_dir) {
413
if (gfal_rmdir(url.plainstr().c_str()) < 0) {
414
logger.msg(ERROR, "gfal_rmdir failed: %s", StrError(errno));
415
return DataStatus::DeleteError;
418
if (gfal_unlink(url.plainstr().c_str()) < 0) {
419
logger.msg(ERROR, "gfal_unlink failed: %s", StrError(errno));
420
return DataStatus::DeleteError;
502
GFALEnvLocker gfal_lock(usercfg, lfc_host);
504
if (file.GetType() == FileInfo::file_type_dir) {
505
res = gfal_rmdir(url.plainstr().c_str());
507
res = gfal_unlink(url.plainstr().c_str());
511
if (file.GetType() == FileInfo::file_type_dir) logger.msg(ERROR, "gfal_rmdir failed: %s", StrError(errno));
512
else logger.msg(ERROR, "gfal_unlink failed: %s", StrError(errno));
514
return DataStatus::DeleteError;
423
516
return DataStatus::Success;
426
519
DataStatus DataPointGFAL::CreateDirectory(bool with_parents) {
427
if (gfal_mkdir(url.plainstr().c_str(), 0700) < 0) {
523
GFALEnvLocker gfal_lock(usercfg, lfc_host);
524
// gfal_mkdir is always recursive
525
res = gfal_mkdir(url.plainstr().c_str(), 0700);
428
528
logger.msg(ERROR, "gfal_mkdir failed: %s", StrError(errno));
429
530
return DataStatus::CreateDirectoryError;
431
532
return DataStatus::Success;
535
void DataPointGFAL::log_gfal_err() {
537
gfal_posix_strerror_r(errbuf, sizeof(errbuf));
538
logger.msg(ERROR, errbuf);
539
gfal_posix_clear_error();
435
542
} // namespace Arc
437
544
Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
438
{ "rfio", "HED:DMC", "RFIO plugin using the GFAL2 library", 0, &Arc::DataPointGFAL::Instance },
545
{ "gfal2", "HED:DMC", "Grid File Access Library 2", 0, &Arc::DataPointGFAL::Instance },
439
546
{ NULL, NULL, NULL, 0, NULL }