1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
|
import getpass
import hmac
import json
import os
import pprint
import shlex
import sys
import time
from decimal import Decimal
from hashlib import sha256
from urlparse import urljoin
# Third-party HTTP dependencies.  When they are missing, exit with a short
# hint instead of an ImportError traceback.
try:
    import requests
    import requests_oauthlib
except ImportError:
    # Diagnostics go to stderr so they do not mix with regular output.
    sys.stderr.write(
        'To run this script you need to have requests and requests_oauthlib '
        'installed.\n')
    sys.exit(1)
# Root URLs used to build the SSO token endpoint and the SCA inventory and
# purchase endpoints.  Both default to the staging hosts and can be
# overridden through the environment variables of the same name.
SSO_ROOT_URL = os.getenv('SSO_ROOT_URL', 'https://login.staging.ubuntu.com')
SCA_ROOT_URL = os.getenv(
    'SCA_ROOT_URL', 'https://myapps.developer.staging.ubuntu.com')
def get_oauth_token_data(email, password, otp='', token_name='test'):
    """Request an OAuth token from the SSO service for the given account.

    The credentials are POSTed as JSON to ``/api/v2/tokens/oauth`` under
    ``SSO_ROOT_URL``; ``otp`` is only included when it is non-empty.

    Returns a dict with the keys ``client_key``, ``client_secret``,
    ``resource_owner_key`` and ``resource_owner_secret`` filled from the
    ``consumer_key``, ``consumer_secret``, ``token_key`` and
    ``token_secret`` entries of the response.

    Raises ``Exception`` when the response status is not OK.
    """
    payload = {
        'email': email,
        'password': password,
        'token_name': token_name,
    }
    if otp:
        payload['otp'] = otp

    response = requests.post(
        urljoin(SSO_ROOT_URL, '/api/v2/tokens/oauth'),
        data=json.dumps(payload),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        })
    if not response.ok:
        raise Exception(
            'Could not get oauth token data. Response: %s' % response.content)

    token = response.json()
    # (name expected by the OAuth session, name used in the SSO response)
    field_map = (
        ('client_key', 'consumer_key'),
        ('client_secret', 'consumer_secret'),
        ('resource_owner_key', 'token_key'),
        ('resource_owner_secret', 'token_secret'),
    )
    return dict((ours, token[theirs]) for ours, theirs in field_map)
def get_openid(client):
    """Return the ``<SSO_ROOT_URL>/+id/<client_key>`` URL for ``client``.

    ``client`` must expose ``auth.client.client_key`` (presumably the OAuth
    session built by ``make_client`` -- confirm against callers).
    """
    consumer_key = client.auth.client.client_key
    return "%s/+id/%s" % (SSO_ROOT_URL, consumer_key)
def make_client(authentication='oauth', **kwargs):
    """Build an HTTP session preconfigured for uncached JSON requests.

    With ``authentication='oauth'`` (the default) an ``OAuth1Session`` is
    created from ``kwargs``.  Any other value yields a plain
    ``requests.Session`` and ``kwargs`` are ignored.
    """
    use_oauth = authentication == 'oauth'
    session = (requests_oauthlib.OAuth1Session(**kwargs) if use_oauth
               else requests.Session())
    default_headers = {
        'Content-Type': 'application/json',
        'Cache-Control': 'no-cache',
    }
    session.headers.update(default_headers)
    return session
def make_trusted_client(secret):
    """Return a plain (non-OAuth) session with ``secret`` stored on it.

    The secret is only attached as the ``secret`` attribute; callers build
    the ``Authorization`` header themselves.
    """
    trusted = make_client(authentication='userproxy')
    trusted.secret = secret
    return trusted
def _extract_marked_text(text, marker, terminator='</p>'):
    """Return the text between ``marker`` and ``terminator``, or None.

    None is returned when ``marker`` does not occur in ``text``.
    """
    pieces = text.split(marker)
    if len(pieces) < 2:
        return None
    return pieces[1].split(terminator)[0]


def output(title, response):
    """Print ``title``, the response object and its pretty-printed content.

    For a 500 response the OOPS and Sentry ids are extracted from the error
    page and printed as a list.  If the page carries neither marker the raw
    body is printed instead, so an unexpected error page does not crash the
    caller.  For any other status the decoded JSON body is printed, falling
    back to the raw body when it is not valid JSON.
    """
    print('\n{}'.format(title))
    print('Response: {}'.format(response))
    if response.status_code == 500:
        body = response.content
        found_ids = [
            value for value in (
                _extract_marked_text(body, 'id="oops_id">'),
                _extract_marked_text(body, 'id="sentry_id">'),
            ) if value is not None]
        # Not every 500 page has the expected markup; show it verbatim then.
        content = found_ids or body
    else:
        try:
            content = response.json()
        except ValueError:
            content = response.content
    pprint.pprint(content)
    print('')
def ask_for_package():
    """Prompt for the name of the package to query and return the answer."""
    prompt = ('Enter the name of the package to query '
              '(eg, demo35.ricardokirkner): ')
    return raw_input(prompt)
def ask_for_purchase_currency():
    """Prompt for the currency to purchase in and return the answer."""
    prompt = 'Enter a valid currency (eg, USD): '
    return raw_input(prompt)
def ask_for_input(msg, default):
    """Prompt with ``msg`` and return the answer, or ``default`` if empty.

    When ``default`` is truthy the prompt also shows it as the value kept by
    pressing ENTER; otherwise the prompt simply ends with a colon.
    """
    if default:
        suffix = " or press ENTER to keep the current value: %s\n" % default
    else:
        suffix = ": "
    answer = raw_input(msg + suffix)
    return answer or default
def ask_for_sku(default=''):
    """Prompt for the item's SKU; an empty answer returns ``default``."""
    return ask_for_input(
        "Enter the SKU (must be unique) of the item (eg, item-1-sku)",
        default)
def ask_for_title(default=''):
    """Prompt for the item's title; an empty answer returns ``default``."""
    return ask_for_input(
        "Enter the item's title (eg, Item 1 Title)", default)
def ask_for_description(default=''):
    """Prompt for the item's description; empty input returns ``default``."""
    return ask_for_input(
        "Enter the item's description (eg, Item 1 Description)", default)
def ask_for_type(default=''):
    """Prompt for the item's type; an empty answer returns ``default``."""
    return ask_for_input(
        "Enter the item's type (consumable or unlockable)", default)
def ask_for_prices(default=None):
    """Interactively collect the item's price for each supported currency.

    The user is repeatedly asked for a currency (USD, EUR or GBP) and an
    amount until an empty currency is entered.  Leaving is only allowed once
    a USD price is set.

    ``default`` is an optional mapping of currency to amount used as the
    starting point.  It is copied, so the caller's mapping is never modified.
    Without it, all three currencies start as ``None``.

    Returns a dict mapping currency codes to the amounts as entered
    (strings), or ``None`` for currencies left unset.
    """
    # Imported here because the module only imports ``Decimal``.
    from decimal import InvalidOperation

    currencies = ('USD', 'EUR', 'GBP')
    if default is None:
        prices = dict.fromkeys(currencies)
    else:
        prices = dict(default)

    currency_msg = (
        "Enter a valid currency (USD, EUR, GBP) for the item's price or "
        "press ENTER to continue: ")
    while True:
        currency = raw_input(currency_msg)
        if not currency:
            if prices.get('USD'):
                # A USD price is set, so the user may leave the loop.
                break
            print('You must provide a price in USD.')
            continue
        if currency not in currencies:
            print("Invalid currency. Must be one of: USD, EUR, GBP")
            continue

        amount = raw_input("Enter the amount in %s: " % currency)
        try:
            # Only parse failures are handled here.  NaN and Infinity parse
            # as Decimals but are not usable prices, so they are rejected.
            valid = Decimal(amount).is_finite()
        except (InvalidOperation, ValueError, TypeError):
            valid = False
        if not valid:
            print('Invalid amount. Must be a valid decimal value.')
            continue
        prices[currency] = amount
    return prices
def get_api_versions(client):
    """GET ``/inventory/api`` and print the result.

    Returns True when the response status is OK.
    """
    response = client.get(urljoin(SCA_ROOT_URL, '/inventory/api'))
    output('API Versions', response)
    return response.ok
def get_package_detail(client, package_name):
    """GET the details of ``package_name`` and print the result.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/' + package_name
    response = client.get(urljoin(SCA_ROOT_URL, path))
    output('Details for %s' % package_name, response)
    return response.ok
def get_package_items(client, package_name):
    """GET the items registered for ``package_name`` and print the result.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/%s/items' % package_name
    response = client.get(urljoin(SCA_ROOT_URL, path))
    output('Items for %s' % package_name, response)
    return response.ok
def get_item_detail(client, package_name, item_id):
    """GET the details of item ``item_id`` of ``package_name`` and print them.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/%s/items/%s' % (
        package_name, item_id)
    response = client.get(urljoin(SCA_ROOT_URL, path))
    output('Details for item %s' % item_id, response)
    return response.ok
def get_package_purchases(client, package_name):
    """GET the purchases recorded for ``package_name`` and print the result.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/%s/purchases' % package_name
    response = client.get(urljoin(SCA_ROOT_URL, path))
    output('Purchases for %s' % package_name, response)
    return response.ok
def get_purchase_detail(client, package_name, purchase_id):
    """GET the details of purchase ``purchase_id`` and print the result.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/%s/purchases/%s' % (
        package_name, purchase_id)
    response = client.get(urljoin(SCA_ROOT_URL, path))
    output('Details for purchase %s' % purchase_id, response)
    return response.ok
def purchase_item(client, package_name, item_sku, currency,
                  device_id='iap.py'):
    """Request the purchase of ``item_sku`` from ``package_name``.

    POSTs the package name, item SKU and currency as JSON to
    ``/api/2.0/click/purchases/`` with ``device_id`` in the ``X-Device-Id``
    header, then prints the result.

    Returns True when the response status is OK.
    """
    payload = json.dumps({
        'name': package_name,
        'item_sku': item_sku,
        'currency': currency,
    })
    response = client.post(
        urljoin(SCA_ROOT_URL, '/api/2.0/click/purchases/'),
        data=payload,
        headers={'X-Device-Id': device_id})
    output('Result of purchasing of item %s' % item_sku, response)
    return response.ok
def acknowledge_purchase(client, package_name, item_sku,
                         device_id='iap.py'):
    """Acknowledge the purchase of ``item_sku`` on a device.

    POSTs ``{"state": "acknowledged"}`` to the item's ``by-sku`` endpoint
    with ``device_id`` in the ``X-Device-Id`` header, then prints the result.

    Returns True when the response status is OK.
    """
    url = urljoin(
        SCA_ROOT_URL, '/inventory/api/v1/packages/%s/items/by-sku/%s' % (
            package_name, item_sku))
    data = {
        'state': 'acknowledged'
    }
    response = client.post(
        url, data=json.dumps(data), headers={'X-Device-Id': device_id})
    # The title previously misspelled "acknowledging".
    output('Result of acknowledging purchase of item %s' % item_sku, response)
    return response.ok
def complete_purchase(client, package_name, purchase_id,
                      status, openid, secret):
    """Set the status of purchase ``purchase_id`` through a trusted client.

    ``client`` is not used: the request is sent through a session created
    from ``secret`` and carries an ``Authorization`` header built from the
    same secret.  The body holds ``status`` and ``openid`` as JSON.

    Returns True when the response status is OK.
    """
    path = '/inventory/api/v1/packages/%s/purchases/%s' % (
        package_name, purchase_id)
    url = urljoin(SCA_ROOT_URL, path)
    trusted_client = make_trusted_client(secret)
    auth_headers = {'Authorization': make_auth_header(secret)}
    body = json.dumps({
        'status': status,
        'openid': openid,
    })
    response = trusted_client.post(url, data=body, headers=auth_headers)
    output('Result of completing purchase %s' % purchase_id, response)
    return response.ok
def create_item(client, package_name, sku, title, description, type,
                USD, EUR=None, GBP=None, secret=None):
    """Create an item for ``package_name`` and print the result.

    The USD price is always sent; EUR and GBP prices are only included when
    truthy.  When ``secret`` is given the request goes through a trusted
    client with an ``Authorization`` header built from it; otherwise
    ``client`` is used as is.

    Returns True when the response status is OK.
    """
    url = urljoin(
        SCA_ROOT_URL, '/inventory/api/v1/packages/%s/items' % package_name)

    prices = {'USD': USD}
    for code, amount in (('EUR', EUR), ('GBP', GBP)):
        if amount:
            prices[code] = amount
    item = {
        'sku': sku,
        'title': title,
        'description': description,
        'type': type,
        'prices': prices,
    }

    if secret is None:
        response = client.post(url, data=json.dumps(item))
    else:
        trusted_client = make_trusted_client(secret)
        auth_headers = {'Authorization': make_auth_header(secret)}
        response = trusted_client.post(
            url, data=json.dumps(item), headers=auth_headers)
    output('Result of creating item %s' % sku, response)
    return response.ok
def update_item(client, package_name, item_id, sku, title, description, type,
                USD, EUR=None, GBP=None, secret=None):
    """Update item ``item_id`` of ``package_name`` and print the result.

    The new field values are sent with PUT.  The USD price is always sent;
    EUR and GBP prices are only included when truthy.  When ``secret`` is
    given the request goes through a trusted client with an
    ``Authorization`` header built from it; otherwise ``client`` is used.

    Returns True when the response status is OK.
    """
    url = urljoin(
        SCA_ROOT_URL, '/inventory/api/v1/packages/%s/items/%s' % (
            package_name, item_id))

    prices = {'USD': USD}
    for code, amount in (('EUR', EUR), ('GBP', GBP)):
        if amount:
            prices[code] = amount
    item = {
        'sku': sku,
        'title': title,
        'description': description,
        'type': type,
        'prices': prices,
    }

    if secret is None:
        response = client.put(url, data=json.dumps(item))
    else:
        trusted_client = make_trusted_client(secret)
        auth_headers = {'Authorization': make_auth_header(secret)}
        response = trusted_client.put(
            url, data=json.dumps(item), headers=auth_headers)
    output('Result of updating item %s (id: %s)' % (sku, item_id), response)
    return response.ok
def make_auth_header(secret):
    """Build the ``UserProxy`` Authorization header value for ``secret``.

    The current time (``str(time.time())``) is signed with HMAC-SHA256 using
    ``secret`` as the key.  ``secret`` may be a byte string or a text string;
    text is encoded as UTF-8 first.

    Returns ``'UserProxy Timestamp=<timestamp> Signature=<hex digest>'``.
    """
    timestamp = str(time.time())
    # hmac only works reliably on byte strings, so encode text input before
    # signing.
    text_type = type(u'')
    if isinstance(secret, text_type):
        secret = secret.encode('utf-8')
    if isinstance(timestamp, text_type):
        message = timestamp.encode('ascii')
    else:
        message = timestamp
    signature = hmac.new(secret, message, sha256).hexdigest()
    return 'UserProxy Timestamp=%s Signature=%s' % (timestamp, signature)
def login():
    """Return an OAuth client, asking for credentials only when needed.

    Cached credentials are used when available.  Otherwise the user is
    prompted for email, password and OTP, a token is requested, and the
    resulting credentials are cached before the client is built.
    """
    credentials = load_credentials()
    if credentials is not None:
        return make_client(**credentials)

    email = raw_input('Email: ')
    password = getpass.getpass('Password: ')
    otp = raw_input('OTP: ')
    credentials = get_oauth_token_data(email, password, otp)
    save_credentials(credentials)
    return make_client(**credentials)
def load_credentials():
    """Load cached credentials from ``.credentials.json``.

    Returns the stored dict, or ``None`` when the file cannot be opened, is
    not valid JSON, or does not contain a JSON object.  ``login`` treats
    ``None`` as "ask the user again", so a damaged cache file no longer
    crashes the script.
    """
    try:
        with open('.credentials.json', 'r') as fp:
            credentials = json.load(fp)
    except (IOError, ValueError):
        # IOError: missing or unreadable file.  ValueError: corrupt JSON.
        return None
    if not isinstance(credentials, dict):
        # Callers expand the result as keyword arguments.
        return None
    return credentials
def save_credentials(credentials):
    """Write ``credentials`` as JSON to ``.credentials.json``.

    The file holds OAuth secrets, so it is created readable and writable by
    the owner only (mode 0600) rather than with the process default.
    """
    path = '.credentials.json'
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        fp = os.fdopen(fd, 'w')
    except Exception:
        # fdopen did not take ownership of the descriptor; do not leak it.
        os.close(fd)
        raise
    with fp:
        # The mode given to os.open only applies to newly created files, so
        # tighten a pre-existing file before writing the secrets.
        os.chmod(path, 0o600)
        json.dump(credentials, fp)
def logout():
    """Return a fresh, unauthenticated client.

    Cached credentials are not removed; ``clear_credentials`` does that.
    """
    # The previous code passed a dict as the positional ``authentication``
    # argument and only reached the plain-session branch by accident.
    # Request it explicitly, the same way ``main`` does.
    return make_client(authentication=None)
def clear_credentials():
    """Delete ``.credentials.json`` if it exists.

    A missing file is not an error; any other failure to remove it is
    raised.
    """
    import errno

    # Attempt the removal directly instead of checking for existence first,
    # which avoids a race between the check and the unlink.
    try:
        os.unlink('.credentials.json')
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
def tour_device_flow():
    """Walk through the use cases expected to be performed from devices.

    Steps: log in, list a package's items, list its purchases, request the
    purchase of an item, acknowledge that purchase and list the purchases
    again.  The tour stops at the first step whose request fails.
    """
    abort_msg = 'Unexpected scenario. Aborting tour.'
    pause_msg = 'Press ENTER to continue.\n'

    # Authenticate.
    print('All requests against the inventory API need to be oauth signed.')
    print('For this we first need to get an oauth token.')
    print('The first time, you will be asked for email address, password and '
          'a two-factor password; if your account does not require two-factor '
          'authentication, simply leave it empty.')
    raw_input(pause_msg)
    client = login()

    # List the items of a package.
    print('Good, we have the credentials now, so we will retrieve the list '
          'of items registered for a package.')
    package = ask_for_package()
    if not get_package_items(client, package):
        print(abort_msg)
        return

    # List the purchases of that package.
    print('And also list any existing purchases requested for any item '
          'in that package.')
    raw_input(pause_msg)
    if not get_package_purchases(client, package):
        print(abort_msg)
        return

    # Purchase an item.
    print('Next, we are going to request purchasing an item.')
    sku = ask_for_sku()
    currency = ask_for_purchase_currency()
    if not purchase_item(client, package, sku, currency):
        print(abort_msg)
        return

    # Acknowledge the purchase.
    print('In order to be able to request another purchase for this item, '
          'we need to acknowledge its use by the user.')
    raw_input(pause_msg)
    if not acknowledge_purchase(client, package, sku):
        print(abort_msg)
        return

    # List the purchases once more.
    print('If we now retrieve the list of items again, the item will be '
          'displayed as available once more.')
    raw_input(pause_msg)
    if not get_package_purchases(client, package):
        print(abort_msg)
        return

    print("Congratulations. Tour is now complete.")
def tour_developer_flow():
    """Walk through the use cases allowed to be performed by developers.

    Steps: log in, list a package's items, create an item, list the items,
    update an item (the values just entered are offered as defaults) and
    list the items again.  The tour stops at the first step whose request
    fails.
    """
    abort_msg = 'Unexpected scenario. Aborting tour.'
    pause_msg = 'Press ENTER to continue.\n'
    confirm_msg = ("Let's request the items once more to confirm the item "
                   "is now available: ")

    # Authenticate.
    print('All requests against the inventory API need to be oauth signed.')
    print('For this we first need to get an oauth token.')
    print('The first time, you will be asked for email address, password and '
          'a two-factor password; if your account does not require two-factor '
          'authentication, simply leave it empty.')
    raw_input(pause_msg)
    client = login()

    # List the items of a package.
    print('Good, we have the credentials now, so we will retrieve the list '
          'of items registered for a package.')
    package = ask_for_package()
    if not get_package_items(client, package):
        print(abort_msg)
        return

    # Create an item.
    print('Next, we are going to create a new item for this package.')
    sku = ask_for_sku()
    title = ask_for_title()
    description = ask_for_description()
    item_type = ask_for_type()
    prices = ask_for_prices()
    created = create_item(
        client, package, sku, title, description, item_type, **prices)
    if not created:
        print(abort_msg)
        return

    print(confirm_msg)
    raw_input(pause_msg)
    if not get_package_items(client, package):
        print(abort_msg)
        return

    # Edit an item, offering the values entered above as defaults.
    print("Let's now change one item: ")
    item_id = raw_input("Enter the 'id' of the item to modify (eg, 1): ")
    sku = ask_for_sku(default=sku)
    title = ask_for_title(default=title)
    description = ask_for_description(default=description)
    item_type = ask_for_type(default=item_type)
    prices = ask_for_prices(default=prices)
    updated = update_item(
        client, package, item_id, sku, title, description, item_type,
        **prices)
    if not updated:
        print(abort_msg)
        return

    print(confirm_msg)
    raw_input(pause_msg)
    if not get_package_items(client, package):
        print(abort_msg)
        return

    print("Congratulations. Tour is now complete.")
def _print_help():
    """Print the list of commands understood by the REPL."""
    print('Available commands:')
    print('\thelp -- display this help text')
    print('\texit -- quit')
    print('\tquit -- quit')
    print('\tlogin -- establish an authenticated session')
    print('\tlogout -- clear authentication')
    print('\ttour_device_flow -- walkthrough of the common use cases '
          'expected to be performed from devices.')
    print('\ttour_developer_flow -- walkthrough of the common use '
          'cases allowed to be performed by developers.')
    print('\nPublic API operations (unauthenticated):')
    print('\tget_api_versions -- list all api versions')
    print('\nPublic API operations (authenticated):')
    print('\tget_package_detail package_name -- retrieve details '
          'about package')
    print('\tget_package_items package_name -- retrieve list of items '
          'for package')
    print('\tget_item_detail package_name item_id -- retrieve details '
          'of item')
    print('\tget_package_purchases package_name -- retrieve list of '
          'item purchases for package')
    print('\tget_purchase_detail package_name purchase_id -- retrieve '
          'details of purchase')
    print('\tpurchase_item package_name item_sku currency '
          '[device_id] -- request purchase of an item')
    print('\tacknowledge_purchase package_name item_sku [device_id]'
          ' -- acknowledge purchase of an item on a device')
    print('\nDeveloper restricted API operations:')
    print('\tcreate_item package_name sku title description type '
          'USD_amount [EUR_amount] [GBP_amount] -- create item')
    print('\tupdate_item package_name item_id sku title description '
          'type USD_amount [EUR_amount] [GBP_amount] -- update item')
    print('\nPrivate API operations (require a trusted client):')
    print('\tcomplete_purchase package_name purchase_id status openid '
          'secret -- mark purchase as completed')
    # NOTE(review): arguments are passed positionally, so ``secret`` only
    # reaches the ``secret`` parameter when both EUR and GBP amounts are
    # given -- confirm whether the usage text below should say so.
    print('\tcreate_item package_name sku title description type '
          'USD_amount [EUR_amount] [GBP_amount] secret -- create item')
    print('\tupdate_item package_name item_id sku title description '
          'type USD_amount [EUR_amount] [GBP_amount] secret'
          ' -- update item')


def main():
    """Run the interactive command loop until the user quits.

    Each line is split shell-style.  The first word selects a module-level
    callable and the remaining words are passed to it as positional
    arguments after the current client.  ``login`` and ``logout`` replace
    the current client.

    The loop survives bad input: unbalanced quotes, a wrong number of
    arguments or a failing request are reported and the prompt is shown
    again.  Ctrl-D or Ctrl-C at the prompt leaves the loop like ``exit``.
    """
    client = make_client(authentication=None)
    no_client_cmds = (
        'clear_credentials', 'tour_device_flow', 'tour_developer_flow')

    while True:
        try:
            line = raw_input('>>> ')
        except (EOFError, KeyboardInterrupt):
            # End the session cleanly instead of with a traceback.
            print('')
            break

        try:
            parts = shlex.split(line)
        except ValueError as exc:
            # shlex rejects e.g. unbalanced quotes.
            print('Could not parse command: %s' % exc)
            continue
        if not parts:
            continue
        cmd, args = parts[0], parts[1:]

        if cmd in ('exit', 'quit'):
            break
        if cmd == 'help':
            _print_help()
            continue

        # Private helpers (leading underscore) are not commands.
        handler = None if cmd.startswith('_') else globals().get(cmd, None)
        if handler is None or not callable(handler):
            print('Invalid command: %s' % cmd)
            continue

        try:
            if cmd in ('login', 'logout'):
                client = handler()
            elif cmd in no_client_cmds:
                handler()
            elif cmd in ('get_api_versions',):
                handler(client)
            else:
                if not args:
                    print('Arguments needed for command: %s' % cmd)
                    continue
                handler(client, *args)
        except KeyboardInterrupt:
            # Abort the running command only and return to the prompt.
            print('\nInterrupted.')
        except Exception as exc:
            # Top-level boundary of the REPL: report and keep going.
            print('Command %s failed: %s' % (cmd, exc))
if __name__ == '__main__':
    # Tab completion is a convenience only.  readline is not available on
    # every platform, and its absence must not stop the REPL from starting.
    try:
        import readline
        import rlcompleter  # noqa
    except ImportError:
        pass
    else:
        readline.parse_and_bind('tab: complete')
    main()
|