~ubuntu-branches/ubuntu/trusty/duplicity/trusty

« back to all changes in this revision

Viewing changes to duplicity/backends/_boto_multi.py

  • Committer: Package Import Robot
  • Author(s): Michael Terry
  • Date: 2011-12-06 14:15:01 UTC
  • mfrom: (1.9.4)
  • Revision ID: package-import@ubuntu.com-20111206141501-nvfaaauqivpwyb7f
Tags: 0.6.17-0ubuntu1
* New upstream release
* debian/patches/06_use_passphrase.dpatch,
  debian/patches/07_large_rackspace_list.dpatch,
  debian/patches/08_check_volumes.dpatch:
  - Dropped, applied upstream
* debian/rules:
  - Run new upstream test suite during build
* debian/control:
  - Add rdiff as a build-dep to run above test suite
* debian/patches/06testfixes.dpatch:
  - Fix a few tests to not fail erroneously
* debian/patches/07fixincresume.dpatch:
  - Fix a bug with resuming an incremental backup that would result in
    a bogus error.  Also patches in a test for it.
* debian/tests/full-cycle-local:
  - New DEP-8 test script that backs up locally, restores, and checks files
* debian/tests/full-cycle-u1:
  - New DEP-8 test script that does the same as above, but to Ubuntu One
* debian/tests/control:
  - Start of DEP-8 test suite.  Only enable above full-cycle-local test
    for automatic execution.  The other is for manual testing right now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
 
2
#
 
3
# Copyright 2002 Ben Escoto <ben@emerose.org>
 
4
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
 
5
# Copyright 2011 Henrique Carvalho Alves <hcarvalhoalves@gmail.com>
 
6
#
 
7
# This file is part of duplicity.
 
8
#
 
9
# Duplicity is free software; you can redistribute it and/or modify it
 
10
# under the terms of the GNU General Public License as published by the
 
11
# Free Software Foundation; either version 2 of the License, or (at your
 
12
# option) any later version.
 
13
#
 
14
# Duplicity is distributed in the hope that it will be useful, but
 
15
# WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
17
# General Public License for more details.
 
18
#
 
19
# You should have received a copy of the GNU General Public License
 
20
# along with duplicity; if not, write to the Free Software Foundation,
 
21
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
22
 
 
23
import os
 
24
import sys
 
25
import time
 
26
 
 
27
import duplicity.backend
 
28
 
 
29
from duplicity import globals
 
30
from duplicity import log
 
31
from duplicity.errors import * #@UnusedWildImport
 
32
from duplicity.util import exception_traceback
 
33
from duplicity.backend import retry
 
34
from duplicity.filechunkio import FileChunkIO
 
35
 
 
36
BOTO_MIN_VERSION = "1.6a"
 
37
 
 
38
# Multiprocessing is not supported on *BSD
 
39
if sys.platform not in ('darwin', 'linux2'):
 
40
    from multiprocessing import dummy as multiprocessing
 
41
    log.Debug('Multiprocessing is not supported on %s, will use threads instead.' % sys.platform)
 
42
else:
 
43
    import multiprocessing
 
44
 
 
45
 
 
46
def get_connection(scheme, parsed_url):
 
47
    try:
 
48
        import boto
 
49
        assert boto.Version >= BOTO_MIN_VERSION
 
50
 
 
51
        from boto.s3.connection import S3Connection
 
52
        assert hasattr(S3Connection, 'lookup')
 
53
 
 
54
        # Newer versions of boto default to using
 
55
        # virtual hosting for buckets as a result of
 
56
        # upstream deprecation of the old-style access
 
57
        # method by Amazon S3. This change is not
 
58
        # backwards compatible (in particular with
 
59
        # respect to upper case characters in bucket
 
60
        # names); so we default to forcing use of the
 
61
        # old-style method unless the user has
 
62
        # explicitly asked us to use new-style bucket
 
63
        # access.
 
64
        #
 
65
        # Note that if the user wants to use new-style
 
66
        # buckets, we use the subdomain calling form
 
67
        # rather than given the option of both
 
68
        # subdomain and vhost. The reason being that
 
69
        # anything addressable as a vhost, is also
 
70
        # addressable as a subdomain. Seeing as the
 
71
        # latter is mostly a convenience method of
 
72
        # allowing browse:able content semi-invisibly
 
73
        # being hosted on S3, the former format makes
 
74
        # a lot more sense for us to use - being
 
75
        # explicit about what is happening (the fact
 
76
        # that we are talking to S3 servers).
 
77
 
 
78
        try:
 
79
            from boto.s3.connection import OrdinaryCallingFormat
 
80
            from boto.s3.connection import SubdomainCallingFormat
 
81
            cfs_supported = True
 
82
            calling_format = OrdinaryCallingFormat()
 
83
        except ImportError:
 
84
            cfs_supported = False
 
85
            calling_format = None
 
86
 
 
87
        if globals.s3_use_new_style:
 
88
            if cfs_supported:
 
89
                calling_format = SubdomainCallingFormat()
 
90
            else:
 
91
                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
 
92
                               "requested, but does not seem to be supported by the "
 
93
                               "boto library. Either you need to upgrade your boto "
 
94
                               "library or duplicity has failed to correctly detect "
 
95
                               "the appropriate support.",
 
96
                               log.ErrorCode.boto_old_style)
 
97
        else:
 
98
            if cfs_supported:
 
99
                calling_format = OrdinaryCallingFormat()
 
100
            else:
 
101
                calling_format = None
 
102
 
 
103
    except ImportError:
 
104
        log.FatalError("This backend (s3) requires boto library, version %s or later, "
 
105
                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
 
106
                       log.ErrorCode.boto_lib_too_old)
 
107
 
 
108
    if scheme == 's3+http':
 
109
        # Use the default Amazon S3 host.
 
110
        conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
 
111
    else:
 
112
        assert scheme == 's3'
 
113
        conn = S3Connection(
 
114
            host = parsed_url.hostname,
 
115
            is_secure=(not globals.s3_unencrypted_connection))
 
116
 
 
117
    if hasattr(conn, 'calling_format'):
 
118
        if calling_format is None:
 
119
            log.FatalError("It seems we previously failed to detect support for calling "
 
120
                           "formats in the boto library, yet the support is there. This is "
 
121
                           "almost certainly a duplicity bug.",
 
122
                           log.ErrorCode.boto_calling_format)
 
123
        else:
 
124
            conn.calling_format = calling_format
 
125
 
 
126
    else:
 
127
        # Duplicity hangs if boto gets a null bucket name.
 
128
        # HC: Caught a socket error, trying to recover
 
129
        raise BackendException('Boto requires a bucket name.')
 
130
    return conn
 
131
 
 
132
 
 
133
class BotoBackend(duplicity.backend.Backend):
 
134
    """
 
135
    Backend for Amazon's Simple Storage System, (aka Amazon S3), though
 
136
    the use of the boto module, (http://code.google.com/p/boto/).
 
137
 
 
138
    To make use of this backend you must set aws_access_key_id
 
139
    and aws_secret_access_key in your ~/.boto or /etc/boto.cfg
 
140
    with your Amazon Web Services key id and secret respectively.
 
141
    Alternatively you can export the environment variables
 
142
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
 
143
    """
 
144
 
 
145
    def __init__(self, parsed_url):
 
146
        duplicity.backend.Backend.__init__(self, parsed_url)
 
147
 
 
148
        from boto.s3.key import Key
 
149
        from boto.s3.multipart import MultiPartUpload
 
150
 
 
151
        # This folds the null prefix and all null parts, which means that:
 
152
        #  //MyBucket/ and //MyBucket are equivalent.
 
153
        #  //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
 
154
        self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))
 
155
 
 
156
        if self.url_parts:
 
157
            self.bucket_name = self.url_parts.pop(0)
 
158
        else:
 
159
            # Duplicity hangs if boto gets a null bucket name.
 
160
            # HC: Caught a socket error, trying to recover
 
161
            raise BackendException('Boto requires a bucket name.')
 
162
 
 
163
        self.scheme = parsed_url.scheme
 
164
 
 
165
        self.key_class = Key
 
166
 
 
167
        if self.url_parts:
 
168
            self.key_prefix = '%s/' % '/'.join(self.url_parts)
 
169
        else:
 
170
            self.key_prefix = ''
 
171
 
 
172
        self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
 
173
        self.parsed_url = parsed_url
 
174
        self.resetConnection()
 
175
 
 
176
    def resetConnection(self):
 
177
        self.bucket = None
 
178
        self.conn = get_connection(self.scheme, self.parsed_url)
 
179
        self.bucket = self.conn.lookup(self.bucket_name)
 
180
 
 
181
    def put(self, source_path, remote_filename=None):
 
182
        from boto.s3.connection import Location
 
183
        if globals.s3_european_buckets:
 
184
            if not globals.s3_use_new_style:
 
185
                log.FatalError("European bucket creation was requested, but not new-style "
 
186
                               "bucket addressing (--s3-use-new-style)",
 
187
                               log.ErrorCode.s3_bucket_not_style)
 
188
        #Network glitch may prevent first few attempts of creating/looking up a bucket
 
189
        for n in range(1, globals.num_retries+1):
 
190
            if self.bucket:
 
191
                break
 
192
            if n > 1:
 
193
                time.sleep(30)
 
194
            try:
 
195
                try:
 
196
                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
 
197
                except Exception, e:
 
198
                    if "NoSuchBucket" in str(e):
 
199
                        if globals.s3_european_buckets:
 
200
                            self.bucket = self.conn.create_bucket(self.bucket_name,
 
201
                                                                  location=Location.EU)
 
202
                        else:
 
203
                            self.bucket = self.conn.create_bucket(self.bucket_name)
 
204
                    else:
 
205
                        raise e
 
206
            except Exception, e:
 
207
                log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
 
208
                         "" % (n, self.bucket_name,
 
209
                               e.__class__.__name__,
 
210
                               str(e)))
 
211
                self.resetConnection()
 
212
 
 
213
        if not remote_filename:
 
214
            remote_filename = source_path.get_filename()
 
215
        key = self.key_prefix + remote_filename
 
216
        for n in range(1, globals.num_retries+1):
 
217
            if n > 1:
 
218
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
 
219
                time.sleep(10)
 
220
 
 
221
            if globals.s3_use_rrs:
 
222
                storage_class = 'REDUCED_REDUNDANCY'
 
223
            else:
 
224
                storage_class = 'STANDARD'
 
225
            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
 
226
            try:
 
227
                headers = {
 
228
                    'Content-Type': 'application/octet-stream',
 
229
                    'x-amz-storage-class': storage_class
 
230
                }
 
231
                self.upload(source_path.name, key, headers)
 
232
                self.resetConnection()
 
233
                return
 
234
            except Exception, e:
 
235
                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
 
236
                         "" % (self.straight_url,
 
237
                               remote_filename,
 
238
                               n,
 
239
                               e.__class__.__name__,
 
240
                               str(e)))
 
241
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
 
242
                self.resetConnection()
 
243
        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
 
244
                 (self.straight_url, remote_filename, globals.num_retries))
 
245
        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
 
246
 
 
247
    def get(self, remote_filename, local_path):
 
248
        key = self.key_class(self.bucket)
 
249
        key.key = self.key_prefix + remote_filename
 
250
        for n in range(1, globals.num_retries+1):
 
251
            if n > 1:
 
252
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
 
253
                time.sleep(10)
 
254
            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
 
255
            try:
 
256
                key.get_contents_to_filename(local_path.name)
 
257
                local_path.setdata()
 
258
                self.resetConnection()
 
259
                return
 
260
            except Exception, e:
 
261
                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
 
262
                         "" % (self.straight_url,
 
263
                               remote_filename,
 
264
                               n,
 
265
                               e.__class__.__name__,
 
266
                               str(e)), 1)
 
267
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
 
268
                self.resetConnection()
 
269
        log.Warn("Giving up trying to download %s/%s after %d attempts" %
 
270
                (self.straight_url, remote_filename, globals.num_retries))
 
271
        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
 
272
 
 
273
    def list(self):
 
274
        if not self.bucket:
 
275
            return []
 
276
 
 
277
        for n in range(1, globals.num_retries+1):
 
278
            if n > 1:
 
279
                # sleep before retry
 
280
                time.sleep(30)
 
281
            log.Info("Listing %s" % self.straight_url)
 
282
            try:
 
283
                return self._list_filenames_in_bucket()
 
284
            except Exception, e:
 
285
                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
 
286
                         "" % (self.straight_url,
 
287
                               n,
 
288
                               e.__class__.__name__,
 
289
                               str(e)), 1)
 
290
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
 
291
        log.Warn("Giving up trying to list %s after %d attempts" %
 
292
                (self.straight_url, globals.num_retries))
 
293
        raise BackendException("Error listng %s" % self.straight_url)
 
294
 
 
295
    def _list_filenames_in_bucket(self):
 
296
        # We add a 'd' to the prefix to make sure it is not null (for boto) and
 
297
        # to optimize the listing of our filenames, which always begin with 'd'.
 
298
        # This will cause a failure in the regression tests as below:
 
299
        #   FAIL: Test basic backend operations
 
300
        #   <tracback snipped>
 
301
        #   AssertionError: Got list: []
 
302
        #   Wanted: ['testfile']
 
303
        # Because of the need for this optimization, it should be left as is.
 
304
        #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
 
305
        filename_list = []
 
306
        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
 
307
            try:
 
308
                filename = k.key.replace(self.key_prefix, '', 1)
 
309
                filename_list.append(filename)
 
310
                log.Debug("Listed %s/%s" % (self.straight_url, filename))
 
311
            except AttributeError:
 
312
                pass
 
313
        return filename_list
 
314
 
 
315
    def delete(self, filename_list):
 
316
        for filename in filename_list:
 
317
            self.bucket.delete_key(self.key_prefix + filename)
 
318
            log.Debug("Deleted %s/%s" % (self.straight_url, filename))
 
319
 
 
320
    @retry
 
321
    def _query_file_info(self, filename, raise_errors=False):
 
322
        try:
 
323
            key = self.bucket.lookup(self.key_prefix + filename)
 
324
            if key is None:
 
325
                return {'size': -1}
 
326
            return {'size': key.size}
 
327
        except Exception, e:
 
328
            log.Warn("Query %s/%s failed: %s"
 
329
                     "" % (self.straight_url,
 
330
                           filename,
 
331
                           str(e)))
 
332
            self.resetConnection()
 
333
            if raise_errors:
 
334
                raise e
 
335
            else:
 
336
                return {'size': None}
 
337
 
 
338
    def upload(self, filename, key, headers=None):
 
339
        chunk_size = globals.s3_multipart_chunk_size
 
340
 
 
341
        # Check minimum chunk size for S3
 
342
        if chunk_size < globals.s3_multipart_minimum_chunk_size:
 
343
            log.Warn("Minimum chunk size is %d, but %d specified." % (
 
344
                globals.s3_multipart_minimum_chunk_size, chunk_size))
 
345
            chunk_size = globals.s3_multipart_minimum_chunk_size
 
346
 
 
347
        # Decide in how many chunks to upload
 
348
        bytes = os.path.getsize(filename)
 
349
        if bytes < chunk_size:
 
350
            chunks = 1
 
351
        else:
 
352
            chunks = bytes / chunk_size
 
353
            if (bytes % chunk_size):
 
354
                chunks += 1
 
355
 
 
356
        log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))
 
357
 
 
358
        mp = self.bucket.initiate_multipart_upload(key, headers)
 
359
 
 
360
        pool = multiprocessing.Pool(processes=chunks)
 
361
        for n in range(chunks):
 
362
            params = {
 
363
                'scheme': self.scheme,
 
364
                'url': self.parsed_url,
 
365
                'bucket_name': self.bucket_name,
 
366
                'multipart_id': mp.id,
 
367
                'filename': filename,
 
368
                'offset': n,
 
369
                'bytes': chunk_size,
 
370
                'num_retries': globals.num_retries,
 
371
            }
 
372
            pool.apply_async(multipart_upload_worker, kwds=params)
 
373
        pool.close()
 
374
        pool.join()
 
375
 
 
376
        if len(mp.get_all_parts()) < chunks:
 
377
            mp.cancel_upload()
 
378
            raise BackendException("Multipart upload failed. Aborted.")
 
379
 
 
380
        return mp.complete_upload()
 
381
 
 
382
 
 
383
def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
 
384
                            offset, bytes, num_retries):
 
385
    """
 
386
    Worker method for uploading a file chunk to S3 using multipart upload.
 
387
    Note that the file chunk is read into memory, so it's important to keep
 
388
    this number reasonably small.
 
389
    """
 
390
    import traceback
 
391
 
 
392
    def _upload_callback(uploaded, total):
 
393
        worker_name = multiprocessing.current_process().name
 
394
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
 
395
 
 
396
    def _upload(num_retries):
 
397
        worker_name = multiprocessing.current_process().name
 
398
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
 
399
        try:
 
400
            conn = get_connection(scheme, parsed_url)
 
401
            bucket = conn.lookup(bucket_name)
 
402
 
 
403
            for mp in bucket.get_all_multipart_uploads():
 
404
                if mp.id == multipart_id:
 
405
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
 
406
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback)
 
407
                    break
 
408
        except Exception, e:
 
409
            traceback.print_exc()
 
410
            if num_retries:
 
411
                log.Debug("%s: Upload of chunk %d failed. Retrying %d more times..." % (
 
412
                    worker_name, offset + 1, num_retries - 1))
 
413
                return _upload(num_retries - 1)
 
414
            log.Debug("%s: Upload of chunk %d failed. Aborting..." % (
 
415
                worker_name, offset + 1))
 
416
            raise e
 
417
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))
 
418
 
 
419
    return _upload(num_retries)
 
420
 
 
421
duplicity.backend.register_backend("s3", BotoBackend)
 
422
duplicity.backend.register_backend("s3+http", BotoBackend)