~ed.so/duplicity/lftp.ncftp.and.prefixes


Viewing changes to duplicity/backends/_boto_single.py

  • Committer: Michael Terry
  • Date: 2014-04-21 19:21:45 UTC
  • mto: This revision was merged to the branch mainline in revision 981.
  • Revision ID: michael.terry@canonical.com-20140421192145-b1vlb0hppnn8jrtl
Checkpoint

@@ -25,9 +25,7 @@
 import duplicity.backend
 from duplicity import globals
 from duplicity import log
-from duplicity.errors import * #@UnusedWildImport
-from duplicity.util import exception_traceback
-from duplicity.backend import retry
+from duplicity.errors import FatalBackendException, BackendException
 from duplicity import progress
 
 BOTO_MIN_VERSION = "2.1.1"
@@ -163,7 +161,7 @@
         self.resetConnection()
         self._listed_keys = {}
 
-    def close(self):
+    def _close(self):
         del self._listed_keys
         self._listed_keys = {}
         self.bucket = None
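
The rename from close() to _close() follows the convention this checkpoint introduces throughout the backend: public operations (put, get, list, close, ...) are implemented once by a wrapper layer in duplicity.backend, and concrete backends only supply underscore-prefixed primitives. The snippet below is an illustrative sketch of that dispatch pattern, not code from this branch; the class name BackendWrapper is a placeholder.

    class BackendWrapper(object):  # illustrative name, not from this diff
        def __init__(self, backend):
            self.backend = backend

        def close(self):
            # The public method forwards to the backend's private hook
            # only if the backend chose to implement it.
            if hasattr(self.backend, '_close'):
                self.backend._close()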
@@ -185,135 +183,67 @@
         self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
         self.bucket = self.conn.lookup(self.bucket_name)
 
-    def put(self, source_path, remote_filename=None):
+    def _retry_cleanup(self):
+        self.resetConnection()
+
+    def _put(self, source_path, remote_filename):
         from boto.s3.connection import Location
         if globals.s3_european_buckets:
             if not globals.s3_use_new_style:
-                log.FatalError("European bucket creation was requested, but not new-style "
-                               "bucket addressing (--s3-use-new-style)",
-                               log.ErrorCode.s3_bucket_not_style)
-        #Network glitch may prevent first few attempts of creating/looking up a bucket
-        for n in range(1, globals.num_retries+1):
-            if self.bucket:
-                break
-            if n > 1:
-                time.sleep(30)
-                self.resetConnection()
-            try:
-                try:
-                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
-                except Exception as e:
-                    if "NoSuchBucket" in str(e):
-                        if globals.s3_european_buckets:
-                            self.bucket = self.conn.create_bucket(self.bucket_name,
-                                                                  location=Location.EU)
-                        else:
-                            self.bucket = self.conn.create_bucket(self.bucket_name)
-                    else:
-                        raise e
-            except Exception as e:
-                log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
-                         "" % (n, self.bucket_name,
-                               e.__class__.__name__,
-                               str(e)))
+                raise FatalBackendException("European bucket creation was requested, but not new-style "
+                                            "bucket addressing (--s3-use-new-style)",
+                                            code=log.ErrorCode.s3_bucket_not_style)
+
+        if self.bucket is None:
+            try:
+                self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
+            except Exception as e:
+                if "NoSuchBucket" in str(e):
+                    if globals.s3_european_buckets:
+                        self.bucket = self.conn.create_bucket(self.bucket_name,
+                                                              location=Location.EU)
+                    else:
+                        self.bucket = self.conn.create_bucket(self.bucket_name)
+                else:
+                    raise
 
-        if not remote_filename:
-            remote_filename = source_path.get_filename()
         key = self.bucket.new_key(self.key_prefix + remote_filename)
 
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
-                time.sleep(10)
-
-            if globals.s3_use_rrs:
-                storage_class = 'REDUCED_REDUNDANCY'
-            else:
-                storage_class = 'STANDARD'
-            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
-            try:
-                if globals.s3_use_sse:
-                    headers = {
-                    'Content-Type': 'application/octet-stream',
-                    'x-amz-storage-class': storage_class,
-                    'x-amz-server-side-encryption': 'AES256'
-                }
-                else:
-                    headers = {
-                    'Content-Type': 'application/octet-stream',
-                    'x-amz-storage-class': storage_class
-                }
-
-                upload_start = time.time()
-                self.upload(source_path.name, key, headers)
-                upload_end = time.time()
-                total_s = abs(upload_end-upload_start) or 1  # prevent a zero value!
-                rough_upload_speed = os.path.getsize(source_path.name)/total_s
-                self.resetConnection()
-                log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed))
-                return
-            except Exception as e:
-                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               remote_filename,
-                               n,
-                               e.__class__.__name__,
-                               str(e)))
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-                self.resetConnection()
-        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
-                 (self.straight_url, remote_filename, globals.num_retries))
-        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
-
-    def get(self, remote_filename, local_path):
+        if globals.s3_use_rrs:
+            storage_class = 'REDUCED_REDUNDANCY'
+        else:
+            storage_class = 'STANDARD'
+        log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
+        if globals.s3_use_sse:
+            headers = {
+            'Content-Type': 'application/octet-stream',
+            'x-amz-storage-class': storage_class,
+            'x-amz-server-side-encryption': 'AES256'
+        }
+        else:
+            headers = {
+            'Content-Type': 'application/octet-stream',
+            'x-amz-storage-class': storage_class
+        }
+
+        upload_start = time.time()
+        self.upload(source_path.name, key, headers)
+        upload_end = time.time()
+        total_s = abs(upload_end-upload_start) or 1  # prevent a zero value!
+        rough_upload_speed = os.path.getsize(source_path.name)/total_s
+        log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed))
+
+    def _get(self, remote_filename, local_path):
         key_name = self.key_prefix + remote_filename
         self.pre_process_download(remote_filename, wait=True)
         key = self._listed_keys[key_name]
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
-                time.sleep(10)
-            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
-            try:
-                self.resetConnection()
-                key.get_contents_to_filename(local_path.name)
-                local_path.setdata()
-                return
-            except Exception as e:
-                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               remote_filename,
-                               n,
-                               e.__class__.__name__,
-                               str(e)), 1)
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-
-        log.Warn("Giving up trying to download %s/%s after %d attempts" %
-                (self.straight_url, remote_filename, globals.num_retries))
-        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
+        self.resetConnection()
+        key.get_contents_to_filename(local_path.name)
 
     def _list(self):
         if not self.bucket:
             raise BackendException("No connection to backend")
-
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry
-                time.sleep(30)
-                self.resetConnection()
-            log.Info("Listing %s" % self.straight_url)
-            try:
-                return self._list_filenames_in_bucket()
-            except Exception as e:
-                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               n,
-                               e.__class__.__name__,
-                               str(e)), 1)
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-        log.Warn("Giving up trying to list %s after %d attempts" %
-                (self.straight_url, globals.num_retries))
-        raise BackendException("Error listng %s" % self.straight_url)
+        return self._list_filenames_in_bucket()
 
     def _list_filenames_in_bucket(self):
         # We add a 'd' to the prefix to make sure it is not null (for boto) and
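
Stripped of their retry loops, _put(), _get() and _list() now make a single attempt and simply raise on failure; the looping, sleeping, and warning logs are assumed to move into a shared driver in duplicity.backend that calls the new _retry_cleanup() hook between attempts. Below is a minimal sketch of such a driver under that assumption — the name RetryDriver and the attempt/sleep values are placeholders, and the real implementation differs in detail.

    import time

    class RetryDriver(object):  # hypothetical name, for illustration only
        def __init__(self, backend, num_retries=5):
            self.backend = backend
            self.num_retries = num_retries

        def put(self, source_path, remote_filename):
            for n in range(1, self.num_retries + 1):
                try:
                    # Single attempt; the backend no longer loops internally.
                    return self.backend._put(source_path, remote_filename)
                except Exception:
                    if n == self.num_retries:
                        raise
                    # Let the backend drop stale connections before retrying.
                    if hasattr(self.backend, '_retry_cleanup'):
                        self.backend._retry_cleanup()
                    time.sleep(10)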
@@ -336,76 +266,37 @@
                 pass
         return filename_list
 
-    def delete(self, filename_list):
-        for filename in filename_list:
-            self.bucket.delete_key(self.key_prefix + filename)
-            log.Debug("Deleted %s/%s" % (self.straight_url, filename))
+    def _delete(self, filename):
+        self.bucket.delete_key(self.key_prefix + filename)
 
-    @retry
-    def _query_file_info(self, filename, raise_errors=False):
-        try:
-            key = self.bucket.lookup(self.key_prefix + filename)
-            if key is None:
-                return {'size': -1}
-            return {'size': key.size}
-        except Exception as e:
-            log.Warn("Query %s/%s failed: %s"
-                     "" % (self.straight_url,
-                           filename,
-                           str(e)))
-            self.resetConnection()
-            if raise_errors:
-                raise e
-            else:
-                return {'size': None}
+    def _query(self, filename):
+        key = self.bucket.lookup(self.key_prefix + filename)
+        if key is None:
+            return {'size': -1}
+        return {'size': key.size}
 
     def upload(self, filename, key, headers):
-            key.set_contents_from_filename(filename, headers,
-                                           cb=progress.report_transfer,
-                                           num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
-                                           )  # Max num of callbacks = 8 times x megabyte
-            key.close()
+        key.set_contents_from_filename(filename, headers,
+                                       cb=progress.report_transfer,
+                                       num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
+                                       )  # Max num of callbacks = 8 times x megabyte
+        key.close()
 
-    def pre_process_download(self, files_to_download, wait=False):
+    def pre_process_download(self, remote_filename, wait=False):
         # Used primarily to move files in Glacier to S3
-        if isinstance(files_to_download, (bytes, str, unicode)):
-            files_to_download = [files_to_download]
+        key_name = self.key_prefix + remote_filename
+        if not self._listed_keys.get(key_name, False):
+            self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
+        key = self._listed_keys[key_name]
 
-        for remote_filename in files_to_download:
-            success = False
-            for n in range(1, globals.num_retries+1):
-                if n > 1:
-                    # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
-                    time.sleep(10)
-                    self.resetConnection()
-                try:
-                    key_name = self.key_prefix + remote_filename
-                    if not self._listed_keys.get(key_name, False):
-                        self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
-                    key = self._listed_keys[key_name]
-
-                    if key.storage_class == "GLACIER":
-                        # We need to move the file out of glacier
-                        if not self.bucket.get_key(key.key).ongoing_restore:
-                            log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
-                            key.restore(days=1)  # Shouldn't need this again after 1 day
-                        if wait:
-                            log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
-                            while self.bucket.get_key(key.key).ongoing_restore:
-                                time.sleep(60)
-                                self.resetConnection()
-                            log.Info("File %s was successfully restored from Glacier" % remote_filename)
-                    success = True
-                    break
-                except Exception as e:
-                    log.Warn("Restoration from Glacier for file %s/%s failed (attempt #%d, reason: %s: %s)"
-                             "" % (self.straight_url,
-                                   remote_filename,
-                                   n,
-                                   e.__class__.__name__,
-                                   str(e)), 1)
-                    log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-            if not success:
-                log.Warn("Giving up trying to restore %s/%s after %d attempts" %
-                        (self.straight_url, remote_filename, globals.num_retries))
-                raise BackendException("Error restoring %s/%s from Glacier to S3" % (self.straight_url, remote_filename))
+        if key.storage_class == "GLACIER":
+            # We need to move the file out of glacier
+            if not self.bucket.get_key(key.key).ongoing_restore:
+                log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
+                key.restore(days=1)  # Shouldn't need this again after 1 day
+            if wait:
+                log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
+                while self.bucket.get_key(key.key).ongoing_restore:
+                    time.sleep(60)
+                    self.resetConnection()
+                log.Info("File %s was successfully restored from Glacier" % remote_filename)