~mterry/duplicity/drop-pexpect

« back to all changes in this revision

Viewing changes to duplicity/backends/_boto_multi.py

  • Committer: Kenneth Loafman
  • Date: 2014-02-26 19:58:06 UTC
  • mfrom: (919.5.15 duplicity)
  • Revision ID: kenneth@loafman.com-20140226195806-cc9k8f94ag9wb4ul
* Merged in lp:~prateek/duplicity/s3-glacier
  - Fixes https://bugs.launchpad.net/duplicity/+bug/1039511
    - Adds support to detect when a file is on Glacier and initiates a restore
      to S3. Also merges overlapping code in the boto backends
  - Fixes https://bugs.launchpad.net/duplicity/+bug/1243246
    - Adds a --s3_multipart_max_timeout input option to limit the max execution
      time of a chunked upload to S3. Also adds debug message to calculate
      upload speed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
import os
24
24
import sys
25
 
import time
26
25
import threading
27
26
import Queue
28
 
 
29
 
import duplicity.backend
 
27
import time
 
28
import traceback
30
29
 
31
30
from duplicity import globals
32
31
from duplicity import log
33
32
from duplicity.errors import * #@UnusedWildImport
34
 
from duplicity.util import exception_traceback
35
 
from duplicity.backend import retry
36
33
from duplicity.filechunkio import FileChunkIO
37
34
from duplicity import progress
38
35
 
 
36
from _boto_single import BotoBackend as BotoSingleBackend
 
37
from _boto_single import get_connection
 
38
 
39
39
BOTO_MIN_VERSION = "2.1.1"
40
40
 
41
41
# Multiprocessing is not supported on *BSD
61
61
    def run(self):
62
62
        while not self.finish:
63
63
            try:
64
 
                args = self.queue.get(True, 1) 
 
64
                args = self.queue.get(True, 1)
65
65
                progress.report_transfer(args[0], args[1])
66
66
            except Queue.Empty, e:
67
67
                pass
68
 
            
69
 
 
70
 
def get_connection(scheme, parsed_url):
71
 
    try:
72
 
        import boto
73
 
        assert boto.Version >= BOTO_MIN_VERSION
74
 
 
75
 
        from boto.s3.connection import S3Connection
76
 
        assert hasattr(S3Connection, 'lookup')
77
 
 
78
 
        # Newer versions of boto default to using
79
 
        # virtual hosting for buckets as a result of
80
 
        # upstream deprecation of the old-style access
81
 
        # method by Amazon S3. This change is not
82
 
        # backwards compatible (in particular with
83
 
        # respect to upper case characters in bucket
84
 
        # names); so we default to forcing use of the
85
 
        # old-style method unless the user has
86
 
        # explicitly asked us to use new-style bucket
87
 
        # access.
88
 
        #
89
 
        # Note that if the user wants to use new-style
90
 
        # buckets, we use the subdomain calling form
91
 
        # rather than given the option of both
92
 
        # subdomain and vhost. The reason being that
93
 
        # anything addressable as a vhost, is also
94
 
        # addressable as a subdomain. Seeing as the
95
 
        # latter is mostly a convenience method of
96
 
        # allowing browse:able content semi-invisibly
97
 
        # being hosted on S3, the former format makes
98
 
        # a lot more sense for us to use - being
99
 
        # explicit about what is happening (the fact
100
 
        # that we are talking to S3 servers).
101
 
 
102
 
        try:
103
 
            from boto.s3.connection import OrdinaryCallingFormat
104
 
            from boto.s3.connection import SubdomainCallingFormat
105
 
            cfs_supported = True
106
 
            calling_format = OrdinaryCallingFormat()
107
 
        except ImportError:
108
 
            cfs_supported = False
109
 
            calling_format = None
110
 
 
111
 
        if globals.s3_use_new_style:
112
 
            if cfs_supported:
113
 
                calling_format = SubdomainCallingFormat()
114
 
            else:
115
 
                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
116
 
                               "requested, but does not seem to be supported by the "
117
 
                               "boto library. Either you need to upgrade your boto "
118
 
                               "library or duplicity has failed to correctly detect "
119
 
                               "the appropriate support.",
120
 
                               log.ErrorCode.boto_old_style)
121
 
        else:
122
 
            if cfs_supported:
123
 
                calling_format = OrdinaryCallingFormat()
124
 
            else:
125
 
                calling_format = None
126
 
 
127
 
    except ImportError:
128
 
        log.FatalError("This backend (s3) requires boto library, version %s or later, "
129
 
                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
130
 
                       log.ErrorCode.boto_lib_too_old)
131
 
 
132
 
    if scheme == 's3+http':
133
 
        # Use the default Amazon S3 host.
134
 
        conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
135
 
    else:
136
 
        assert scheme == 's3'
137
 
        conn = S3Connection(
138
 
            host = parsed_url.hostname,
139
 
            is_secure=(not globals.s3_unencrypted_connection))
140
 
 
141
 
    if hasattr(conn, 'calling_format'):
142
 
        if calling_format is None:
143
 
            log.FatalError("It seems we previously failed to detect support for calling "
144
 
                           "formats in the boto library, yet the support is there. This is "
145
 
                           "almost certainly a duplicity bug.",
146
 
                           log.ErrorCode.boto_calling_format)
147
 
        else:
148
 
            conn.calling_format = calling_format
149
 
 
150
 
    else:
151
 
        # Duplicity hangs if boto gets a null bucket name.
152
 
        # HC: Caught a socket error, trying to recover
153
 
        raise BackendException('Boto requires a bucket name.')
154
 
    return conn
155
 
 
156
 
 
157
 
class BotoBackend(duplicity.backend.Backend):
 
68
 
 
69
 
 
70
class BotoBackend(BotoSingleBackend):
158
71
    """
159
72
    Backend for Amazon's Simple Storage System, (aka Amazon S3), though
160
73
    the use of the boto module, (http://code.google.com/p/boto/).
167
80
    """
168
81
 
169
82
    def __init__(self, parsed_url):
170
 
        duplicity.backend.Backend.__init__(self, parsed_url)
171
 
 
172
 
        from boto.s3.key import Key
173
 
        from boto.s3.multipart import MultiPartUpload
174
 
 
175
 
        # This folds the null prefix and all null parts, which means that:
176
 
        #  //MyBucket/ and //MyBucket are equivalent.
177
 
        #  //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
178
 
        self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))
179
 
 
180
 
        if self.url_parts:
181
 
            self.bucket_name = self.url_parts.pop(0)
182
 
        else:
183
 
            # Duplicity hangs if boto gets a null bucket name.
184
 
            # HC: Caught a socket error, trying to recover
185
 
            raise BackendException('Boto requires a bucket name.')
186
 
 
187
 
        self.scheme = parsed_url.scheme
188
 
 
189
 
        self.key_class = Key
190
 
 
191
 
        if self.url_parts:
192
 
            self.key_prefix = '%s/' % '/'.join(self.url_parts)
193
 
        else:
194
 
            self.key_prefix = ''
195
 
 
196
 
        self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
197
 
        self.parsed_url = parsed_url
198
 
        self.resetConnection()
199
 
 
200
 
    def resetConnection(self):
201
 
        self.bucket = None
202
 
        self.conn = get_connection(self.scheme, self.parsed_url)
203
 
        self.bucket = self.conn.lookup(self.bucket_name)
204
 
 
205
 
    def put(self, source_path, remote_filename=None):
206
 
        from boto.s3.connection import Location
207
 
        if globals.s3_european_buckets:
208
 
            if not globals.s3_use_new_style:
209
 
                log.FatalError("European bucket creation was requested, but not new-style "
210
 
                               "bucket addressing (--s3-use-new-style)",
211
 
                               log.ErrorCode.s3_bucket_not_style)
212
 
        #Network glitch may prevent first few attempts of creating/looking up a bucket
213
 
        for n in range(1, globals.num_retries+1):
214
 
            if self.bucket:
215
 
                break
216
 
            if n > 1:
217
 
                time.sleep(30)
218
 
            try:
219
 
                try:
220
 
                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
221
 
                except Exception, e:
222
 
                    if "NoSuchBucket" in str(e):
223
 
                        if globals.s3_european_buckets:
224
 
                            self.bucket = self.conn.create_bucket(self.bucket_name,
225
 
                                                                  location=Location.EU)
226
 
                        else:
227
 
                            self.bucket = self.conn.create_bucket(self.bucket_name)
228
 
                    else:
229
 
                        raise e
230
 
            except Exception, e:
231
 
                log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
232
 
                         "" % (n, self.bucket_name,
233
 
                               e.__class__.__name__,
234
 
                               str(e)))
235
 
                self.resetConnection()
236
 
 
237
 
        if not remote_filename:
238
 
            remote_filename = source_path.get_filename()
239
 
        key = self.key_prefix + remote_filename
240
 
        for n in range(1, globals.num_retries+1):
241
 
            if n > 1:
242
 
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
243
 
                time.sleep(10)
244
 
 
245
 
            if globals.s3_use_rrs:
246
 
                storage_class = 'REDUCED_REDUNDANCY'
247
 
            else:
248
 
                storage_class = 'STANDARD'
249
 
            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
250
 
            try:
251
 
                headers = {
252
 
                    'Content-Type': 'application/octet-stream',
253
 
                    'x-amz-storage-class': storage_class
254
 
                }
255
 
                self.upload(source_path.name, key, headers)
256
 
                self.resetConnection()
257
 
                return
258
 
            except Exception, e:
259
 
                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
260
 
                         "" % (self.straight_url,
261
 
                               remote_filename,
262
 
                               n,
263
 
                               e.__class__.__name__,
264
 
                               str(e)))
265
 
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
266
 
                self.resetConnection()
267
 
        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
268
 
                 (self.straight_url, remote_filename, globals.num_retries))
269
 
        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
270
 
 
271
 
    def get(self, remote_filename, local_path):
272
 
        key = self.key_class(self.bucket)
273
 
        key.key = self.key_prefix + remote_filename
274
 
        for n in range(1, globals.num_retries+1):
275
 
            if n > 1:
276
 
                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
277
 
                time.sleep(10)
278
 
            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
279
 
            try:
280
 
                key.get_contents_to_filename(local_path.name)
281
 
                local_path.setdata()
282
 
                self.resetConnection()
283
 
                return
284
 
            except Exception, e:
285
 
                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
286
 
                         "" % (self.straight_url,
287
 
                               remote_filename,
288
 
                               n,
289
 
                               e.__class__.__name__,
290
 
                               str(e)), 1)
291
 
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
292
 
                self.resetConnection()
293
 
        log.Warn("Giving up trying to download %s/%s after %d attempts" %
294
 
                (self.straight_url, remote_filename, globals.num_retries))
295
 
        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
296
 
 
297
 
    def _list(self):
298
 
        if not self.bucket:
299
 
            raise BackendException("No connection to backend")
300
 
 
301
 
        for n in range(1, globals.num_retries+1):
302
 
            if n > 1:
303
 
                # sleep before retry
304
 
                time.sleep(30)
305
 
            log.Info("Listing %s" % self.straight_url)
306
 
            try:
307
 
                return self._list_filenames_in_bucket()
308
 
            except Exception, e:
309
 
                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
310
 
                         "" % (self.straight_url,
311
 
                               n,
312
 
                               e.__class__.__name__,
313
 
                               str(e)), 1)
314
 
                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
315
 
        log.Warn("Giving up trying to list %s after %d attempts" %
316
 
                (self.straight_url, globals.num_retries))
317
 
        raise BackendException("Error listng %s" % self.straight_url)
318
 
 
319
 
    def _list_filenames_in_bucket(self):
320
 
        # We add a 'd' to the prefix to make sure it is not null (for boto) and
321
 
        # to optimize the listing of our filenames, which always begin with 'd'.
322
 
        # This will cause a failure in the regression tests as below:
323
 
        #   FAIL: Test basic backend operations
324
 
        #   <tracback snipped>
325
 
        #   AssertionError: Got list: []
326
 
        #   Wanted: ['testfile']
327
 
        # Because of the need for this optimization, it should be left as is.
328
 
        #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
329
 
        filename_list = []
330
 
        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
331
 
            try:
332
 
                filename = k.key.replace(self.key_prefix, '', 1)
333
 
                filename_list.append(filename)
334
 
                log.Debug("Listed %s/%s" % (self.straight_url, filename))
335
 
            except AttributeError:
336
 
                pass
337
 
        return filename_list
338
 
 
339
 
    def delete(self, filename_list):
340
 
        for filename in filename_list:
341
 
            self.bucket.delete_key(self.key_prefix + filename)
342
 
            log.Debug("Deleted %s/%s" % (self.straight_url, filename))
343
 
 
344
 
    @retry
345
 
    def _query_file_info(self, filename, raise_errors=False):
346
 
        try:
347
 
            key = self.bucket.lookup(self.key_prefix + filename)
348
 
            if key is None:
349
 
                return {'size': -1}
350
 
            return {'size': key.size}
351
 
        except Exception, e:
352
 
            log.Warn("Query %s/%s failed: %s"
353
 
                     "" % (self.straight_url,
354
 
                           filename,
355
 
                           str(e)))
356
 
            self.resetConnection()
357
 
            if raise_errors:
358
 
                raise e
359
 
            else:
360
 
                return {'size': None}
 
83
        BotoSingleBackend.__init__(self, parsed_url)
 
84
        self._setup_pool()
 
85
 
 
86
    def _setup_pool(self):
 
87
        number_of_procs = globals.s3_multipart_max_procs
 
88
        if not number_of_procs:
 
89
            number_of_procs = multiprocessing.cpu_count()
 
90
 
 
91
        if getattr(self, '_pool', False):
 
92
            log.Debug("A process pool already exists. Destroying previous pool.")
 
93
            self._pool.terminate()
 
94
            self._pool.join()
 
95
            self._pool = None
 
96
 
 
97
        log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs)
 
98
 
 
99
        self._pool = multiprocessing.Pool(processes=number_of_procs)
 
100
 
 
101
    def close(self):
 
102
        BotoSingleBackend.close(self)
 
103
        log.Debug("Closing pool")
 
104
        self._pool.terminate()
 
105
        self._pool.join()
361
106
 
362
107
    def upload(self, filename, key, headers=None):
 
108
        import boto
363
109
        chunk_size = globals.s3_multipart_chunk_size
364
110
 
365
111
        # Check minimum chunk size for S3
379
125
 
380
126
        log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))
381
127
 
382
 
        mp = self.bucket.initiate_multipart_upload(key, headers)
 
128
        mp = self.bucket.initiate_multipart_upload(key.key, headers)
383
129
 
384
130
        # Initiate a queue to share progress data between the pool
385
131
        # workers and a consumer thread, that will collect and report
389
135
            queue = manager.Queue()
390
136
            consumer = ConsumerThread(queue)
391
137
            consumer.start()
392
 
 
393
 
        pool = multiprocessing.Pool(processes=chunks)
 
138
        tasks = []
394
139
        for n in range(chunks):
395
 
             params = [self.scheme, self.parsed_url, self.bucket_name, 
396
 
                 mp.id, filename, n, chunk_size, globals.num_retries, 
397
 
                 queue]
398
 
             pool.apply_async(multipart_upload_worker, params)
399
 
        pool.close()
400
 
        pool.join()
 
140
            storage_uri = boto.storage_uri(self.boto_uri_str)
 
141
            params = [self.scheme, self.parsed_url, storage_uri, self.bucket_name,
 
142
                      mp.id, filename, n, chunk_size, globals.num_retries,
 
143
                      queue]
 
144
            tasks.append(self._pool.apply_async(multipart_upload_worker, params))
 
145
 
 
146
        log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks))
 
147
        while tasks:
 
148
            try:
 
149
                tasks[0].wait(timeout=globals.s3_multipart_max_timeout)
 
150
                if tasks[0].ready():
 
151
                    if tasks[0].successful():
 
152
                        del tasks[0]
 
153
                    else:
 
154
                        log.Debug("Part upload not successful, aborting multipart upload.")
 
155
                        self._setup_pool()
 
156
                        break
 
157
                else:
 
158
                    raise multiprocessing.TimeoutError
 
159
            except multiprocessing.TimeoutError:
 
160
                log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks))
 
161
                self._setup_pool()
 
162
                break
 
163
 
 
164
        log.Debug("Done waiting for the pool to finish processing")
401
165
 
402
166
        # Terminate the consumer thread, if any
403
167
        if globals.progress:
404
168
            consumer.finish = True
405
169
            consumer.join()
406
170
 
407
 
        if len(mp.get_all_parts()) < chunks:
 
171
        if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
408
172
            mp.cancel_upload()
409
173
            raise BackendException("Multipart upload failed. Aborted.")
410
174
 
411
175
        return mp.complete_upload()
412
176
 
413
177
 
414
 
def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
415
 
                            offset, bytes, num_retries, queue):
 
178
def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
 
179
                            filename, offset, bytes, num_retries, queue):
416
180
    """
417
181
    Worker method for uploading a file chunk to S3 using multipart upload.
418
182
    Note that the file chunk is read into memory, so it's important to keep
419
183
    this number reasonably small.
420
184
    """
421
 
    import traceback
422
185
 
423
186
    def _upload_callback(uploaded, total):
424
187
        worker_name = multiprocessing.current_process().name
425
188
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
426
189
        if not queue is None:
427
 
            queue.put([uploaded, total]) # Push data to the consumer thread
 
190
            queue.put([uploaded, total])  # Push data to the consumer thread
428
191
 
429
192
    def _upload(num_retries):
430
193
        worker_name = multiprocessing.current_process().name
431
194
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
432
195
        try:
433
 
            conn = get_connection(scheme, parsed_url)
 
196
            conn = get_connection(scheme, parsed_url, storage_uri)
434
197
            bucket = conn.lookup(bucket_name)
435
198
 
436
 
            for mp in bucket.get_all_multipart_uploads():
 
199
            for mp in bucket.list_multipart_uploads():
437
200
                if mp.id == multipart_id:
438
201
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
 
202
                        start = time.time()
439
203
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
440
 
                                                    num_cb=max(2, 8 * bytes / (1024 * 1024))
441
 
                                                ) # Max num of callbacks = 8 times x megabyte
 
204
                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
 
205
                                                 )  # Max num of callbacks = 8 times x megabyte
 
206
                        end = time.time()
 
207
                        log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(name=worker_name, chunk=offset+1, speed=(bytes/max(1, abs(end-start)))))
442
208
                    break
 
209
            conn.close()
 
210
            conn = None
 
211
            bucket = None
 
212
            del conn
443
213
        except Exception, e:
444
214
            traceback.print_exc()
445
215
            if num_retries:
452
222
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))
453
223
 
454
224
    return _upload(num_retries)
455
 
 
456
 
duplicity.backend.register_backend("s3", BotoBackend)
457
 
duplicity.backend.register_backend("s3+http", BotoBackend)