# ConsumerThread.run(): drain the shared progress queue until the finish
# flag is set by the uploader.
while not self.finish:
    try:
        args = self.queue.get(True, 1)
        progress.report_transfer(args[0], args[1])
    except Queue.Empty, e:
        pass
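# The one-second timeout on queue.get() above is what lets the loop notice
# self.finish promptly instead of blocking forever on an empty queue;
# Queue.Empty is the expected "nothing to report yet" case.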
def get_connection(scheme, parsed_url):
    try:
        import boto
        assert boto.Version >= BOTO_MIN_VERSION

        from boto.s3.connection import S3Connection
        assert hasattr(S3Connection, 'lookup')

        # Newer versions of boto default to using virtual hosting for buckets
        # as a result of upstream deprecation of the old-style access method
        # by Amazon S3. This change is not backwards compatible (in particular
        # with respect to upper case characters in bucket names); so we
        # default to forcing use of the old-style method unless the user has
        # explicitly asked us to use new-style bucket addressing.
        #
        # Note that if the user wants to use new-style buckets, we use the
        # subdomain calling form rather than giving the option of both
        # subdomain and vhost. The reason is that anything addressable as a
        # vhost is also addressable as a subdomain. Seeing as the latter is
        # mostly a convenience method of allowing browsable content to be
        # hosted semi-invisibly on S3, the former format makes a lot more
        # sense for us to use - being explicit about what is happening (the
        # fact that we are talking to S3 servers).
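        # For reference, the two addressing styles look like this for a
        # hypothetical bucket "MyBucket" and key "duplicity-full.manifest":
        #   OrdinaryCallingFormat:  https://s3.amazonaws.com/MyBucket/duplicity-full.manifest
        #   SubdomainCallingFormat: https://MyBucket.s3.amazonaws.com/duplicity-full.manifest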
        try:
            from boto.s3.connection import OrdinaryCallingFormat
            from boto.s3.connection import SubdomainCallingFormat
            cfs_supported = True
            calling_format = OrdinaryCallingFormat()
        except ImportError:
            cfs_supported = False
            calling_format = None

        if globals.s3_use_new_style:
            if cfs_supported:
                calling_format = SubdomainCallingFormat()
            else:
                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
                               " requested, but does not seem to be supported by the "
                               "boto library. Either you need to upgrade your boto "
                               "library or duplicity has failed to correctly detect "
                               "the appropriate support.",
                               log.ErrorCode.boto_old_style)
        else:
            if cfs_supported:
                calling_format = OrdinaryCallingFormat()
            else:
                calling_format = None

    except ImportError:
        log.FatalError("This backend (s3) requires the boto library, version %s or later "
                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
                       log.ErrorCode.boto_lib_too_old)
    if scheme == 's3+http':
        # Use the default Amazon S3 host.
        conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
    else:
        assert scheme == 's3'
        conn = S3Connection(
            host=parsed_url.hostname,
            is_secure=(not globals.s3_unencrypted_connection))
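    # In other words: with s3+http the connection goes to boto's default Amazon
    # endpoint, while plain s3 URLs carry an explicit endpoint in
    # parsed_url.hostname (useful for S3-compatible services).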
    if hasattr(conn, 'calling_format'):
        if calling_format is None:
            log.FatalError("It seems we previously failed to detect support for calling "
                           "formats in the boto library, yet the support is there. This is "
                           "almost certainly a duplicity bug.",
                           log.ErrorCode.boto_calling_format)
        else:
            conn.calling_format = calling_format

    return conn
class BotoBackend(duplicity.backend.Backend):
    """
    Backend for Amazon's Simple Storage System (aka Amazon S3), through
    the use of the boto module (http://code.google.com/p/boto/).
    """
    def __init__(self, parsed_url):
        duplicity.backend.Backend.__init__(self, parsed_url)

        from boto.s3.key import Key
        from boto.s3.multipart import MultiPartUpload

        # This folds the null prefix and all null parts, which means that:
        #  //MyBucket/ and //MyBucket are equivalent.
        #  //MyBucket//My///Prefix/ and //MyBucket/My/Prefix are equivalent.
        self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))

        if self.url_parts:
            self.bucket_name = self.url_parts.pop(0)
        else:
            # Duplicity hangs if boto gets a null bucket name.
            # HC: Caught a socket error, trying to recover
            raise BackendException('Boto requires a bucket name.')

        self.scheme = parsed_url.scheme
        self.key_class = Key

        if self.url_parts:
            self.key_prefix = '%s/' % '/'.join(self.url_parts)
        else:
            self.key_prefix = ''

        self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
        self.parsed_url = parsed_url
        self.resetConnection()
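        # Worked example: if parsed_url.path is '//MyBucket/backups/host1' (see
        # the folding comment above), url_parts is ['MyBucket', 'backups', 'host1'],
        # so bucket_name becomes 'MyBucket' and key_prefix becomes 'backups/host1/'.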
    def resetConnection(self):
        self.conn = get_connection(self.scheme, self.parsed_url)
        self.bucket = self.conn.lookup(self.bucket_name)
    def put(self, source_path, remote_filename=None):
        from boto.s3.connection import Location
        if globals.s3_european_buckets:
            if not globals.s3_use_new_style:
                log.FatalError("European bucket creation was requested, but not new-style "
                               "bucket addressing (--s3-use-new-style)",
                               log.ErrorCode.s3_bucket_not_style)

        # Network glitch may prevent first few attempts of creating/looking up a bucket
        for n in range(1, globals.num_retries+1):
            if self.bucket:
                break
            if n > 1:
                time.sleep(30)
            try:
                try:
                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
                except Exception, e:
                    if "NoSuchBucket" in str(e):
                        if globals.s3_european_buckets:
                            self.bucket = self.conn.create_bucket(self.bucket_name,
                                                                  location=Location.EU)
                        else:
                            self.bucket = self.conn.create_bucket(self.bucket_name)
                    else:
                        raise e
            except Exception, e:
                log.Warn("Attempt #%d to create bucket '%s' failed (reason: %s: %s)"
                         "" % (n, self.bucket_name,
                               e.__class__.__name__,
                               str(e)))
                self.resetConnection()
        if not remote_filename:
            remote_filename = source_path.get_filename()
        key = self.key_prefix + remote_filename

        for n in range(1, globals.num_retries+1):
            if n > 1:
                # sleep before retry (a new connection to a hopefully new host, so no need to wait as long)
                time.sleep(10)

            if globals.s3_use_rrs:
                storage_class = 'REDUCED_REDUNDANCY'
            else:
                storage_class = 'STANDARD'
            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
            try:
                headers = {
                    'Content-Type': 'application/octet-stream',
                    'x-amz-storage-class': storage_class
                }
                self.upload(source_path.name, key, headers)
                self.resetConnection()
                return
            except Exception, e:
                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               remote_filename,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
                self.resetConnection()

        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
                 (self.straight_url, remote_filename, globals.num_retries))
        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
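        # Note on the headers above: 'x-amz-storage-class' selects the S3 storage
        # class at upload time; REDUCED_REDUNDANCY (globals.s3_use_rrs) trades
        # lower durability for lower storage cost compared to STANDARD.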
    def get(self, remote_filename, local_path):
        key = self.key_class(self.bucket)
        key.key = self.key_prefix + remote_filename

        for n in range(1, globals.num_retries+1):
            if n > 1:
                # sleep before retry (a new connection to a hopefully new host, so no need to wait as long)
                time.sleep(10)
            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
            try:
                key.get_contents_to_filename(local_path.name)
                local_path.setdata()
                self.resetConnection()
                return
            except Exception, e:
                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               remote_filename,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
                self.resetConnection()

        log.Warn("Giving up trying to download %s/%s after %d attempts" %
                 (self.straight_url, remote_filename, globals.num_retries))
        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
    def list(self):
        if not self.bucket:
            raise BackendException("No connection to backend")

        for n in range(1, globals.num_retries+1):
            if n > 1:
                time.sleep(30)
            log.Info("Listing %s" % self.straight_url)
            try:
                return self._list_filenames_in_bucket()
            except Exception, e:
                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))

        log.Warn("Giving up trying to list %s after %d attempts" %
                 (self.straight_url, globals.num_retries))
        raise BackendException("Error listing %s" % self.straight_url)
    def _list_filenames_in_bucket(self):
        # We add a 'd' to the prefix to make sure it is not null (for boto) and
        # to optimize the listing of our filenames, which always begin with 'd'.
        # This will cause a failure in the regression tests as below:
        #   FAIL: Test basic backend operations
        #   AssertionError: Got list: []
        #   Wanted: ['testfile']
        # Because of the need for this optimization, it should be left as is.
        #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
        filename_list = []
        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
            try:
                filename = k.key.replace(self.key_prefix, '', 1)
                filename_list.append(filename)
                log.Debug("Listed %s/%s" % (self.straight_url, filename))
            except AttributeError:
                pass
        return filename_list
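        # Note: listing with delimiter='/' makes boto yield "common prefix"
        # (pseudo-directory) entries alongside Key objects; those entries lack a
        # usable .key attribute, which is presumably what the AttributeError
        # handler above is guarding against.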
    def delete(self, filename_list):
        for filename in filename_list:
            self.bucket.delete_key(self.key_prefix + filename)
            log.Debug("Deleted %s/%s" % (self.straight_url, filename))
    def _query_file_info(self, filename, raise_errors=False):
        try:
            key = self.bucket.lookup(self.key_prefix + filename)
            if key is None:
                return {'size': -1}
            return {'size': key.size}
        except Exception, e:
            log.Warn("Query %s/%s failed: %s"
                     "" % (self.straight_url,
                           filename,
                           str(e)))
            self.resetConnection()
            if raise_errors:
                raise e
            else:
                return {'size': None}
class BotoBackend(BotoSingleBackend):
    # Multipart variant: runs chunk uploads through a multiprocessing pool.
    def __init__(self, parsed_url):
        BotoSingleBackend.__init__(self, parsed_url)
        self._setup_pool()

    def _setup_pool(self):
        number_of_procs = globals.s3_multipart_max_procs
        if not number_of_procs:
            number_of_procs = multiprocessing.cpu_count()

        if getattr(self, '_pool', False):
            log.Debug("A process pool already exists. Destroying previous pool.")
            self._pool.terminate()
            self._pool.join()
            self._pool = None

        log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs)
        self._pool = multiprocessing.Pool(processes=number_of_procs)

    def close(self):
        BotoSingleBackend.close(self)
        log.Debug("Closing pool")
        self._pool.terminate()
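    # Note: Pool.terminate() stops worker processes immediately without waiting
    # for outstanding tasks, so after an abort the pool must be rebuilt via
    # _setup_pool() before further uploads can be scheduled.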
    def upload(self, filename, key, headers=None):
        chunk_size = globals.s3_multipart_chunk_size

        # Check minimum chunk size for S3
        queue = None
        if globals.progress:
            manager = multiprocessing.Manager()
            queue = manager.Queue()
            consumer = ConsumerThread(queue)
            consumer.start()

        tasks = []
        for n in range(chunks):
            storage_uri = boto.storage_uri(self.boto_uri_str)
            params = [self.scheme, self.parsed_url, storage_uri, self.bucket_name,
                      mp.id, filename, n, chunk_size, globals.num_retries,
                      queue]
            tasks.append(self._pool.apply_async(multipart_upload_worker, params))

        log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks))

        while len(tasks) > 0:
            try:
                tasks[0].wait(timeout=globals.s3_multipart_max_timeout)
                if tasks[0].successful():
                    del tasks[0]
                else:
                    log.Debug("Part upload not successful, aborting multipart upload.")
                    raise multiprocessing.TimeoutError
            except multiprocessing.TimeoutError:
                log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks))
                self._setup_pool()
                break

        log.Debug("Done waiting for the pool to finish processing")
        # Terminate the consumer thread, if any
        if globals.progress:
            consumer.finish = True
            consumer.join()

        if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
            mp.cancel_upload()
            raise BackendException("Multipart upload failed. Aborted.")

        return mp.complete_upload()
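        # S3 multipart semantics: uploaded parts only become a visible object
        # once complete_upload() succeeds, while cancel_upload() discards any
        # parts already stored so they do not keep occupying space in the bucket.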
def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
                            filename, offset, bytes, num_retries, queue):
    """
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it is important to keep
    the chunk size reasonably small.
    """
    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
        if queue is not None:
            queue.put([uploaded, total])  # Push data to the consumer thread
    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url, storage_uri)
            bucket = conn.lookup(bucket_name)

            for mp in bucket.list_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
                        start = time.time()
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
                                                 )  # at most 8 progress callbacks per megabyte
                        end = time.time()
                    log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(
                        name=worker_name, chunk=offset + 1, speed=(bytes / max(1, abs(end - start)))))
                    break
        except Exception, e:
            traceback.print_exc()
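        # Chunk layout, for reference: worker number `offset` reads the byte
        # range [offset * bytes, offset * bytes + bytes) via FileChunkIO, and S3
        # part numbers are 1-based, hence upload_part_from_file(fd, offset + 1, ...).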