~ubuntu-testcase/ubuntu-community-testing/add_and_package_script

« back to all changes in this revision

Viewing changes to tools/upload_results.py

  • Committer: Christopher Lee
  • Date: 2015-08-14 04:45:18 UTC
  • mto: (18.3.1 original)
  • mto: This revision was merged to the branch mainline in revision 24.
  • Revision ID: chris.lee@canonical.com-20150814044518-6w17au29vry13y2d
Add uploading script and enable auth.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Ubuntu PractiTest Community results processor
 
3
# Copyright (C) 2015 Canonical
 
4
#
 
5
# This program is free software: you can redistribute it and/or modify
 
6
# it under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation, either version 3 of the License, or
 
8
# (at your option) any later version.
 
9
#
 
10
# This program is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
# GNU General Public License for more details.
 
14
#
 
15
# You should have received a copy of the GNU General Public License
 
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
17
#
 
18
 
 
19
"""Script to allow uploading of data to a ubuntu-pt-community service."""
 
20
 
 
21
# For example if you're running the runserver.py script:
 
22
# python3 upload_results.py <path to json file> http://localhost:5000/v1/upload
 
23
 
 
24
import argparse
 
25
import json
 
26
import requests
 
27
import getpass
 
28
 
 
29
from oauthlib import oauth1
 
30
 
 
31
 
 
32
class TwoFactorRequiredError(RuntimeError):
 
33
    """Second factor authentication required."""
 
34
 
 
35
 
 
36
class InvalidCredentialsError(RuntimeError):
 
37
    """Username or password incorrect for account."""
 
38
 
 
39
 
 
40
def parse_args():
 
41
    arg_parser = argparse.ArgumentParser()
 
42
    arg_parser.add_argument('file', help='Path to the json file to upload.')
 
43
    arg_parser.add_argument(
 
44
        'url',
 
45
        # Note this should be https
 
46
        help='url to the service to upload to. Must be in the form "http://..'
 
47
    )
 
48
    arg_parser.add_argument(
 
49
        '--email',
 
50
        help='Email address to use for logging in. If not provided you will be prompted for it.'  # NOQA
 
51
    )
 
52
    return arg_parser.parse_args()
 
53
 
 
54
 
 
55
def attempt_ubuntu_sso_login(email_address, password, otp=None):
 
56
    """Obtain OAuth token that can be used to authenticate with transport
 
57
    end-points.
 
58
 
 
59
    :param email_address: email address for user account.
 
60
    :param password: password for user account
 
61
    :param otp: one-time-password to use (if required, i.e. previous call
 
62
      raised `TwoFactorRequiredError`.)
 
63
 
 
64
    :raises TwoFactorRequired: when the one time password `otp` is required to
 
65
      log in.
 
66
 
 
67
    """
 
68
    token_name = 'ubuntu-pt-community'
 
69
    post_data = dict(
 
70
        email=email_address,
 
71
        password=password,
 
72
        token_name=token_name
 
73
    )
 
74
 
 
75
    if otp is not None:
 
76
        post_data['otp'] = otp
 
77
 
 
78
    json_headers = {
 
79
        'Content-Type': 'application/json',
 
80
        'Accept': 'application/json',
 
81
    }
 
82
 
 
83
    # sso_base_url = 'https://login.ubuntu.com/api/v2'
 
84
    sso_base_url = 'https://login.staging.ubuntu.com/api/v2'
 
85
    sso_url = '{}/tokens/oauth'.format(sso_base_url)
 
86
 
 
87
    response = requests.post(
 
88
        sso_url,
 
89
        json.dumps(post_data),
 
90
        headers=json_headers
 
91
    )
 
92
 
 
93
    response_json = response.json()
 
94
 
 
95
    if not response.ok:
 
96
        if response.status_code == 401:
 
97
            if response_json.get('code') == 'TWOFACTOR_REQUIRED':
 
98
                raise TwoFactorRequiredError()
 
99
            elif response_json.get('code') == 'INVALID_CREDENTIALS':
 
100
                raise InvalidCredentialsError()
 
101
        raise ValueError(response.text)
 
102
 
 
103
    return response_json
 
104
 
 
105
 
 
106
def get_oauth_signed_request(url, oauth_credentials, method='POST'):
 
107
    """Return details for a signed request to `url` using oauth details in
 
108
    `oauth_credentials`.
 
109
 
 
110
    :param url: Url for which the signed request will be for.
 
111
    :param oauth_credentials: Dictionary containing OAuth credentials to use
 
112
      for signing the request.
 
113
      Must contain: consumer_key, consumer_secret, token_key, token_secret.
 
114
    :param method: Method type that will be made to the `url` (i.e. POST, GET)
 
115
    :returns: Dictionary containing details relevent for a signed header.
 
116
    :raises KeyError: If lacking any of the required oauth_credential details.
 
117
 
 
118
    """
 
119
    client = oauth1.Client(
 
120
        client_key=oauth_credentials['consumer_key'],
 
121
        client_secret=oauth_credentials['consumer_secret'],
 
122
        resource_owner_key=oauth_credentials['token_key'],
 
123
        resource_owner_secret=oauth_credentials['token_secret'],
 
124
        signature_method=oauth1.SIGNATURE_HMAC,
 
125
        realm='Checkbox uploading',
 
126
    )
 
127
 
 
128
    uri, headers, body = client.sign(url, method)
 
129
    return headers
 
130
 
 
131
 
 
132
def get_upload_details():
 
133
    args = parse_args()
 
134
    return (args.file, args.url, args.email)
 
135
 
 
136
 
 
137
if __name__ == '__main__':
 
138
    file_path, url, email = get_upload_details()
 
139
    if email is None:
 
140
        email = input('Email address: ')
 
141
 
 
142
    password = getpass.getpass('Password: ')
 
143
 
 
144
    try:
 
145
        oauth_creds = attempt_ubuntu_sso_login(email, password)
 
146
    except TwoFactorRequiredError:
 
147
        otp = input('Your next 2fa code: ').strip()
 
148
        oauth_creds = attempt_ubuntu_sso_login(email, password, otp=otp)
 
149
    except InvalidCredentialsError:
 
150
        print('ERROR: Invalid credentials')
 
151
 
 
152
    headers = get_oauth_signed_request(url, oauth_creds)
 
153
 
 
154
    print('Uploading file to service.')
 
155
    with open(file_path, 'rb') as f:
 
156
        upload_data = dict(data=f)
 
157
        # Right, now we can post the details.
 
158
        requests.post(url, files=upload_data, headers=headers)
 
159
        print('Upload succeeded')