# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
# Copyright 2011 Henrique Carvalho Alves <hcarvalhoalves@gmail.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import os
import sys
import time
import traceback

import duplicity.backend

from duplicity import globals
from duplicity import log
from duplicity.errors import * #@UnusedWildImport
from duplicity.util import exception_traceback
from duplicity.backend import retry
from duplicity.filechunkio import FileChunkIO

BOTO_MIN_VERSION = "1.6a"

# Multiprocessing is not supported on *BSD
if sys.platform not in ('darwin', 'linux2'):
    # multiprocessing.dummy mirrors the multiprocessing API on top of
    # threads, so the Pool usage below works unchanged.
    from multiprocessing import dummy as multiprocessing
    log.Debug('Multiprocessing is not supported on %s, will use threads instead.' % sys.platform)
else:
    import multiprocessing
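

# Builds a fresh boto S3Connection for the given URL scheme, selecting a
# calling format (path-style vs. subdomain) to match the user's settings.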
def get_connection(scheme, parsed_url):
    try:
        import boto
        assert boto.Version >= BOTO_MIN_VERSION

        from boto.s3.connection import S3Connection
        assert hasattr(S3Connection, 'lookup')

        # Newer versions of boto default to using
        # virtual hosting for buckets as a result of
        # upstream deprecation of the old-style access
        # method by Amazon S3. This change is not
        # backwards compatible (in particular with
        # respect to upper case characters in bucket
        # names); so we default to forcing use of the
        # old-style method unless the user has
        # explicitly asked us to use new-style bucket
        # access.
        #
        # Note that if the user wants to use new-style
        # buckets, we use the subdomain calling form
        # rather than giving the option of both
        # subdomain and vhost. The reason being that
        # anything addressable as a vhost is also
        # addressable as a subdomain. Seeing as the
        # latter is mostly a convenience method of
        # allowing browsable content to be hosted
        # semi-invisibly on S3, the former format makes
        # a lot more sense for us to use - being
        # explicit about what is happening (the fact
        # that we are talking to S3 servers).
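        #
        # For example, subdomain addressing reaches a key at
        # https://bucket.s3.amazonaws.com/key, while the ordinary
        # (path-style) form is https://s3.amazonaws.com/bucket/key.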

        try:
            from boto.s3.connection import OrdinaryCallingFormat
            from boto.s3.connection import SubdomainCallingFormat
            cfs_supported = True
            calling_format = OrdinaryCallingFormat()
        except ImportError:
            cfs_supported = False
            calling_format = None

        if globals.s3_use_new_style:
            if cfs_supported:
                calling_format = SubdomainCallingFormat()
            else:
                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was "
                               "requested, but does not seem to be supported by the "
                               "boto library. Either you need to upgrade your boto "
                               "library or duplicity has failed to correctly detect "
                               "the appropriate support.",
                               log.ErrorCode.boto_old_style)
        else:
            if cfs_supported:
                calling_format = OrdinaryCallingFormat()
            else:
                calling_format = None

    except ImportError:
        log.FatalError("This backend (s3) requires boto library, version %s or later, "
                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
                       log.ErrorCode.boto_lib_too_old)

    if scheme == 's3+http':
        # Use the default Amazon S3 host.
        conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
    else:
        assert scheme == 's3'
        conn = S3Connection(
            host=parsed_url.hostname,
            is_secure=(not globals.s3_unencrypted_connection))

    if hasattr(conn, 'calling_format'):
        if calling_format is None:
            log.FatalError("It seems we previously failed to detect support for calling "
                           "formats in the boto library, yet the support is there. This is "
                           "almost certainly a duplicity bug.",
                           log.ErrorCode.boto_calling_format)
        else:
            conn.calling_format = calling_format
    else:
        # Duplicity hangs if boto gets a null bucket name.
        # HC: Caught a socket error, trying to recover
        raise BackendException('Boto requires a bucket name.')

    return conn


class BotoBackend(duplicity.backend.Backend):
    """
    Backend for Amazon's Simple Storage System, (aka Amazon S3), through
    the use of the boto module, (http://code.google.com/p/boto/).

    To make use of this backend you must set aws_access_key_id
    and aws_secret_access_key in your ~/.boto or /etc/boto.cfg
    with your Amazon Web Services key id and secret respectively.
    Alternatively you can export the environment variables
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
    """
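
    # Typical target URLs handled by this backend:
    #   s3+http://bucket_name[/prefix]      (default Amazon S3 host)
    #   s3://host/bucket_name[/prefix]      (explicit S3-compatible host)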

    def __init__(self, parsed_url):
        duplicity.backend.Backend.__init__(self, parsed_url)

        from boto.s3.key import Key
        from boto.s3.multipart import MultiPartUpload

        # This folds the null prefix and all null parts, which means that:
        #  //MyBucket/ and //MyBucket are equivalent.
        #  //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
        self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))

        if self.url_parts:
            self.bucket_name = self.url_parts.pop(0)
        else:
            # Duplicity hangs if boto gets a null bucket name.
            # HC: Caught a socket error, trying to recover
            raise BackendException('Boto requires a bucket name.')

        self.scheme = parsed_url.scheme

        self.key_class = Key

        if self.url_parts:
            self.key_prefix = '%s/' % '/'.join(self.url_parts)
        else:
            self.key_prefix = ''

        self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
        self.parsed_url = parsed_url
        self.resetConnection()

    def resetConnection(self):
        self.bucket = None
        self.conn = get_connection(self.scheme, self.parsed_url)
        self.bucket = self.conn.lookup(self.bucket_name)

    def put(self, source_path, remote_filename=None):
        from boto.s3.connection import Location
        if globals.s3_european_buckets:
            if not globals.s3_use_new_style:
                log.FatalError("European bucket creation was requested, but not new-style "
                               "bucket addressing (--s3-use-new-style)",
                               log.ErrorCode.s3_bucket_not_style)
        # Network glitch may prevent first few attempts of creating/looking up a bucket
        for n in range(1, globals.num_retries+1):
            if self.bucket:
                break
            if n > 1:
                time.sleep(30)
            try:
                try:
                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
                except Exception as e:
                    if "NoSuchBucket" in str(e):
                        if globals.s3_european_buckets:
                            self.bucket = self.conn.create_bucket(self.bucket_name,
                                                                  location=Location.EU)
                        else:
                            self.bucket = self.conn.create_bucket(self.bucket_name)
                    else:
                        raise
            except Exception as e:
                log.Warn("Failed to create bucket '%s' (attempt #%d, reason: %s: %s)"
                         "" % (self.bucket_name, n,
                               e.__class__.__name__,
                               str(e)))
                self.resetConnection()
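
        # The bucket lookup/creation loop above and the upload loop below
        # retry independently, each up to globals.num_retries attempts.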

        if not remote_filename:
            remote_filename = source_path.get_filename()
        key = self.key_prefix + remote_filename
        for n in range(1, globals.num_retries+1):
            if n > 1:
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
                time.sleep(10)

            if globals.s3_use_rrs:
                storage_class = 'REDUCED_REDUNDANCY'
            else:
                storage_class = 'STANDARD'
            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
            try:
                headers = {
                    'Content-Type': 'application/octet-stream',
                    'x-amz-storage-class': storage_class
                }
                self.upload(source_path.name, key, headers)
                self.resetConnection()
                return
            except Exception as e:
                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               remote_filename,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
                self.resetConnection()
        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
                 (self.straight_url, remote_filename, globals.num_retries))
        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))

    def get(self, remote_filename, local_path):
        key = self.key_class(self.bucket)
        key.key = self.key_prefix + remote_filename
        for n in range(1, globals.num_retries+1):
            if n > 1:
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
                time.sleep(10)
            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
            try:
                key.get_contents_to_filename(local_path.name)
                local_path.setdata()
                self.resetConnection()
                return
            except Exception as e:
                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               remote_filename,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
                self.resetConnection()
        log.Warn("Giving up trying to download %s/%s after %d attempts" %
                 (self.straight_url, remote_filename, globals.num_retries))
        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))

    def list(self):
        if not self.bucket:
            raise BackendException("No connection to backend")

        for n in range(1, globals.num_retries+1):
            if n > 1:
                # sleep before retry
                time.sleep(30)
            log.Info("Listing %s" % self.straight_url)
            try:
                return self._list_filenames_in_bucket()
            except Exception as e:
                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
                         "" % (self.straight_url,
                               n,
                               e.__class__.__name__,
                               str(e)))
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
        log.Warn("Giving up trying to list %s after %d attempts" %
                 (self.straight_url, globals.num_retries))
        raise BackendException("Error listing %s" % self.straight_url)

    def _list_filenames_in_bucket(self):
        # We add a 'd' to the prefix to make sure it is not null (for boto) and
        # to optimize the listing of our filenames, which always begin with 'd'.
        # This will cause a failure in the regression tests as below:
        #   FAIL: Test basic backend operations
        #   AssertionError: Got list: []
        #   Wanted: ['testfile']
        # Because of the need for this optimization, it should be left as is.
        #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
        filename_list = []
        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
            try:
                filename = k.key.replace(self.key_prefix, '', 1)
                filename_list.append(filename)
                log.Debug("Listed %s/%s" % (self.straight_url, filename))
            except AttributeError:
                pass
        return filename_list

    def delete(self, filename_list):
        for filename in filename_list:
            self.bucket.delete_key(self.key_prefix + filename)
            log.Debug("Deleted %s/%s" % (self.straight_url, filename))

    @retry
    def _query_file_info(self, filename, raise_errors=False):
        try:
            key = self.bucket.lookup(self.key_prefix + filename)
            if key is None:
                return {'size': -1}
            return {'size': key.size}
        except Exception as e:
            log.Warn("Query %s/%s failed: %s"
                     "" % (self.straight_url,
                           filename,
                           str(e)))
            self.resetConnection()
            if raise_errors:
                raise e
            return {'size': None}
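
    # upload() drives boto's multipart-upload API: the file is split into
    # chunks which a pool of workers sends in parallel, and the upload is
    # completed or aborted based on which parts S3 reports receiving.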

    def upload(self, filename, key, headers=None):
        chunk_size = globals.s3_multipart_chunk_size

        # Check minimum chunk size for S3
        if chunk_size < globals.s3_multipart_minimum_chunk_size:
            log.Warn("Minimum chunk size is %d, but %d specified." % (
                globals.s3_multipart_minimum_chunk_size, chunk_size))
            chunk_size = globals.s3_multipart_minimum_chunk_size

        # Decide in how many chunks to upload
        bytes = os.path.getsize(filename)
        if bytes < chunk_size:
            chunks = 1
        else:
            chunks = bytes // chunk_size
            if (bytes % chunk_size):
                chunks += 1

        log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))

        mp = self.bucket.initiate_multipart_upload(key, headers)

        pool = multiprocessing.Pool(processes=chunks)
        for n in range(chunks):
            params = {
                'scheme': self.scheme,
                'parsed_url': self.parsed_url,
                'bucket_name': self.bucket_name,
                'multipart_id': mp.id,
                'filename': filename,
                'offset': n,
                'bytes': chunk_size,
                'num_retries': globals.num_retries,
            }
            pool.apply_async(multipart_upload_worker, kwds=params)
        pool.close()
        pool.join()
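
        # Success is judged by asking S3 which parts it actually received,
        # rather than by inspecting the async results from the pool.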
        if len(mp.get_all_parts()) < chunks:
            mp.cancel_upload()
            raise BackendException("Multipart upload failed. Aborted.")

        return mp.complete_upload()


def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
                            offset, bytes, num_retries):
    """
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it's important to keep
    the chunk size reasonably small.
    """

    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url)
            bucket = conn.lookup(bucket_name)

            for mp in bucket.get_all_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback)
                    break
        except Exception:
            traceback.print_exc()
            if num_retries:
                log.Debug("%s: Upload of chunk %d failed. Retrying %d more times..." % (
                    worker_name, offset + 1, num_retries - 1))
                return _upload(num_retries - 1)
            log.Debug("%s: Upload of chunk %d failed. Aborting..." % (
                worker_name, offset + 1))
            raise
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))

    return _upload(num_retries)


duplicity.backend.register_backend("s3", BotoBackend)
duplicity.backend.register_backend("s3+http", BotoBackend)