1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010-2011 OpenStack LLC.
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
22
from xml.dom import minidom
28
from nova import exception
29
from nova import flags
31
import nova.api.openstack
32
from nova.tests.api.openstack import fakes
38
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
40
FAKE_NETWORKS = [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
41
('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '10.0.2.12')]
43
DUPLICATE_NETWORKS = [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
44
('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12')]
46
INVALID_NETWORKS = [('invalid', 'invalid-ip-address')]
50
"display_name": "test_server",
52
"created_at": datetime.datetime(2010, 10, 10, 12, 0, 0),
53
"updated_at": datetime.datetime(2010, 11, 11, 11, 0, 0),
54
"security_groups": [{"id": 1, "name": "test"}]
58
def return_server_by_id(context, id, session=None):
63
def return_security_group_non_existing(context, project_id, group_name):
64
raise exception.SecurityGroupNotFoundForProject(project_id=project_id,
65
security_group_id=group_name)
68
def return_security_group_get_by_name(context, project_id, group_name):
69
return {'id': 1, 'name': group_name}
72
def return_security_group_get(context, security_group_id, session):
73
return {'id': security_group_id}
76
def return_instance_add_security_group(context, instance_id,
81
class CreateserverextTest(test.TestCase):
84
super(CreateserverextTest, self).setUp()
87
super(CreateserverextTest, self).tearDown()
89
def _setup_mock_compute_api(self):
91
class MockComputeAPI(nova.compute.API):
94
self.injected_files = None
99
def create(self, *args, **kwargs):
100
if 'injected_files' in kwargs:
101
self.injected_files = kwargs['injected_files']
103
self.injected_files = None
105
if 'requested_networks' in kwargs:
106
self.networks = kwargs['requested_networks']
110
if 'user_data' in kwargs:
111
self.user_data = kwargs['user_data']
113
return [{'id': '1234', 'display_name': 'fakeinstance',
116
'project_id': 'fake',
120
def set_admin_password(self, *args, **kwargs):
123
def make_stub_method(canned_return):
124
def stub_method(*args, **kwargs):
128
compute_api = MockComputeAPI()
129
self.stubs.Set(nova.compute, 'API', make_stub_method(compute_api))
131
nova.api.openstack.create_instance_helper.CreateInstanceHelper,
132
'_get_kernel_ramdisk_from_image', make_stub_method((1, 1)))
135
def _create_security_group_request_dict(self, security_groups):
137
server['name'] = 'new-server-test'
138
server['imageRef'] = 1
139
server['flavorRef'] = 1
140
if security_groups is not None:
142
for name in security_groups:
143
sg_list.append({'name': name})
144
server['security_groups'] = sg_list
145
return {'server': server}
147
def _create_networks_request_dict(self, networks):
149
server['name'] = 'new-server-test'
150
server['imageRef'] = 1
151
server['flavorRef'] = 1
152
if networks is not None:
154
for uuid, fixed_ip in networks:
155
network_list.append({'uuid': uuid, 'fixed_ip': fixed_ip})
156
server['networks'] = network_list
157
return {'server': server}
159
def _create_user_data_request_dict(self, user_data):
161
server['name'] = 'new-server-test'
162
server['imageRef'] = 1
163
server['flavorRef'] = 1
164
server['user_data'] = user_data
165
return {'server': server}
167
def _get_create_request_json(self, body_dict):
168
req = webob.Request.blank('/v1.1/123/os-create-server-ext')
169
req.headers['Content-Type'] = 'application/json'
171
req.body = json.dumps(body_dict)
174
def _run_create_instance_with_mock_compute_api(self, request):
175
compute_api = self._setup_mock_compute_api()
176
response = request.get_response(fakes.wsgi_app())
177
return compute_api, response
179
def _format_xml_request_body(self, body_dict):
180
server = body_dict['server']
183
'<?xml version="1.0" encoding="UTF-8"?>',
184
'<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.1"',
185
' name="%s" imageRef="%s" flavorRef="%s">' % (
186
server['name'], server['imageRef'], server['flavorRef'])])
187
if 'metadata' in server:
188
metadata = server['metadata']
189
body_parts.append('<metadata>')
190
for item in metadata.iteritems():
191
body_parts.append('<meta key="%s">%s</meta>' % item)
192
body_parts.append('</metadata>')
193
if 'personality' in server:
194
personalities = server['personality']
195
body_parts.append('<personality>')
196
for file in personalities:
197
item = (file['path'], file['contents'])
198
body_parts.append('<file path="%s">%s</file>' % item)
199
body_parts.append('</personality>')
200
if 'networks' in server:
201
networks = server['networks']
202
body_parts.append('<networks>')
203
for network in networks:
204
item = (network['uuid'], network['fixed_ip'])
205
body_parts.append('<network uuid="%s" fixed_ip="%s"></network>'
207
body_parts.append('</networks>')
208
body_parts.append('</server>')
209
return ''.join(body_parts)
211
def _get_create_request_xml(self, body_dict):
212
req = webob.Request.blank('/v1.1/123/os-create-server-ext')
213
req.content_type = 'application/xml'
214
req.accept = 'application/xml'
216
req.body = self._format_xml_request_body(body_dict)
219
def _create_instance_with_networks_json(self, networks):
220
body_dict = self._create_networks_request_dict(networks)
221
request = self._get_create_request_json(body_dict)
222
compute_api, response = \
223
self._run_create_instance_with_mock_compute_api(request)
224
return request, response, compute_api.networks
226
def _create_instance_with_user_data_json(self, networks):
227
body_dict = self._create_user_data_request_dict(networks)
228
request = self._get_create_request_json(body_dict)
229
compute_api, response = \
230
self._run_create_instance_with_mock_compute_api(request)
231
return request, response, compute_api.user_data
233
def _create_instance_with_networks_xml(self, networks):
234
body_dict = self._create_networks_request_dict(networks)
235
request = self._get_create_request_xml(body_dict)
236
compute_api, response = \
237
self._run_create_instance_with_mock_compute_api(request)
238
return request, response, compute_api.networks
240
def test_create_instance_with_no_networks(self):
241
request, response, networks = \
242
self._create_instance_with_networks_json(networks=None)
243
self.assertEquals(response.status_int, 202)
244
self.assertEquals(networks, None)
246
def test_create_instance_with_no_networks_xml(self):
247
request, response, networks = \
248
self._create_instance_with_networks_xml(networks=None)
249
self.assertEquals(response.status_int, 202)
250
self.assertEquals(networks, None)
252
def test_create_instance_with_one_network(self):
253
request, response, networks = \
254
self._create_instance_with_networks_json([FAKE_NETWORKS[0]])
255
self.assertEquals(response.status_int, 202)
256
self.assertEquals(networks, [FAKE_NETWORKS[0]])
258
def test_create_instance_with_one_network_xml(self):
259
request, response, networks = \
260
self._create_instance_with_networks_xml([FAKE_NETWORKS[0]])
261
self.assertEquals(response.status_int, 202)
262
self.assertEquals(networks, [FAKE_NETWORKS[0]])
264
def test_create_instance_with_two_networks(self):
265
request, response, networks = \
266
self._create_instance_with_networks_json(FAKE_NETWORKS)
267
self.assertEquals(response.status_int, 202)
268
self.assertEquals(networks, FAKE_NETWORKS)
270
def test_create_instance_with_two_networks_xml(self):
271
request, response, networks = \
272
self._create_instance_with_networks_xml(FAKE_NETWORKS)
273
self.assertEquals(response.status_int, 202)
274
self.assertEquals(networks, FAKE_NETWORKS)
276
def test_create_instance_with_duplicate_networks(self):
277
request, response, networks = \
278
self._create_instance_with_networks_json(DUPLICATE_NETWORKS)
279
self.assertEquals(response.status_int, 400)
280
self.assertEquals(networks, None)
282
def test_create_instance_with_duplicate_networks_xml(self):
283
request, response, networks = \
284
self._create_instance_with_networks_xml(DUPLICATE_NETWORKS)
285
self.assertEquals(response.status_int, 400)
286
self.assertEquals(networks, None)
288
def test_create_instance_with_network_no_id(self):
289
body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
290
del body_dict['server']['networks'][0]['uuid']
291
request = self._get_create_request_json(body_dict)
292
compute_api, response = \
293
self._run_create_instance_with_mock_compute_api(request)
294
self.assertEquals(response.status_int, 400)
295
self.assertEquals(compute_api.networks, None)
297
def test_create_instance_with_network_no_id_xml(self):
298
body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
299
request = self._get_create_request_xml(body_dict)
300
uuid = ' uuid="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"'
301
request.body = request.body.replace(uuid, '')
302
compute_api, response = \
303
self._run_create_instance_with_mock_compute_api(request)
304
self.assertEquals(response.status_int, 400)
305
self.assertEquals(compute_api.networks, None)
307
def test_create_instance_with_network_invalid_id(self):
308
request, response, networks = \
309
self._create_instance_with_networks_json(INVALID_NETWORKS)
310
self.assertEquals(response.status_int, 400)
311
self.assertEquals(networks, None)
313
def test_create_instance_with_network_invalid_id_xml(self):
314
request, response, networks = \
315
self._create_instance_with_networks_xml(INVALID_NETWORKS)
316
self.assertEquals(response.status_int, 400)
317
self.assertEquals(networks, None)
319
def test_create_instance_with_network_empty_fixed_ip(self):
320
networks = [('1', '')]
321
request, response, networks = \
322
self._create_instance_with_networks_json(networks)
323
self.assertEquals(response.status_int, 400)
324
self.assertEquals(networks, None)
326
def test_create_instance_with_network_non_string_fixed_ip(self):
327
networks = [('1', 12345)]
328
request, response, networks = \
329
self._create_instance_with_networks_json(networks)
330
self.assertEquals(response.status_int, 400)
331
self.assertEquals(networks, None)
333
def test_create_instance_with_network_empty_fixed_ip_xml(self):
334
networks = [('1', '')]
335
request, response, networks = \
336
self._create_instance_with_networks_xml(networks)
337
self.assertEquals(response.status_int, 400)
338
self.assertEquals(networks, None)
340
def test_create_instance_with_network_no_fixed_ip(self):
341
body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
342
del body_dict['server']['networks'][0]['fixed_ip']
343
request = self._get_create_request_json(body_dict)
344
compute_api, response = \
345
self._run_create_instance_with_mock_compute_api(request)
346
self.assertEquals(response.status_int, 202)
347
self.assertEquals(compute_api.networks,
348
[('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', None)])
350
def test_create_instance_with_network_no_fixed_ip_xml(self):
351
body_dict = self._create_networks_request_dict([FAKE_NETWORKS[0]])
352
request = self._get_create_request_xml(body_dict)
353
request.body = request.body.replace(' fixed_ip="10.0.1.12"', '')
354
compute_api, response = \
355
self._run_create_instance_with_mock_compute_api(request)
356
self.assertEquals(response.status_int, 202)
357
self.assertEquals(compute_api.networks,
358
[('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', None)])
360
def test_create_instance_with_userdata(self):
361
user_data_contents = '#!/bin/bash\necho "Oh no!"\n'
362
user_data_contents = base64.b64encode(user_data_contents)
363
request, response, user_data = \
364
self._create_instance_with_user_data_json(user_data_contents)
365
self.assertEquals(response.status_int, 202)
366
self.assertEquals(user_data, user_data_contents)
368
def test_create_instance_with_userdata_none(self):
369
user_data_contents = None
370
request, response, user_data = \
371
self._create_instance_with_user_data_json(user_data_contents)
372
self.assertEquals(response.status_int, 202)
373
self.assertEquals(user_data, user_data_contents)
375
def test_create_instance_with_userdata_with_non_b64_content(self):
376
user_data_contents = '#!/bin/bash\necho "Oh no!"\n'
377
request, response, user_data = \
378
self._create_instance_with_user_data_json(user_data_contents)
379
self.assertEquals(response.status_int, 400)
380
self.assertEquals(user_data, None)
382
def test_create_instance_with_security_group_json(self):
383
security_groups = ['test', 'test1']
384
self.stubs.Set(nova.db.api, 'security_group_get_by_name',
385
return_security_group_get_by_name)
386
self.stubs.Set(nova.db.api, 'instance_add_security_group',
387
return_instance_add_security_group)
388
body_dict = self._create_security_group_request_dict(security_groups)
389
request = self._get_create_request_json(body_dict)
390
response = request.get_response(fakes.wsgi_app())
391
self.assertEquals(response.status_int, 202)
393
def test_get_server_by_id_verify_security_groups_json(self):
394
self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
395
req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
396
req.headers['Content-Type'] = 'application/json'
397
response = req.get_response(fakes.wsgi_app())
398
self.assertEquals(response.status_int, 200)
399
res_dict = json.loads(response.body)
400
expected_security_group = [{"name": "test"}]
401
self.assertEquals(res_dict['server']['security_groups'],
402
expected_security_group)
404
def test_get_server_by_id_verify_security_groups_xml(self):
405
self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
406
req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
407
req.headers['Accept'] = 'application/xml'
408
response = req.get_response(fakes.wsgi_app())
409
self.assertEquals(response.status_int, 200)
410
dom = minidom.parseString(response.body)
411
server = dom.childNodes[0]
412
sec_groups = server.getElementsByTagName('security_groups')[0]
413
sec_group = sec_groups.getElementsByTagName('security_group')[0]
414
self.assertEqual(INSTANCE['security_groups'][0]['name'],
415
sec_group.getAttribute("name"))