1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# Copyright 2015 Canonical Ltd. This software is licensed under the
# GNU General Public License version 3 (see the file LICENSE).
import json
import os
import time
from functools import wraps
import requests
from requests_oauthlib import OAuth1Session
from storeapi.compat import urljoin
from storeapi.constants import MYAPPS_API_ROOT_URL
def get_oauth_session(config):
"""Return a client configured to allow oauth signed requests."""
try:
session = OAuth1Session(
config['consumer_key'],
client_secret=config['consumer_secret'],
resource_owner_key=config['token_key'],
resource_owner_secret=config['token_secret'],
signature_method='PLAINTEXT',
)
except KeyError:
session = None
return session
def myapps_api_call(path, session=None, method='GET', data=None):
"""Issue a request for a particular endpoint of the MyApps API."""
result = {'success': False, 'errors': [], 'data': None}
if session is not None:
client = session
else:
client = requests
root_url = os.environ.get('MYAPPS_API_ROOT_URL', MYAPPS_API_ROOT_URL)
url = urljoin(root_url, path)
if method == 'GET':
response = client.get(url)
elif method == 'POST':
response = client.post(url, data=data and json.dumps(data) or None,
headers={'Content-Type': 'application/json'})
else:
raise ValueError('Method {} not supported'.format(method))
if response.ok:
result['success'] = True
result['data'] = response.json()
else:
result['errors'] = [response.text]
return result
def is_scan_completed(response):
"""Return True if the response indicates the scan process completed."""
if response.ok:
return response.json().get('completed', False)
return False
def retry(terminator=None, retries=3, delay=3, backoff=2, logger=None):
"""Decorate a function to automatically retry calling it on failure.
Arguments:
- terminator: this should be a callable that returns a boolean;
it is used to determine if the function call was successful
and the retry loop should be stopped
- retries: an integer specifying the maximum number of retries
- delay: initial number of seconds to wait for the first retry
- backoff: exponential factor to use to adapt the delay between
subsequent retries
- logger: logging.Logger instance to use for logging
The decorated function will return as soon as any of the following
conditions are met:
1. terminator evaluates function output as True
2. there are no more retries left
If the terminator callable is not provided, the function will be called
exactly once and will not be retried.
"""
def decorated(func):
if retries != int(retries) or retries < 0:
raise ValueError(
'retries value must be a positive integer or zero')
if delay < 0:
raise ValueError('delay value must be positive')
if backoff != int(backoff) or backoff < 1:
raise ValueError('backoff value must be a positive integer')
@wraps(func)
def wrapped(*args, **kwargs):
retries_left, current_delay = retries, delay
result = func(*args, **kwargs)
if terminator is not None:
while not terminator(result) and retries_left > 0:
msg = "... retrying in %d seconds" % current_delay
if logger:
logger.warning(msg)
# sleep
time.sleep(current_delay)
retries_left -= 1
current_delay *= backoff
# retry
result = func(*args, **kwargs)
return result, retries_left == 0
return wrapped
return decorated
|