~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to test/unit/common/middleware/test_formpost.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page
  • Date: 2013-08-13 10:37:13 UTC
  • mfrom: (1.2.21)
  • Revision ID: package-import@ubuntu.com-20130813103713-1ctbx4zifyljs2aq
Tags: 1.9.1-0ubuntu1
[ James Page ]
* d/control: Update VCS fields for new branch locations.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added

removed

Lines of Context:
16
16
import hmac
17
17
import unittest
18
18
from hashlib import sha1
19
 
from contextlib import contextmanager
20
19
from StringIO import StringIO
21
20
from time import time
22
21
 
23
22
from swift.common.swob import Request, Response
24
23
from swift.common.middleware import tempauth, formpost
25
 
 
26
 
 
27
 
class FakeMemcache(object):
28
 
 
29
 
    def __init__(self):
30
 
        self.store = {}
31
 
 
32
 
    def get(self, key):
33
 
        return self.store.get(key)
34
 
 
35
 
    def set(self, key, value, time=0):
36
 
        self.store[key] = value
37
 
        return True
38
 
 
39
 
    def incr(self, key, timeout=0):
40
 
        self.store[key] = self.store.setdefault(key, 0) + 1
41
 
        return self.store[key]
42
 
 
43
 
    @contextmanager
44
 
    def soft_lock(self, key, timeout=0, retries=5):
45
 
        yield True
46
 
 
47
 
    def delete(self, key):
48
 
        try:
49
 
            del self.store[key]
50
 
        except Exception:
51
 
            pass
52
 
        return True
 
24
from swift.common.utils import split_path
53
25
 
54
26
 
55
27
class FakeApp(object):
298
270
        self.auth = tempauth.filter_factory({})(self.app)
299
271
        self.formpost = formpost.filter_factory({})(self.auth)
300
272
 
301
 
    def _make_request(self, path, **kwargs):
 
273
    def _make_request(self, path, tempurl_keys=(), **kwargs):
302
274
        req = Request.blank(path, **kwargs)
303
 
        req.environ['swift.cache'] = FakeMemcache()
 
275
 
 
276
        # Fake out the caching layer so that get_account_info() finds its
 
277
        # data. Include something that isn't tempurl keys to prove we skip it.
 
278
        meta = {'user-job-title': 'Personal Trainer',
 
279
                'user-real-name': 'Jim Shortz'}
 
280
        for idx, key in enumerate(tempurl_keys):
 
281
            meta_name = 'temp-url-key' + (("-%d" % (idx + 1) if idx else ""))
 
282
            if key:
 
283
                meta[meta_name] = key
 
284
 
 
285
        _junk, account, _junk, _junk = split_path(path, 2, 4)
 
286
        req.environ['swift.account/' + account] = self._fake_cache_env(
 
287
            account, tempurl_keys)
304
288
        return req
305
289
 
 
290
    def _fake_cache_env(self, account, tempurl_keys=()):
 
291
        # Fake out the caching layer so that get_account_info() finds its
 
292
        # data. Include something that isn't tempurl keys to prove we skip it.
 
293
        meta = {'user-job-title': 'Personal Trainer',
 
294
                'user-real-name': 'Jim Shortz'}
 
295
        for idx, key in enumerate(tempurl_keys):
 
296
            meta_name = 'temp-url-key' + ("-%d" % (idx + 1) if idx else "")
 
297
            if key:
 
298
                meta[meta_name] = key
 
299
 
 
300
        return {'status': 204,
 
301
                'container_count': '0',
 
302
                'total_object_count': '0',
 
303
                'bytes': '0',
 
304
                'meta': meta}
 
305
 
306
306
    def _make_sig_env_body(self, path, redirect, max_file_size, max_file_count,
307
307
                           expires, key, user_agent=True):
308
308
        sig = hmac.new(
404
404
            '%s\n%s\n%s\n%s\n%s' % (
405
405
                path, redirect, max_file_size, max_file_count, expires),
406
406
            sha1).hexdigest()
407
 
        memcache = FakeMemcache()
408
 
        memcache.set('temp-url-key/AUTH_test', key)
409
407
        wsgi_input = StringIO('\r\n'.join([
410
408
            '------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
411
409
            'Content-Disposition: form-data; name="redirect"',
468
466
            'SERVER_NAME': '172.16.83.128',
469
467
            'SERVER_PORT': '8080',
470
468
            'SERVER_PROTOCOL': 'HTTP/1.0',
471
 
            'swift.cache': memcache,
 
469
            'swift.account/AUTH_test': self._fake_cache_env('AUTH_test', [key]),
472
470
            'wsgi.errors': wsgi_errors,
473
471
            'wsgi.input': wsgi_input,
474
472
            'wsgi.multiprocess': False,
518
516
            '%s\n%s\n%s\n%s\n%s' % (
519
517
                path, redirect, max_file_size, max_file_count, expires),
520
518
            sha1).hexdigest()
521
 
        memcache = FakeMemcache()
522
 
        memcache.set('temp-url-key/AUTH_test', key)
523
519
        wsgi_input = StringIO('\r\n'.join([
524
520
            '-----------------------------168072824752491622650073',
525
521
            'Content-Disposition: form-data; name="redirect"',
581
577
            'SERVER_NAME': '172.16.83.128',
582
578
            'SERVER_PORT': '8080',
583
579
            'SERVER_PROTOCOL': 'HTTP/1.0',
584
 
            'swift.cache': memcache,
 
580
            'swift.account/AUTH_test': self._fake_cache_env('AUTH_test', [key]),
585
581
            'wsgi.errors': wsgi_errors,
586
582
            'wsgi.input': wsgi_input,
587
583
            'wsgi.multiprocess': False,
631
627
            '%s\n%s\n%s\n%s\n%s' % (
632
628
                path, redirect, max_file_size, max_file_count, expires),
633
629
            sha1).hexdigest()
634
 
        memcache = FakeMemcache()
635
 
        memcache.set('temp-url-key/AUTH_test', key)
636
630
        wsgi_input = StringIO('\r\n'.join([
637
631
            '------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
638
632
            'Content-Disposition: form-data; name="redirect"',
697
691
            'SERVER_NAME': '172.16.83.128',
698
692
            'SERVER_PORT': '8080',
699
693
            'SERVER_PROTOCOL': 'HTTP/1.0',
700
 
            'swift.cache': memcache,
 
694
            'swift.account/AUTH_test': self._fake_cache_env('AUTH_test', [key]),
701
695
            'wsgi.errors': wsgi_errors,
702
696
            'wsgi.input': wsgi_input,
703
697
            'wsgi.multiprocess': False,
747
741
            '%s\n%s\n%s\n%s\n%s' % (
748
742
                path, redirect, max_file_size, max_file_count, expires),
749
743
            sha1).hexdigest()
750
 
        memcache = FakeMemcache()
751
 
        memcache.set('temp-url-key/AUTH_test', key)
752
744
        wsgi_input = StringIO('\r\n'.join([
753
745
            '-----------------------------7db20d93017c',
754
746
            'Content-Disposition: form-data; name="redirect"',
809
801
            'SERVER_NAME': '172.16.83.128',
810
802
            'SERVER_PORT': '8080',
811
803
            'SERVER_PROTOCOL': 'HTTP/1.0',
812
 
            'swift.cache': memcache,
 
804
            'swift.account/AUTH_test': self._fake_cache_env('AUTH_test', [key]),
813
805
            'wsgi.errors': wsgi_errors,
814
806
            'wsgi.input': wsgi_input,
815
807
            'wsgi.multiprocess': False,
853
845
            '/v1/AUTH_test/container', 'http://brim.net', 5, 10,
854
846
            int(time() + 86400), key)
855
847
        env['wsgi.input'] = StringIO('XX' + '\r\n'.join(body))
856
 
        env['swift.cache'] = FakeMemcache()
857
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
848
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
849
            'AUTH_test', [key])
858
850
        self.app = FakeApp(iter([('201 Created', {}, ''),
859
851
                                 ('201 Created', {}, '')]))
860
852
        self.auth = tempauth.filter_factory({})(self.app)
888
880
            '/v1/AUTH_test/container', 'http://brim.net', 5, 10,
889
881
            int(time() + 86400), key)
890
882
        env['wsgi.input'] = StringIO('\r\n'.join(body))
891
 
        env['swift.cache'] = FakeMemcache()
892
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
883
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
884
            'AUTH_test', [key])
893
885
        self.app = FakeApp(iter([('201 Created', {}, ''),
894
886
                                 ('201 Created', {}, '')]))
895
887
        self.auth = tempauth.filter_factory({})(self.app)
918
910
            '/v1/AUTH_test/container', 'http://brim.net', 1024, 1,
919
911
            int(time() + 86400), key)
920
912
        env['wsgi.input'] = StringIO('\r\n'.join(body))
921
 
        env['swift.cache'] = FakeMemcache()
922
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
913
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
914
            'AUTH_test', [key])
923
915
        self.app = FakeApp(iter([('201 Created', {}, ''),
924
916
                                 ('201 Created', {}, '')]))
925
917
        self.auth = tempauth.filter_factory({})(self.app)
958
950
            '/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
959
951
        env['QUERY_STRING'] = 'this=should&not=get&passed'
960
952
        env['wsgi.input'] = StringIO('\r\n'.join(body))
961
 
        env['swift.cache'] = FakeMemcache()
962
 
        # We don't cache the key so that it's asked for (and FakeApp verifies
963
 
        # that no QUERY_STRING got passed).
 
953
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
954
            'AUTH_test', [key])
964
955
        self.app = FakeApp(
965
 
            iter([('200 Ok', {'x-account-meta-temp-url-key': 'abc'}, ''),
966
 
                  ('201 Created', {}, ''),
 
956
            iter([('201 Created', {}, ''),
967
957
                  ('201 Created', {}, '')]),
968
958
            check_no_query_string=True)
969
959
        self.auth = tempauth.filter_factory({})(self.app)
982
972
        headers = headers[0]
983
973
        exc_info = exc_info[0]
984
974
        # Make sure we 201 Created, which means we made the final subrequest
985
 
        # (and FakeAp verifies that no QUERY_STRING got passed).
 
975
        # (and FakeApp verifies that no QUERY_STRING got passed).
986
976
        self.assertEquals(status, '201 Created')
987
977
        self.assertEquals(exc_info, None)
988
978
        self.assertTrue('201 Created' in body)
989
 
        self.assertEquals(len(self.app.requests), 3)
 
979
        self.assertEquals(len(self.app.requests), 2)
990
980
 
991
981
    def test_subrequest_fails(self):
992
982
        key = 'abc'
994
984
            '/v1/AUTH_test/container', 'http://brim.net', 1024, 10,
995
985
            int(time() + 86400), key)
996
986
        env['wsgi.input'] = StringIO('\r\n'.join(body))
997
 
        env['swift.cache'] = FakeMemcache()
998
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
987
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
988
            'AUTH_test', [key])
999
989
        self.app = FakeApp(iter([('404 Not Found', {}, ''),
1000
990
                                 ('201 Created', {}, '')]))
1001
991
        self.auth = tempauth.filter_factory({})(self.app)
1076
1066
            '------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
1077
1067
            '',
1078
1068
        ]))
1079
 
        env['swift.cache'] = FakeMemcache()
1080
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1069
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1070
            'AUTH_test', [key])
1081
1071
        self.app = FakeApp(iter([('201 Created', {}, ''),
1082
1072
                                 ('201 Created', {}, '')]))
1083
1073
        self.auth = tempauth.filter_factory({})(self.app)
1143
1133
            '------WebKitFormBoundaryNcxTqxSlX7t4TDkR--',
1144
1134
            '',
1145
1135
        ]))
1146
 
        env['swift.cache'] = FakeMemcache()
1147
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1136
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1137
            'AUTH_test', [key])
1148
1138
        self.app = FakeApp(iter([('201 Created', {}, ''),
1149
1139
                                 ('201 Created', {}, '')]))
1150
1140
        self.auth = tempauth.filter_factory({})(self.app)
1182
1172
            '/v1/AUTH_test/container', 'http://redirect', 1024, 10,
1183
1173
            int(time() + 86400), key, user_agent=False)
1184
1174
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1185
 
        env['swift.cache'] = FakeMemcache()
1186
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1175
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1176
            'AUTH_test', [key])
1187
1177
        self.app = FakeApp(iter([('201 Created', {}, ''),
1188
1178
                                 ('201 Created', {}, '')]))
1189
1179
        self.auth = tempauth.filter_factory({})(self.app)
1196
1186
        self.assertEquals(self.app.requests[0].headers['User-Agent'],
1197
1187
                          'FormPost')
1198
1188
 
 
1189
    def test_formpost_with_multiple_keys(self):
 
1190
        key = 'ernie'
 
1191
        sig, env, body = self._make_sig_env_body(
 
1192
            '/v1/AUTH_test/container', 'http://redirect', 1024, 10,
 
1193
            int(time() + 86400), key)
 
1194
        env['wsgi.input'] = StringIO('\r\n'.join(body))
 
1195
        # Stick it in X-Account-Meta-Temp-URL-Key-2 and make sure we get it
 
1196
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1197
            'AUTH_test', ['bert', key])
 
1198
        self.app = FakeApp(iter([('201 Created', {}, ''),
 
1199
                                 ('201 Created', {}, '')]))
 
1200
        self.auth = tempauth.filter_factory({})(self.app)
 
1201
        self.formpost = formpost.filter_factory({})(self.auth)
 
1202
 
 
1203
        status = [None]
 
1204
        headers = [None]
 
1205
 
 
1206
        def start_response(s, h, e=None):
 
1207
            status[0] = s
 
1208
            headers[0] = h
 
1209
        body = ''.join(self.formpost(env, start_response))
 
1210
        print repr(headers)
 
1211
        self.assertEqual('303 See Other', status[0])
 
1212
        self.assertEqual(
 
1213
            'http://redirect?status=201&message=',
 
1214
            dict(headers[0]).get('Location'))
 
1215
 
1199
1216
    def test_redirect(self):
1200
1217
        key = 'abc'
1201
1218
        sig, env, body = self._make_sig_env_body(
1202
1219
            '/v1/AUTH_test/container', 'http://redirect', 1024, 10,
1203
1220
            int(time() + 86400), key)
1204
1221
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1205
 
        env['swift.cache'] = FakeMemcache()
1206
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1222
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1223
            'AUTH_test', [key])
1207
1224
        self.app = FakeApp(iter([('201 Created', {}, ''),
1208
1225
                                 ('201 Created', {}, '')]))
1209
1226
        self.auth = tempauth.filter_factory({})(self.app)
1239
1256
            '/v1/AUTH_test/container', 'http://redirect?one=two', 1024, 10,
1240
1257
            int(time() + 86400), key)
1241
1258
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1242
 
        env['swift.cache'] = FakeMemcache()
1243
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1259
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1260
            'AUTH_test', [key])
1244
1261
        self.app = FakeApp(iter([('201 Created', {}, ''),
1245
1262
                                 ('201 Created', {}, '')]))
1246
1263
        self.auth = tempauth.filter_factory({})(self.app)
1276
1293
        sig, env, body = self._make_sig_env_body(
1277
1294
            '/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
1278
1295
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1279
 
        env['swift.cache'] = FakeMemcache()
1280
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1296
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1297
            'AUTH_test', [key])
1281
1298
        self.app = FakeApp(iter([('201 Created', {}, ''),
1282
1299
                                 ('201 Created', {}, '')]))
1283
1300
        self.auth = tempauth.filter_factory({})(self.app)
1312
1329
        sig, env, body = self._make_sig_env_body(
1313
1330
            '/v1/AUTH_test/container', '', 1024, 10, int(time() - 10), key)
1314
1331
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1315
 
        env['swift.cache'] = FakeMemcache()
1316
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1332
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1333
            'AUTH_test', [key])
1317
1334
        self.app = FakeApp(iter([('201 Created', {}, ''),
1318
1335
                                 ('201 Created', {}, '')]))
1319
1336
        self.auth = tempauth.filter_factory({})(self.app)
1345
1362
        sig, env, body = self._make_sig_env_body(
1346
1363
            '/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
1347
1364
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1348
 
        env['swift.cache'] = FakeMemcache()
1349
1365
        # Change key to invalidate sig
1350
 
        key = 'def'
1351
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1366
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1367
            'AUTH_test', [key + ' is bogus now'])
1352
1368
        self.app = FakeApp(iter([('201 Created', {}, ''),
1353
1369
                                 ('201 Created', {}, '')]))
1354
1370
        self.auth = tempauth.filter_factory({})(self.app)
1380
1396
        sig, env, body = self._make_sig_env_body(
1381
1397
            '/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
1382
1398
        env['wsgi.input'] = StringIO('XX' + '\r\n'.join(body))
1383
 
        env['swift.cache'] = FakeMemcache()
1384
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1399
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1400
            'AUTH_test', [key])
1385
1401
        self.app = FakeApp(iter([('201 Created', {}, ''),
1386
1402
                                 ('201 Created', {}, '')]))
1387
1403
        self.auth = tempauth.filter_factory({})(self.app)
1413
1429
        sig, env, body = self._make_sig_env_body(
1414
1430
            '/v2/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
1415
1431
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1416
 
        env['swift.cache'] = FakeMemcache()
1417
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1432
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1433
            'AUTH_test', [key])
1418
1434
        self.app = FakeApp(iter([('201 Created', {}, ''),
1419
1435
                                 ('201 Created', {}, '')]))
1420
1436
        self.auth = tempauth.filter_factory({})(self.app)
1446
1462
        sig, env, body = self._make_sig_env_body(
1447
1463
            '//AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
1448
1464
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1449
 
        env['swift.cache'] = FakeMemcache()
1450
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1465
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1466
            'AUTH_test', [key])
1451
1467
        self.app = FakeApp(iter([('201 Created', {}, ''),
1452
1468
                                 ('201 Created', {}, '')]))
1453
1469
        self.auth = tempauth.filter_factory({})(self.app)
1479
1495
        sig, env, body = self._make_sig_env_body(
1480
1496
            '/v1//container', '', 1024, 10, int(time() + 86400), key)
1481
1497
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1482
 
        env['swift.cache'] = FakeMemcache()
1483
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1498
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1499
            'AUTH_test', [key])
1484
1500
        self.app = FakeApp(iter([('201 Created', {}, ''),
1485
1501
                                 ('201 Created', {}, '')]))
1486
1502
        self.auth = tempauth.filter_factory({})(self.app)
1512
1528
        sig, env, body = self._make_sig_env_body(
1513
1529
            '/v1/AUTH_tst/container', '', 1024, 10, int(time() + 86400), key)
1514
1530
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1515
 
        env['swift.cache'] = FakeMemcache()
1516
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1531
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1532
            'AUTH_test', [key])
1517
1533
        self.app = FakeApp(iter([
1518
1534
            ('200 Ok', {'x-account-meta-temp-url-key': 'def'}, ''),
1519
1535
            ('201 Created', {}, ''),
1547
1563
        sig, env, body = self._make_sig_env_body(
1548
1564
            '/v1/AUTH_test', '', 1024, 10, int(time() + 86400), key)
1549
1565
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1550
 
        env['swift.cache'] = FakeMemcache()
1551
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1566
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1567
            'AUTH_test', [key])
1552
1568
        self.app = FakeApp(iter([('201 Created', {}, ''),
1553
1569
                                 ('201 Created', {}, '')]))
1554
1570
        self.auth = tempauth.filter_factory({})(self.app)
1585
1601
                body[i] = 'badvalue'
1586
1602
                break
1587
1603
        env['wsgi.input'] = StringIO('\r\n'.join(body))
1588
 
        env['swift.cache'] = FakeMemcache()
1589
 
        env['swift.cache'].set('temp-url-key/AUTH_test', key)
 
1604
        env['swift.account/AUTH_test'] = self._fake_cache_env(
 
1605
            'AUTH_test', [key])
1590
1606
        self.app = FakeApp(iter([('201 Created', {}, ''),
1591
1607
                                 ('201 Created', {}, '')]))
1592
1608
        self.auth = tempauth.filter_factory({})(self.app)