~ubuntu-branches/ubuntu/oneiric/nova/oneiric-updates

« back to all changes in this revision

Viewing changes to .pc/security-fix-tenant-bypass.patch/nova/tests/api/openstack/contrib/test_createserverext.py

  • Committer: Package Import Robot
  • Author(s): Jamie Strandboge
  • Date: 2012-01-10 10:26:57 UTC
  • Revision ID: package-import@ubuntu.com-20120110102657-70jhfj9zd1feekq7
Tags: 2011.3-0ubuntu6.4
* SECURITY UPDATE: fix tenant bypass by authenticated users via OpenStack
  API (LP: #904072)
  - CVE-2012-0030

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010-2011 OpenStack LLC.
 
4
# All Rights Reserved.
 
5
#
 
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
#    not use this file except in compliance with the License. You may obtain
 
8
#    a copy of the License at
 
9
#
 
10
#         http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
#    Unless required by applicable law or agreed to in writing, software
 
13
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
#    License for the specific language governing permissions and limitations
 
16
#    under the License.
 
17
 
 
18
import base64
 
19
import datetime
 
20
import json
 
21
import unittest
 
22
from xml.dom import minidom
 
23
 
 
24
import stubout
 
25
import webob
 
26
 
 
27
from nova import db
 
28
from nova import exception
 
29
from nova import flags
 
30
from nova import test
 
31
import nova.api.openstack
 
32
from nova.tests.api.openstack import fakes
 
33
 
 
34
 
 
35
# Shared nova flag registry; the verbose flag is set for everything that
# runs after this module is imported.
FLAGS = flags.FLAGS
FLAGS.verbose = True

# UUID used for the canned instance record and the mocked compute API.
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'

# Each entry is a (network uuid, fixed ip) pair as accepted by the
# request-building helpers of the test case below.
FAKE_NETWORKS = [
    ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
    ('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '10.0.2.12'),
]

# The same network requested twice; the tests expect this to be rejected.
DUPLICATE_NETWORKS = [
    ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
    ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
]

# Neither the uuid nor the address is well formed.
INVALID_NETWORKS = [
    ('invalid', 'invalid-ip-address'),
]

# Canned instance record handed out by return_server_by_id().
INSTANCE = dict(
    id=1,
    display_name="test_server",
    uuid=FAKE_UUID,
    created_at=datetime.datetime(2010, 10, 10, 12, 0, 0),
    updated_at=datetime.datetime(2010, 11, 11, 11, 0, 0),
    security_groups=[dict(id=1, name="test")],
)
 
56
 
 
57
 
 
58
def return_server_by_id(context, id, session=None):
    """Stand-in for the db ``instance_get`` call used by the tests.

    :param context: request context (unused).
    :param id: instance id to report in the returned record.
    :param session: database session (unused).
    :returns: a new dict holding the INSTANCE fixture's fields with ``id``
              replaced by the requested value.
    """
    # Hand back a copy so the module-level INSTANCE fixture is not modified
    # by one test and then observed in that state by another.
    return dict(INSTANCE, id=id)
 
61
 
 
62
 
 
63
def return_security_group_non_existing(context, project_id, group_name):
    """Stub for a security-group lookup that never finds the group.

    :raises: exception.SecurityGroupNotFoundForProject carrying the given
             project id and, as ``security_group_id``, the group name.
    """
    not_found = exception.SecurityGroupNotFoundForProject(
        project_id=project_id,
        security_group_id=group_name)
    raise not_found
 
66
 
 
67
 
 
68
def return_security_group_get_by_name(context, project_id, group_name):
    """Stub for a by-name security-group lookup that always succeeds.

    :returns: a dict with the fixed id 1 and the requested group name.
    """
    group = dict(id=1, name=group_name)
    return group
 
70
 
 
71
 
 
72
def return_security_group_get(context, security_group_id, session):
    """Stub for a by-id security-group lookup.

    :returns: a dict containing only the requested id.
    """
    group = dict(id=security_group_id)
    return group
 
74
 
 
75
 
 
76
def return_instance_add_security_group(context, instance_id,
                                       security_group_id):
    """Stub for attaching a security group to an instance; does nothing."""
    return None
 
79
 
 
80
 
 
81
class CreateserverextTest(test.TestCase):
    """Tests for the ``os-create-server-ext`` API resource.

    Requests are dispatched through ``fakes.wsgi_app()``.  Most tests first
    replace ``nova.compute.API`` with a recording mock so they can check
    which keyword arguments reached ``create``.

    Test methods carry comments rather than docstrings so that test
    runners keep reporting them by method name.
    """

    def setUp(self):
        super(CreateserverextTest, self).setUp()

    def tearDown(self):
        super(CreateserverextTest, self).tearDown()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _setup_mock_compute_api(self):
        """Stub ``nova.compute.API`` with a recording mock and return it.

        The mock stores the ``injected_files``, ``requested_networks`` and
        ``user_data`` keyword arguments of the latest ``create`` call in
        its ``injected_files``, ``networks`` and ``user_data`` attributes.
        """

        class MockComputeAPI(nova.compute.API):

            def __init__(self):
                # The parent constructor is not invoked; only the
                # attributes read by this test case are set up.
                self.injected_files = None
                self.networks = None
                self.user_data = None
                self.db = db

            def create(self, *args, **kwargs):
                # Every recorded value is refreshed on each call, so a
                # keyword that was not supplied always reads back as None.
                self.injected_files = kwargs.get('injected_files')
                self.networks = kwargs.get('requested_networks')
                self.user_data = kwargs.get('user_data')

                return [{'id': '1234', 'display_name': 'fakeinstance',
                         'uuid': FAKE_UUID,
                         'user_id': 'fake',
                         'project_id': 'fake',
                         'created_at': "",
                         'updated_at': ""}]

            def set_admin_password(self, *args, **kwargs):
                pass

        def make_stub_method(canned_return):
            # Build a callable that ignores its arguments and always
            # returns the canned value.
            def stub_method(*args, **kwargs):
                return canned_return
            return stub_method

        compute_api = MockComputeAPI()
        # Anything that instantiates nova.compute.API now receives the
        # single mock instance created above.
        self.stubs.Set(nova.compute, 'API', make_stub_method(compute_api))
        self.stubs.Set(
            nova.api.openstack.create_instance_helper.CreateInstanceHelper,
            '_get_kernel_ramdisk_from_image', make_stub_method((1, 1)))
        return compute_api

    def _base_server_dict(self):
        """Return the server attributes common to every create request."""
        return {'name': 'new-server-test', 'imageRef': 1, 'flavorRef': 1}

    def _create_security_group_request_dict(self, security_groups):
        """Build a create-server body naming the given security groups.

        :param security_groups: iterable of group names, or None to leave
                                the ``security_groups`` key out entirely.
        """
        server = self._base_server_dict()
        if security_groups is not None:
            server['security_groups'] = [{'name': name}
                                         for name in security_groups]
        return {'server': server}

    def _create_networks_request_dict(self, networks):
        """Build a create-server body requesting the given networks.

        :param networks: iterable of (uuid, fixed_ip) pairs, or None to
                         leave the ``networks`` key out entirely.
        """
        server = self._base_server_dict()
        if networks is not None:
            server['networks'] = [{'uuid': uuid, 'fixed_ip': fixed_ip}
                                  for uuid, fixed_ip in networks]
        return {'server': server}

    def _create_user_data_request_dict(self, user_data):
        """Build a create-server body carrying ``user_data`` (may be None)."""
        server = self._base_server_dict()
        server['user_data'] = user_data
        return {'server': server}

    def _get_create_request_json(self, body_dict):
        """Return a JSON POST request for the extension's collection URL."""
        req = webob.Request.blank('/v1.1/123/os-create-server-ext')
        req.headers['Content-Type'] = 'application/json'
        req.method = 'POST'
        req.body = json.dumps(body_dict)
        return req

    def _run_create_instance_with_mock_compute_api(self, request):
        """Send ``request`` with the compute API mocked out.

        :returns: tuple of (mock compute API, response).
        """
        compute_api = self._setup_mock_compute_api()
        response = request.get_response(fakes.wsgi_app())
        return compute_api, response

    @staticmethod
    def _xml_escape(value):
        """Return ``value`` as text safe for XML content and attributes."""
        # Imported locally so this helper does not depend on a module
        # level import being present.
        from xml.sax import saxutils
        # '%s' formatting converts non-string values (e.g. integer refs)
        # to text before escaping.
        return saxutils.escape('%s' % (value,), {'"': '&quot;'})

    def _format_xml_request_body(self, body_dict):
        """Serialize a create-server body dict to an XML document string.

        Handles the ``name``, ``imageRef`` and ``flavorRef`` attributes and
        the optional ``metadata``, ``personality`` and ``networks``
        sections.  All interpolated values are XML-escaped.
        """
        esc = self._xml_escape
        server = body_dict['server']
        body_parts = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            '<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.1"',
            ' name="%s" imageRef="%s" flavorRef="%s">' % (
                esc(server['name']),
                esc(server['imageRef']),
                esc(server['flavorRef'])),
        ]
        if 'metadata' in server:
            body_parts.append('<metadata>')
            # items() rather than iteritems(): available on every Python
            # version.
            for key, value in server['metadata'].items():
                body_parts.append('<meta key="%s">%s</meta>'
                                  % (esc(key), esc(value)))
            body_parts.append('</metadata>')
        if 'personality' in server:
            body_parts.append('<personality>')
            for personality in server['personality']:
                body_parts.append('<file path="%s">%s</file>'
                                  % (esc(personality['path']),
                                     esc(personality['contents'])))
            body_parts.append('</personality>')
        if 'networks' in server:
            body_parts.append('<networks>')
            for network in server['networks']:
                body_parts.append('<network uuid="%s" fixed_ip="%s"></network>'
                                  % (esc(network['uuid']),
                                     esc(network['fixed_ip'])))
            body_parts.append('</networks>')
        body_parts.append('</server>')
        return ''.join(body_parts)

    def _get_create_request_xml(self, body_dict):
        """Return an XML POST request for the extension's collection URL."""
        req = webob.Request.blank('/v1.1/123/os-create-server-ext')
        req.content_type = 'application/xml'
        req.accept = 'application/xml'
        req.method = 'POST'
        req.body = self._format_xml_request_body(body_dict)
        return req

    def _create_instance_with_networks_json(self, networks):
        """POST a JSON create request asking for ``networks``.

        :returns: tuple of (request, response, networks recorded by the
                  mock compute API).
        """
        body_dict = self._create_networks_request_dict(networks)
        request = self._get_create_request_json(body_dict)
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        return request, response, compute_api.networks

    def _create_instance_with_user_data_json(self, networks):
        """POST a JSON create request carrying user data.

        :param networks: despite its name, the user data to send; the
                         parameter name is kept for compatibility.
        :returns: tuple of (request, response, user_data recorded by the
                  mock compute API).
        """
        body_dict = self._create_user_data_request_dict(networks)
        request = self._get_create_request_json(body_dict)
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        return request, response, compute_api.user_data

    def _create_instance_with_networks_xml(self, networks):
        """POST an XML create request asking for ``networks``.

        :returns: tuple of (request, response, networks recorded by the
                  mock compute API).
        """
        body_dict = self._create_networks_request_dict(networks)
        request = self._get_create_request_xml(body_dict)
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        return request, response, compute_api.networks

    # ------------------------------------------------------------------
    # Network tests
    # ------------------------------------------------------------------

    def test_create_instance_with_no_networks(self):
        # Without a networks section nothing is passed to create().
        request, response, networks = (
            self._create_instance_with_networks_json(networks=None))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, None)

    def test_create_instance_with_no_networks_xml(self):
        request, response, networks = (
            self._create_instance_with_networks_xml(networks=None))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, None)

    def test_create_instance_with_one_network(self):
        requested = [FAKE_NETWORKS[0]]
        request, response, networks = (
            self._create_instance_with_networks_json(requested))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, [FAKE_NETWORKS[0]])

    def test_create_instance_with_one_network_xml(self):
        requested = [FAKE_NETWORKS[0]]
        request, response, networks = (
            self._create_instance_with_networks_xml(requested))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, [FAKE_NETWORKS[0]])

    def test_create_instance_with_two_networks(self):
        request, response, networks = (
            self._create_instance_with_networks_json(FAKE_NETWORKS))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, FAKE_NETWORKS)

    def test_create_instance_with_two_networks_xml(self):
        request, response, networks = (
            self._create_instance_with_networks_xml(FAKE_NETWORKS))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(networks, FAKE_NETWORKS)

    def test_create_instance_with_duplicate_networks(self):
        # A rejected request must not reach create(), so nothing is
        # recorded by the mock.
        request, response, networks = (
            self._create_instance_with_networks_json(DUPLICATE_NETWORKS))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_duplicate_networks_xml(self):
        request, response, networks = (
            self._create_instance_with_networks_xml(DUPLICATE_NETWORKS))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_no_id(self):
        body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
        del body_dict['server']['networks'][0]['uuid']
        request = self._get_create_request_json(body_dict)
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(compute_api.networks, None)

    def test_create_instance_with_network_no_id_xml(self):
        body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
        request = self._get_create_request_xml(body_dict)
        # Strip the uuid attribute from the already serialized document.
        uuid = ' uuid="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"'
        request.body = request.body.replace(uuid, '')
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(compute_api.networks, None)

    def test_create_instance_with_network_invalid_id(self):
        request, response, networks = (
            self._create_instance_with_networks_json(INVALID_NETWORKS))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_invalid_id_xml(self):
        request, response, networks = (
            self._create_instance_with_networks_xml(INVALID_NETWORKS))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_empty_fixed_ip(self):
        requested = [('1', '')]
        request, response, networks = (
            self._create_instance_with_networks_json(requested))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_non_string_fixed_ip(self):
        requested = [('1', 12345)]
        request, response, networks = (
            self._create_instance_with_networks_json(requested))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_empty_fixed_ip_xml(self):
        requested = [('1', '')]
        request, response, networks = (
            self._create_instance_with_networks_xml(requested))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(networks, None)

    def test_create_instance_with_network_no_fixed_ip(self):
        # A network without a fixed ip is accepted and forwarded with
        # None in place of the address.
        body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
        del body_dict['server']['networks'][0]['fixed_ip']
        request = self._get_create_request_json(body_dict)
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(compute_api.networks,
                         [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', None)])

    def test_create_instance_with_network_no_fixed_ip_xml(self):
        body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
        request = self._get_create_request_xml(body_dict)
        request.body = request.body.replace(' fixed_ip="10.0.1.12"', '')
        compute_api, response = (
            self._run_create_instance_with_mock_compute_api(request))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(compute_api.networks,
                         [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', None)])

    # ------------------------------------------------------------------
    # User data tests
    # ------------------------------------------------------------------

    def test_create_instance_with_userdata(self):
        # Base64 encoded user data is forwarded to create() unchanged.
        user_data_contents = '#!/bin/bash\necho "Oh no!"\n'
        user_data_contents = base64.b64encode(user_data_contents)
        request, response, user_data = (
            self._create_instance_with_user_data_json(user_data_contents))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(user_data, user_data_contents)

    def test_create_instance_with_userdata_none(self):
        user_data_contents = None
        request, response, user_data = (
            self._create_instance_with_user_data_json(user_data_contents))
        self.assertEqual(response.status_int, 202)
        self.assertEqual(user_data, user_data_contents)

    def test_create_instance_with_userdata_with_non_b64_content(self):
        # User data that is not base64 encoded is refused.
        user_data_contents = '#!/bin/bash\necho "Oh no!"\n'
        request, response, user_data = (
            self._create_instance_with_user_data_json(user_data_contents))
        self.assertEqual(response.status_int, 400)
        self.assertEqual(user_data, None)

    # ------------------------------------------------------------------
    # Security group tests
    # ------------------------------------------------------------------

    def test_create_instance_with_security_group_json(self):
        # The compute API is not mocked here; only the db calls touching
        # security groups are stubbed.
        security_groups = ['test', 'test1']
        self.stubs.Set(nova.db.api, 'security_group_get_by_name',
                       return_security_group_get_by_name)
        self.stubs.Set(nova.db.api, 'instance_add_security_group',
                       return_instance_add_security_group)
        body_dict = self._create_security_group_request_dict(security_groups)
        request = self._get_create_request_json(body_dict)
        response = request.get_response(fakes.wsgi_app())
        self.assertEqual(response.status_int, 202)

    def test_get_server_by_id_verify_security_groups_json(self):
        self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
        req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
        req.headers['Content-Type'] = 'application/json'
        response = req.get_response(fakes.wsgi_app())
        self.assertEqual(response.status_int, 200)
        res_dict = json.loads(response.body)
        expected_security_group = [{"name": "test"}]
        self.assertEqual(res_dict['server']['security_groups'],
                         expected_security_group)

    def test_get_server_by_id_verify_security_groups_xml(self):
        self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
        req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
        req.headers['Accept'] = 'application/xml'
        response = req.get_response(fakes.wsgi_app())
        self.assertEqual(response.status_int, 200)
        dom = minidom.parseString(response.body)
        server = dom.childNodes[0]
        sec_groups = server.getElementsByTagName('security_groups')[0]
        sec_group = sec_groups.getElementsByTagName('security_group')[0]
        self.assertEqual(INSTANCE['security_groups'][0]['name'],
                         sec_group.getAttribute("name"))