2
# Ubuntu PractiTest Community results processor
3
# Copyright (C) 2015 Canonical
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
"""Script to allow uploading of data to a ubuntu-pt-community service."""
21
# For example if you're running the runserver.py script:
22
# python3 upload_results.py <path to json file> http://localhost:5000/v1/upload
29
from oauthlib import oauth1
32
class TwoFactorRequiredError(RuntimeError):
    """Second factor authentication required.

    Signals that the login must be retried with a one-time password.
    """
36
class InvalidCredentialsError(RuntimeError):
    """Username or password incorrect for account.

    Signals that retrying with the same credentials cannot succeed.
    """
41
arg_parser = argparse.ArgumentParser()
42
arg_parser.add_argument('file', help='Path to the json file to upload.')
43
arg_parser.add_argument(
45
# Note this should be https
46
help='url to the service to upload to. Must be in the form "http://..'
48
arg_parser.add_argument(
50
help='Email address to use for logging in. If not provided you will be prompted for it.' # NOQA
52
return arg_parser.parse_args()
55
def attempt_ubuntu_sso_login(email_address, password, otp=None):
    """Obtain OAuth token that can be used to authenticate with transport
    service.

    Posts the credentials as JSON to the SSO ``tokens/oauth`` endpoint and
    returns the decoded JSON reply.

    :param email_address: email address for user account.
    :param password: password for user account
    :param otp: one-time-password to use (if required, i.e. previous call
        raised `TwoFactorRequiredError`.)
    :returns: decoded JSON body of the SSO reply (presumably the OAuth
        consumer/token keys and secrets -- confirm against the service).
    :raises TwoFactorRequiredError: when the one time password `otp` is
        required to authenticate.
    :raises InvalidCredentialsError: when the service reports the
        credentials as invalid.
    :raises ValueError: for any other unsuccessful response.

    """
    token_name = 'ubuntu-pt-community'

    # NOTE(review): the field names below follow the Ubuntu SSO v2 API
    # (email / password / token_name) -- confirm against the service.
    post_data = {
        'email': email_address,
        'password': password,
        'token_name': token_name,
    }
    # Only send the second factor when the caller actually supplied one.
    if otp is not None:
        post_data['otp'] = otp

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }

    # The staging SSO instance is used here; the production base URL is
    # 'https://login.ubuntu.com/api/v2'.
    sso_base_url = 'https://login.staging.ubuntu.com/api/v2'
    sso_url = '{}/tokens/oauth'.format(sso_base_url)

    response = requests.post(
        sso_url,
        json.dumps(post_data),
        headers=headers,
    )

    response_json = response.json()

    # A 401 carries a machine-readable 'code' telling the failures apart.
    if response.status_code == 401:
        error_code = response_json.get('code')
        if error_code == 'TWOFACTOR_REQUIRED':
            raise TwoFactorRequiredError()
        elif error_code == 'INVALID_CREDENTIALS':
            raise InvalidCredentialsError()
        raise ValueError(response.text)

    # Any other error reply is not a set of credentials; fail here rather
    # than with a KeyError later when the caller signs the request.
    if not response.ok:
        raise ValueError(response.text)

    return response_json
106
def get_oauth_signed_request(url, oauth_credentials, method='POST'):
    """Return details for a signed request to `url` using oauth details in
    `oauth_credentials`.

    :param url: URL that the signed request will be made to.
    :param oauth_credentials: Dictionary containing OAuth credentials to use
        for signing the request.
        Must contain: consumer_key, consumer_secret, token_key, token_secret.
    :param method: Method type that will be made to the `url` (i.e. POST, GET)
    :returns: Dictionary containing details relevant for a signed header.
    :raises KeyError: If lacking any of the required oauth_credential details.

    """
    client = oauth1.Client(
        client_key=oauth_credentials['consumer_key'],
        client_secret=oauth_credentials['consumer_secret'],
        resource_owner_key=oauth_credentials['token_key'],
        resource_owner_secret=oauth_credentials['token_secret'],
        signature_method=oauth1.SIGNATURE_HMAC,
        realm='Checkbox uploading',
    )

    # sign() yields (uri, headers, body); only the headers are handed back,
    # which is what the caller passes on to requests.post().
    _uri, headers, _body = client.sign(url, http_method=method)
    return headers
132
# Gather the values the upload needs from the command line.
# NOTE(review): `args` is presumably the parsed argparse namespace -- confirm
# where it is bound.
def get_upload_details():
134
# Hand the values back as a (file, url, email) tuple.
return (args.file, args.url, args.email)
137
if __name__ == '__main__':
    # Script entry point: log in to Ubuntu SSO, sign the request with the
    # resulting OAuth credentials, then upload the file.
    file_path, url, email = get_upload_details()

    # The email address is optional on the command line; ask for it when it
    # was not supplied.
    if not email:
        email = input('Email address: ')

    password = getpass.getpass('Password: ')

    # The outer try also covers the 2FA retry, so bad credentials reported
    # on the second attempt are handled the same way as on the first.
    try:
        try:
            oauth_creds = attempt_ubuntu_sso_login(email, password)
        except TwoFactorRequiredError:
            otp = input('Your next 2fa code: ').strip()
            oauth_creds = attempt_ubuntu_sso_login(email, password, otp=otp)
    except InvalidCredentialsError:
        print('ERROR: Invalid credentials')
        # NOTE(review): there are no credentials to sign with, so stop with
        # a failing exit status -- confirm the intended exit code.
        raise SystemExit(1)

    headers = get_oauth_signed_request(url, oauth_creds)

    print('Uploading file to service.')
    with open(file_path, 'rb') as f:
        upload_data = dict(data=f)
        # Right, now we can post the details.
        response = requests.post(url, files=upload_data, headers=headers)

    # Only claim success when the service accepted the upload.
    if not response.ok:
        print('ERROR: Upload failed ({}): {}'.format(
            response.status_code, response.text))
        raise SystemExit(1)
    print('Upload succeeded')