        self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
        self.bucket = self.conn.lookup(self.bucket_name)
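
    # Called between retry attempts by duplicity's generic retry machinery (an
    # assumption based on the name; _put/_get below no longer retry inline).
    # Dropping the cached connection forces a fresh one on the next attempt.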
    def _retry_cleanup(self):
        self.resetConnection()

    def _put(self, source_path, remote_filename):
        from boto.s3.connection import Location
        if globals.s3_european_buckets:
            if not globals.s3_use_new_style:
                raise FatalBackendException("European bucket creation was requested, but not new-style "
                                            "bucket addressing (--s3-use-new-style)",
                                            code=log.ErrorCode.s3_bucket_not_style)
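
        # Buckets outside the classic US region need boto's subdomain
        # ("new-style") calling format; path-style requests are routed to the
        # US endpoint and fail, hence the hard error above.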

        if self.bucket is None:
            try:
                self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
            except Exception as e:
                if "NoSuchBucket" in str(e):
                    if globals.s3_european_buckets:
                        self.bucket = self.conn.create_bucket(self.bucket_name,
                                                              location=Location.EU)
                    else:
                        self.bucket = self.conn.create_bucket(self.bucket_name)

        key = self.bucket.new_key(self.key_prefix + remote_filename)
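        # key_prefix appears to be derived from the path component of the
        # target URL (in the backend's setup code, not shown here), so several
        # backup targets can share one bucket without colliding.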

        if globals.s3_use_rrs:
            storage_class = 'REDUCED_REDUNDANCY'
        else:
            storage_class = 'STANDARD'
        log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))

        if globals.s3_use_sse:
            headers = {
                'Content-Type': 'application/octet-stream',
                'x-amz-storage-class': storage_class,
                'x-amz-server-side-encryption': 'AES256'
            }
        else:
            headers = {
                'Content-Type': 'application/octet-stream',
                'x-amz-storage-class': storage_class
            }
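        # 'AES256' requests S3-managed server-side encryption (SSE-S3); the
        # storage-class header decides STANDARD vs. REDUCED_REDUNDANCY.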

        upload_start = time.time()
        self.upload(source_path.name, key, headers)
        upload_end = time.time()
        total_s = abs(upload_end - upload_start) or 1  # prevent a zero value!
        rough_upload_speed = os.path.getsize(source_path.name) / total_s
        log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" %
                  (self.straight_url, remote_filename, storage_class, rough_upload_speed))

    def _get(self, remote_filename, local_path):
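        # Glacier objects must be restored to S3 before they can be read;
        # pre_process_download(wait=True) blocks until the restore finishes
        # and caches the boto key in self._listed_keys.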
        key_name = self.key_prefix + remote_filename
        self.pre_process_download(remote_filename, wait=True)
        key = self._listed_keys[key_name]
        self.resetConnection()
        key.get_contents_to_filename(local_path.name)

    def _list(self):
        if not self.bucket:
            raise BackendException("No connection to backend")
        return self._list_filenames_in_bucket()

    def _list_filenames_in_bucket(self):
        # We add a 'd' to the prefix to make sure it is not null (for boto) and
        # to optimize the listing of our filenames, which always begin with 'd'.
        filename_list = []
        for k in self.bucket.list(prefix=self.key_prefix, delimiter='/'):
            try:
                filename = k.key.replace(self.key_prefix, '', 1)
                filename_list.append(filename)
                log.Debug("Listed %s/%s" % (self.straight_url, filename))
            except AttributeError:
                pass
        return filename_list

    def _delete(self, filename):
        self.bucket.delete_key(self.key_prefix + filename)
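
    # boto's bucket.lookup() returns None rather than raising when a key is
    # absent, so _query() below reports a missing file as size -1.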
    def _query(self, filename):
        key = self.bucket.lookup(self.key_prefix + filename)
        if key is None:
            return {'size': -1}
        return {'size': key.size}

    def upload(self, filename, key, headers):
        key.set_contents_from_filename(filename, headers,
                                       cb=progress.report_transfer,
                                       num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
                                       )  # Max num of callbacks = 8 times x megabyte
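        # Example with assumed numbers: a 25 MiB volume gives
        # num_cb = max(2, 8 * 25) = 200, i.e. a progress callback roughly
        # every 128 KiB uploaded.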

    def pre_process_download(self, remote_filename, wait=False):
        # Used primarily to move files in Glacier to S3
        key_name = self.key_prefix + remote_filename
        if not self._listed_keys.get(key_name, False):
            self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
        key = self._listed_keys[key_name]

        if key.storage_class == "GLACIER":
            # We need to move the file out of glacier
            if not self.bucket.get_key(key.key).ongoing_restore:
                log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
                key.restore(days=1)  # Shouldn't need this again after 1 day
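                # The restored copy is temporary: S3 removes it again after
                # the requested number of days, which is enough for a download.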
            if wait:
                log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
                while self.bucket.get_key(key.key).ongoing_restore:
                    time.sleep(60)  # poll roughly once a minute
                    self.resetConnection()
                log.Info("File %s was successfully restored from Glacier" % remote_filename)