1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
1 |
# Copyright 2014-2015 Canonical Limited.
|
2 |
#
|
|
3 |
# This file is part of charm-helpers.
|
|
4 |
#
|
|
5 |
# charm-helpers is free software: you can redistribute it and/or modify
|
|
6 |
# it under the terms of the GNU Lesser General Public License version 3 as
|
|
7 |
# published by the Free Software Foundation.
|
|
8 |
#
|
|
9 |
# charm-helpers is distributed in the hope that it will be useful,
|
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
12 |
# GNU Lesser General Public License for more details.
|
|
13 |
#
|
|
14 |
# You should have received a copy of the GNU Lesser General Public License
|
|
15 |
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
|
16 |
||
14
by Paul Larson
Add support for xenial |
17 |
import amulet |
18 |
import json |
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
19 |
import logging |
20 |
import os |
|
14
by Paul Larson
Add support for xenial |
21 |
import re |
22 |
import six |
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
23 |
import time |
24 |
import urllib |
|
25 |
||
14
by Paul Larson
Add support for xenial |
26 |
import cinderclient.v1.client as cinder_client |
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
27 |
import glanceclient.v1.client as glance_client |
14
by Paul Larson
Add support for xenial |
28 |
import heatclient.v1.client as heat_client |
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
29 |
import keystoneclient.v2_0 as keystone_client |
14
by Paul Larson
Add support for xenial |
30 |
from keystoneclient.auth.identity import v3 as keystone_id_v3 |
31 |
from keystoneclient import session as keystone_session |
|
32 |
from keystoneclient.v3 import client as keystone_client_v3 |
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
33 |
|
14
by Paul Larson
Add support for xenial |
34 |
import novaclient.client as nova_client |
35 |
import pika |
|
36 |
import swiftclient |
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
37 |
|
38 |
from charmhelpers.contrib.amulet.utils import ( |
|
39 |
AmuletUtils
|
|
40 |
)
|
|
41 |
||
42 |
# Short aliases for the stdlib logging levels; ERROR is the default
# log_level accepted by OpenStackAmuletUtils.__init__.
DEBUG = logging.DEBUG
ERROR = logging.ERROR

# Version string passed as the first argument to novaclient's Client
# factory in authenticate_nova_user.
NOVA_CLIENT_VERSION = "2"
46 |
||
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
47 |
|
48 |
class OpenStackAmuletUtils(AmuletUtils): |
|
49 |
"""OpenStack amulet utilities.
|
|
50 |
||
51 |
This class inherits from AmuletUtils and has additional support
|
|
14
by Paul Larson
Add support for xenial |
52 |
that is specifically for use by OpenStack charm tests.
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
53 |
"""
|
54 |
||
55 |
def __init__(self, log_level=ERROR):
    """Initialize the deployment environment.

    :param log_level: logging level forwarded unchanged to the
                      AmuletUtils initializer (defaults to ERROR)
    """
    # All setup is delegated to the parent class; nothing is added here.
    super(OpenStackAmuletUtils, self).__init__(log_level)
|
58 |
||
59 |
def validate_endpoint_data(self, endpoints, admin_port, internal_port,
                           public_port, expected):
    """Check endpoint records against the expected endpoint data.

    An endpoint is selected when each of the three port strings occurs
    in its corresponding URL; every selected endpoint is then compared
    with ``expected`` through ``self._validate_dict_data``.

    :param endpoints: iterable of endpoint objects exposing id, region,
                      adminurl, internalurl, publicurl and service_id
    :param admin_port: text that must appear in the admin URL
    :param internal_port: text that must appear in the internal URL
    :param public_port: text that must appear in the public URL
    :param expected: dict of expected endpoint attributes
    :returns: None on success, otherwise an error message string
    """
    self.log.debug('Validating endpoint data...')
    self.log.debug('actual: {}'.format(repr(endpoints)))

    matched = False
    for endpoint in endpoints:
        self.log.debug('endpoint: {}'.format(repr(endpoint)))
        ports_match = (admin_port in endpoint.adminurl and
                       internal_port in endpoint.internalurl and
                       public_port in endpoint.publicurl)
        if not ports_match:
            continue

        matched = True
        observed = {'id': endpoint.id,
                    'region': endpoint.region,
                    'adminurl': endpoint.adminurl,
                    'internalurl': endpoint.internalurl,
                    'publicurl': endpoint.publicurl,
                    'service_id': endpoint.service_id}
        problem = self._validate_dict_data(expected, observed)
        if problem:
            return 'unexpected endpoint data - {}'.format(problem)

    if not matched:
        return 'endpoint not found'
|
87 |
||
88 |
def validate_svc_catalog_endpoint_data(self, expected, actual):
    """Validate service catalog endpoint data.

    Validate a dict of actual service catalog endpoints vs a dict of
    expected service catalog endpoints. Only the first entry of each
    service's endpoint list is compared.

    :param expected: dict mapping service name to a list of expected
                     endpoint dicts
    :param actual: dict mapping service name to a list of actual
                   endpoint dicts
    :returns: None (or the last falsy validator result) on success,
              otherwise an error message
    """
    self.log.debug('Validating service catalog endpoint data...')
    self.log.debug('actual: {}'.format(repr(actual)))
    # Start from None so an empty ``expected`` reports success instead of
    # hitting an unbound local at the final return.
    ret = None
    for k in expected:
        if k not in actual:
            return "endpoint {} does not exist".format(k)
        ret = self._validate_dict_data(expected[k][0], actual[k][0])
        if ret:
            return self.endpoint_error(k, ret)
    return ret
|
104 |
||
105 |
def validate_tenant_data(self, expected, actual):
    """Validate tenant data.

    Validate a list of actual tenant data vs list of expected tenant
    data. Tenants are paired by name.

    :param expected: list of dicts, each with at least a 'name' key
    :param actual: iterable of tenant objects exposing enabled,
                   description, name and id
    :returns: None (or the last falsy validator result) on success,
              otherwise an error message
    """
    self.log.debug('Validating tenant data...')
    self.log.debug('actual: {}'.format(repr(actual)))
    # Start from None so an empty ``expected`` reports success instead of
    # hitting an unbound local at the final return.
    ret = None
    for e in expected:
        found = False
        for act in actual:
            if e['name'] != act.name:
                continue
            found = True
            a = {'enabled': act.enabled, 'description': act.description,
                 'name': act.name, 'id': act.id}
            ret = self._validate_dict_data(e, a)
            if ret:
                return "unexpected tenant data - {}".format(ret)
        if not found:
            return "tenant {} does not exist".format(e['name'])
    return ret
|
126 |
||
127 |
def validate_role_data(self, expected, actual):
    """Validate role data.

    Validate a list of actual role data vs a list of expected role
    data. Roles are paired by name.

    :param expected: list of dicts, each with at least a 'name' key
    :param actual: iterable of role objects exposing name and id
    :returns: None (or the last falsy validator result) on success,
              otherwise an error message
    """
    self.log.debug('Validating role data...')
    self.log.debug('actual: {}'.format(repr(actual)))
    # Start from None so an empty ``expected`` reports success instead of
    # hitting an unbound local at the final return.
    ret = None
    for e in expected:
        found = False
        for act in actual:
            if e['name'] != act.name:
                continue
            found = True
            a = {'name': act.name, 'id': act.id}
            ret = self._validate_dict_data(e, a)
            if ret:
                return "unexpected role data - {}".format(ret)
        if not found:
            return "role {} does not exist".format(e['name'])
    return ret
|
147 |
||
14
by Paul Larson
Add support for xenial |
148 |
def validate_user_data(self, expected, actual, api_version=None):
    """Validate user data.

    Validate a list of actual user data vs a list of expected user
    data. Users are paired by name.

    :param expected: list of dicts, each with at least a 'name' key
    :param actual: iterable of user objects exposing enabled, name,
                   email and id (plus tenantId unless api_version is 3)
    :param api_version: when 3, compare 'default_project_id' (falling
                        back to the string 'none' if the attribute is
                        absent); otherwise compare 'tenantId'
    :returns: None (or the last falsy validator result) on success,
              otherwise an error message
    """
    self.log.debug('Validating user data...')
    self.log.debug('actual: {}'.format(repr(actual)))
    # Start from None so an empty ``expected`` reports success instead of
    # hitting an unbound local at the final return.
    ret = None
    for e in expected:
        found = False
        for act in actual:
            if e['name'] != act.name:
                continue
            a = {'enabled': act.enabled, 'name': act.name,
                 'email': act.email, 'id': act.id}
            if api_version == 3:
                a['default_project_id'] = getattr(act,
                                                  'default_project_id',
                                                  'none')
            else:
                a['tenantId'] = act.tenantId
            found = True
            ret = self._validate_dict_data(e, a)
            if ret:
                return "unexpected user data - {}".format(ret)
        if not found:
            return "user {} does not exist".format(e['name'])
    return ret
|
175 |
||
176 |
def validate_flavor_data(self, expected, actual):
    """Compare expected flavor names with the names of actual flavors.

    :param expected: list of expected flavor names
    :param actual: iterable of flavor objects exposing ``name``
    :returns: whatever ``self._validate_list_data`` returns for the
              expected list and the collected names
    """
    self.log.debug('Validating flavor data...')
    self.log.debug('actual: {}'.format(repr(actual)))
    flavor_names = []
    for flavor in actual:
        flavor_names.append(flavor.name)
    return self._validate_list_data(expected, flavor_names)
|
185 |
||
186 |
def tenant_exists(self, keystone, tenant):
    """Report whether keystone lists a tenant with the given name.

    :param keystone: keystone client exposing ``tenants.list()``
    :param tenant: tenant name to look for
    :returns: True if a listed tenant has that name, otherwise False
    """
    self.log.debug('Checking if tenant exists ({})...'.format(tenant))
    known_names = []
    for entry in keystone.tenants.list():
        known_names.append(entry.name)
    return tenant in known_names
190 |
||
14
by Paul Larson
Add support for xenial |
191 |
def authenticate_cinder_admin(self, keystone_sentry, username,
                              password, tenant):
    """Authenticates admin user with cinder.

    The keystone v2.0 auth URL (port 5000) is built from the
    'private-address' of the keystone unit's shared-db relation with
    mysql.

    :param keystone_sentry: sentry unit pointer for the keystone unit
    :param username: admin user name
    :param password: admin password
    :param tenant: tenant name
    :returns: cinder client object
    """
    # NOTE(beisner): cinder python client doesn't accept tokens.
    service_ip = keystone_sentry.relation(
        'shared-db', 'mysql:shared-db')['private-address'].strip()
    # Decode only byte strings: text strings have no .decode() on
    # Python 3, so an unconditional decode would raise AttributeError.
    if isinstance(service_ip, bytes):
        service_ip = service_ip.decode('utf-8')
    ept = "http://{}:5000/v2.0".format(service_ip)
    return cinder_client.Client(username, password, tenant, ept)
|
200 |
||
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
201 |
def authenticate_keystone_admin(self, keystone_sentry, user, password,
                                tenant=None, api_version=None,
                                keystone_ip=None):
    """Authenticates admin user with the keystone admin endpoint.

    :param keystone_sentry: sentry unit pointer for the keystone unit;
                            only consulted when keystone_ip is not given
    :param user: admin user name
    :param password: admin password
    :param tenant: tenant name (used for the v2.0 client only)
    :param api_version: 2 or unset selects the v2.0 client; any other
                        value selects the v3 session-based client
    :param keystone_ip: optional address of the keystone service; when
                        falsy, the 'private-address' of the unit's
                        shared-db relation with mysql is used
    :returns: keystone client object (v2.0 or v3)
    """
    self.log.debug('Authenticating keystone admin...')
    if not keystone_ip:
        keystone_ip = keystone_sentry.relation(
            'shared-db', 'mysql:shared-db')['private-address']
    keystone_ip = keystone_ip.strip()
    # Decode only byte strings: text strings have no .decode() on
    # Python 3, so an unconditional decode would raise AttributeError.
    if isinstance(keystone_ip, bytes):
        keystone_ip = keystone_ip.decode('utf-8')
    base_ep = "http://{}:35357".format(keystone_ip)

    if not api_version or api_version == 2:
        ep = base_ep + "/v2.0"
        return keystone_client.Client(username=user, password=password,
                                      tenant_name=tenant, auth_url=ep)

    # v3: authenticate through a session; both the user domain and the
    # scope domain are hard-coded to 'admin_domain'.
    ep = base_ep + "/v3"
    auth = keystone_id_v3.Password(
        user_domain_name='admin_domain',
        username=user,
        password=password,
        domain_name='admin_domain',
        auth_url=ep,
    )
    sess = keystone_session.Session(auth=auth)
    return keystone_client_v3.Client(session=sess)
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
226 |
|
227 |
def authenticate_keystone_user(self, keystone, user, password, tenant):
    """Log a regular user in against the public keystone endpoint.

    :param keystone: authenticated keystone client whose service
                     catalog supplies the public identity URL
    :param user: user name
    :param password: user password
    :param tenant: tenant name
    :returns: keystone v2.0 client object for that user
    """
    self.log.debug('Authenticating keystone user ({})...'.format(user))
    public_url = keystone.service_catalog.url_for(
        service_type='identity', endpoint_type='publicURL')
    credentials = dict(username=user, password=password,
                       tenant_name=tenant, auth_url=public_url)
    return keystone_client.Client(**credentials)
|
234 |
||
235 |
def authenticate_glance_admin(self, keystone):
    """Build a glance client for the admin user.

    :param keystone: authenticated keystone client; supplies the image
                     service admin URL and the auth token
    :returns: glance client object
    """
    self.log.debug('Authenticating glance admin...')
    catalog = keystone.service_catalog
    admin_url = catalog.url_for(service_type='image',
                                endpoint_type='adminURL')
    token = keystone.auth_token
    return glance_client.Client(admin_url, token=token)
|
241 |
||
14
by Paul Larson
Add support for xenial |
242 |
def authenticate_heat_admin(self, keystone):
    """Build a heat client for the admin user.

    :param keystone: authenticated keystone client; supplies the
                     orchestration public URL and the auth token
    :returns: heat client object
    """
    self.log.debug('Authenticating heat admin...')
    catalog = keystone.service_catalog
    public_url = catalog.url_for(service_type='orchestration',
                                 endpoint_type='publicURL')
    token = keystone.auth_token
    return heat_client.Client(endpoint=public_url, token=token)
|
248 |
||
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
249 |
def authenticate_nova_user(self, keystone, user, password, tenant):
    """Log a regular user in against nova-api.

    :param keystone: authenticated keystone client whose service
                     catalog supplies the public identity URL
    :param user: user name
    :param password: user password (passed to nova as ``api_key``)
    :param tenant: tenant name (passed to nova as ``project_id``)
    :returns: nova client object
    """
    self.log.debug('Authenticating nova user ({})...'.format(user))
    identity_url = keystone.service_catalog.url_for(
        service_type='identity', endpoint_type='publicURL')
    credentials = dict(username=user, api_key=password,
                       project_id=tenant, auth_url=identity_url)
    return nova_client.Client(NOVA_CLIENT_VERSION, **credentials)
257 |
||
14
by Paul Larson
Add support for xenial |
258 |
def authenticate_swift_user(self, keystone, user, password, tenant):
    """Log a regular user in against the swift api.

    :param keystone: authenticated keystone client whose service
                     catalog supplies the public identity URL
    :param user: user name
    :param password: user password (passed to swift as ``key``)
    :param tenant: tenant name
    :returns: swift connection object using auth version '2.0'
    """
    self.log.debug('Authenticating swift user ({})...'.format(user))
    identity_url = keystone.service_catalog.url_for(
        service_type='identity', endpoint_type='publicURL')
    settings = dict(authurl=identity_url, user=user, key=password,
                    tenant_name=tenant, auth_version='2.0')
    return swiftclient.Connection(**settings)
|
268 |
||
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
269 |
def create_cirros_image(self, glance, image_name):
    """Download the latest cirros image and upload it to glance,
    validate and return a resource pointer.

    The released version is looked up on download.cirros-cloud.net and
    the matching x86_64 disk image is cached under the local 'tests'
    directory; an already-cached file is not downloaded again. An HTTP
    proxy is used when the AMULET_HTTP_PROXY environment variable is set.

    :param glance: pointer to authenticated glance connection
    :param image_name: display name for new image
    :returns: glance image pointer
    """
    self.log.debug('Creating glance cirros image '
                   '({})...'.format(image_name))

    # Download cirros image
    http_proxy = os.getenv('AMULET_HTTP_PROXY')
    self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
    if http_proxy:
        proxies = {'http': http_proxy}
        opener = urllib.FancyURLopener(proxies)
    else:
        opener = urllib.FancyURLopener()

    f = opener.open('http://download.cirros-cloud.net/version/released')
    try:
        version = f.read().strip()
    finally:
        # Release the handle even if the read fails.
        f.close()
    cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
    local_path = os.path.join('tests', cirros_img)

    if not os.path.exists(local_path):
        cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
                                              version, cirros_img)
        opener.retrieve(cirros_url, local_path)

    # Create glance image. The disk image is binary data, so it must be
    # opened in binary mode to be uploaded unmodified.
    with open(local_path, 'rb') as f:
        image = glance.images.create(name=image_name, is_public=True,
                                     disk_format='qcow2',
                                     container_format='bare', data=f)

    # Wait for image to reach active status
    img_id = image.id
    ret = self.resource_reaches_status(glance.images, img_id,
                                       expected_stat='active',
                                       msg='Image status wait')
    if not ret:
        msg = 'Glance image failed to reach expected state.'
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Re-validate new image. Fetch it once so every attribute comes from
    # the same snapshot of the image record.
    self.log.debug('Validating image attributes...')
    current = glance.images.get(img_id)
    val_img_name = current.name
    val_img_stat = current.status
    val_img_pub = current.is_public
    val_img_cfmt = current.container_format
    val_img_dfmt = current.disk_format
    msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
                'container fmt:{} disk fmt:{}'.format(
                    val_img_name, val_img_pub, img_id,
                    val_img_stat, val_img_cfmt, val_img_dfmt))

    if val_img_name == image_name and val_img_stat == 'active' \
            and val_img_pub is True and val_img_cfmt == 'bare' \
            and val_img_dfmt == 'qcow2':
        self.log.debug(msg_attr)
    else:
        msg = ('Image validation failed, {}'.format(msg_attr))
        amulet.raise_status(amulet.FAIL, msg=msg)

    return image
|
336 |
||
337 |
def delete_image(self, glance, image):
    """Delete the specified image.

    Deprecated: thin wrapper around ``delete_resource``.

    :param glance: glance client exposing ``images``
    :param image: image name or id handed to ``delete_resource``
    :returns: the result of ``delete_resource``
    """

    # /!\ DEPRECATION WARNING
    # Use Logger.warning: the ``warn`` alias is deprecated, and the rest
    # of this file already calls ``warning``.
    self.log.warning('/!\\ DEPRECATION WARNING:  use '
                     'delete_resource instead of delete_image.')
    self.log.debug('Deleting glance image ({})...'.format(image))
    return self.delete_resource(glance.images, image, msg='glance image')
|
1
by Paul Larson
snappy-device-agent charm for deploying provisioning kits to work with SPI |
345 |
|
346 |
def create_instance(self, nova, image_name, instance_name, flavor):
    """Boot a nova instance and wait for it to become ACTIVE.

    The status is polled every 3 seconds, for at most 59 polls.

    :param nova: authenticated nova client
    :param image_name: name of the image to boot from
    :param instance_name: display name for the new instance
    :param flavor: name of the flavor to use
    :returns: the instance object once ACTIVE, or None on timeout
    """
    self.log.debug('Creating instance '
                   '({}|{}|{})'.format(instance_name, image_name, flavor))
    boot_image = nova.images.find(name=image_name)
    boot_flavor = nova.flavors.find(name=flavor)
    server = nova.servers.create(name=instance_name, image=boot_image,
                                 flavor=boot_flavor)

    polls_left = 59
    state = server.status
    while state != 'ACTIVE' and polls_left > 0:
        time.sleep(3)
        server = nova.servers.get(server.id)
        state = server.status
        self.log.debug('instance status: {}'.format(state))
        polls_left -= 1

    if state == 'ACTIVE':
        return server

    self.log.error('instance creation timed out')
    return None
|
369 |
||
370 |
def delete_instance(self, nova, instance):
    """Delete the specified instance.

    Deprecated: thin wrapper around ``delete_resource``.

    :param nova: nova client exposing ``servers``
    :param instance: instance name or id handed to ``delete_resource``
    :returns: the result of ``delete_resource``
    """

    # /!\ DEPRECATION WARNING
    # Use Logger.warning: the ``warn`` alias is deprecated, and the rest
    # of this file already calls ``warning``.
    self.log.warning('/!\\ DEPRECATION WARNING:  use '
                     'delete_resource instead of delete_instance.')
    self.log.debug('Deleting instance ({})...'.format(instance))
    return self.delete_resource(nova.servers, instance,
                                msg='nova instance')
|
379 |
||
380 |
def create_or_get_keypair(self, nova, keypair_name="testkey"):
    """Create a new keypair, or return pointer if it already exists.

    :param nova: nova client exposing ``keypairs.get`` and
                 ``keypairs.create``
    :param keypair_name: name of the keypair to fetch or create
    :returns: keypair object
    """
    try:
        _keypair = nova.keypairs.get(keypair_name)
    except Exception:
        # Any lookup failure is treated as "not there yet". Catching
        # Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        self.log.debug('Keypair ({}) does not exist, '
                       'creating it.'.format(keypair_name))
    else:
        self.log.debug('Keypair ({}) already exists, '
                       'using it.'.format(keypair_name))
        return _keypair

    _keypair = nova.keypairs.create(name=keypair_name)
    return _keypair
|
393 |
||
394 |
def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
                         img_id=None, src_vol_id=None, snap_id=None):
    """Create cinder volume, optionally from a glance image, OR
    optionally as a clone of an existing volume, OR optionally
    from a snapshot.  Wait for the new volume status to reach
    the expected status, validate and return a resource pointer.

    At most one of img_id, src_vol_id and snap_id may be given; any
    other combination is reported as a failure via amulet.raise_status.

    :param cinder: authenticated cinder client
    :param vol_name: cinder volume display name
    :param vol_size: size in gigabytes (replaced by the snapshot size
                     when creating from a snapshot)
    :param img_id: optional glance image id
    :param src_vol_id: optional source volume id to clone
    :param snap_id: optional snapshot id to use
    :returns: cinder volume pointer
    """
    # Handle parameter input and avoid impossible combinations
    if img_id and not src_vol_id and not snap_id:
        # Create volume from image
        self.log.debug('Creating cinder volume from glance image...')
        bootable = 'true'
    elif src_vol_id and not img_id and not snap_id:
        # Clone an existing volume; it inherits the source's bootable flag
        self.log.debug('Cloning cinder volume...')
        bootable = cinder.volumes.get(src_vol_id).bootable
    elif snap_id and not src_vol_id and not img_id:
        # Create volume from snapshot; size and bootable flag follow the
        # snapshot and the volume it was taken from
        self.log.debug('Creating cinder volume from snapshot...')
        snap = cinder.volume_snapshots.find(id=snap_id)
        vol_size = snap.size
        snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
        bootable = cinder.volumes.get(snap_vol_id).bootable
    elif not img_id and not src_vol_id and not snap_id:
        # Create volume
        self.log.debug('Creating cinder volume...')
        bootable = 'false'
    else:
        # Impossible combination of parameters
        msg = ('Invalid method use - name:{} size:{} img_id:{} '
               'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
                                                 img_id, src_vol_id,
                                                 snap_id))
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Create new volume
    try:
        vol_new = cinder.volumes.create(display_name=vol_name,
                                        imageRef=img_id,
                                        size=vol_size,
                                        source_volid=src_vol_id,
                                        snapshot_id=snap_id)
        vol_id = vol_new.id
    except Exception as e:
        msg = 'Failed to create volume: {}'.format(e)
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Wait for volume to reach available status
    ret = self.resource_reaches_status(cinder.volumes, vol_id,
                                       expected_stat="available",
                                       msg="Volume status wait")
    if not ret:
        msg = 'Cinder volume failed to reach expected state.'
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Re-validate new volume. Fetch it once so every attribute comes from
    # the same snapshot of the volume record.
    self.log.debug('Validating volume attributes...')
    current = cinder.volumes.get(vol_id)
    val_vol_name = current.display_name
    val_vol_boot = current.bootable
    val_vol_stat = current.status
    val_vol_size = current.size
    msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
                '{} size:{}'.format(val_vol_name, vol_id,
                                    val_vol_stat, val_vol_boot,
                                    val_vol_size))

    if val_vol_boot == bootable and val_vol_stat == 'available' \
            and val_vol_name == vol_name and val_vol_size == vol_size:
        self.log.debug(msg_attr)
    else:
        msg = ('Volume validation failed, {}'.format(msg_attr))
        amulet.raise_status(amulet.FAIL, msg=msg)

    return vol_new
|
475 |
||
476 |
def delete_resource(self, resource, resource_id,
                    msg="resource", max_wait=120):
    """Delete a single openstack resource and confirm it is gone.

    Deletion is confirmed by counting: the listing must shrink by
    exactly one entry. The listing is re-checked every 4 seconds until
    it does or until max_wait / 4 re-checks have been made.

    :param resource: pointer to os resource type, ex:glance_client.images
    :param resource_id: unique name or id for the openstack resource
    :param msg: text to identify purpose in logging
    :param max_wait: maximum wait time in seconds
    :returns: True if successful, otherwise False
    """
    self.log.debug('Deleting OpenStack resource '
                   '{} ({})'.format(resource_id, msg))

    def _current_count():
        # Number of resources presently listed.
        return len(list(resource.list()))

    count_before = _current_count()
    wanted = count_before - 1
    resource.delete(resource_id)

    attempt = 0
    count_now = _current_count()
    while count_now != wanted and attempt < (max_wait / 4):
        self.log.debug('{} delete check: '
                       '{} [{}:{}] {}'.format(msg, attempt,
                                              count_before,
                                              count_now,
                                              resource_id))
        time.sleep(4)
        count_now = _current_count()
        attempt += 1

    self.log.debug('{}:  expected, actual count = {}, '
                   '{}'.format(msg, wanted, count_now))

    if count_now != wanted:
        self.log.error('{} delete timed out'.format(msg))
        return False
    return True
|
512 |
||
513 |
def resource_reaches_status(self, resource, resource_id,
                            expected_stat='available',
                            msg='resource', max_wait=120):
    """Wait for an openstack resources status to reach an
    expected status within a specified time.  Useful to confirm that
    nova instances, cinder vols, snapshots, glance images, heat stacks
    and other resources eventually reach the expected status.

    The status is re-read every 4 seconds, for at most max_wait / 4
    re-reads.

    :param resource: pointer to os resource type, ex: heat_client.stacks
    :param resource_id: unique id for the openstack resource
    :param expected_stat: status to expect resource to reach
    :param msg: text to identify purpose in logging
    :param max_wait: maximum wait time in seconds
    :returns: True if successful, False if status is not reached
    """

    tries = 0
    resource_stat = resource.get(resource_id).status
    while resource_stat != expected_stat and tries < (max_wait / 4):
        self.log.debug('{} status check: '
                       '{} [{}:{}] {}'.format(msg, tries,
                                              resource_stat,
                                              expected_stat,
                                              resource_id))
        time.sleep(4)
        resource_stat = resource.get(resource_id).status
        tries += 1

    # Argument order follows the label: expected first, then actual.
    self.log.debug('{}:  expected, actual status = {}, '
                   '{}'.format(msg, expected_stat, resource_stat))

    if resource_stat == expected_stat:
        return True
    else:
        self.log.debug('{} never reached expected status: '
                       '{}'.format(resource_id, expected_stat))
        return False
|
550 |
||
551 |
def get_ceph_osd_id_cmd(self, index):
    """Build a backquoted shell snippet that yields a ceph-osd id.

    The snippet filters ``initctl list`` for ceph-osd jobs, picks the
    row numbered ``index + 1`` and extracts the digits from its second
    field.

    :param index: zero-based position of the ceph-osd job
    :returns: shell command string
    """
    pipeline = ("`initctl list | grep 'ceph-osd ' | "
                "awk 'NR=={} {{ print $2 }}' | "
                "grep -o '[0-9]*'`")
    row_number = index + 1
    return pipeline.format(row_number)
|
556 |
||
557 |
def get_ceph_pools(self, sentry_unit):
    """Collect the ceph pools reported by one ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses its
    comma-separated "<id> <name>" entries; entries that do not split
    into exactly two space-separated fields are ignored. A non-zero
    exit code is reported through amulet.raise_status.

    :param sentry_unit: pointer to amulet sentry instance (juju unit)
    :returns: dict with pool names as keys and integer pool ids as values
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
    pools = {}
    for entry in str(output).split(','):
        fields = entry.split(' ')
        if len(fields) != 2:
            continue
        ident, name = fields
        pools[name] = int(ident)

    unit_name = sentry_unit.info['unit_name']
    self.log.debug('Pools on {}: {}'.format(unit_name, pools))
    return pools
|
580 |
||
581 |
def get_ceph_df(self, sentry_unit):
    """Fetch ``ceph df`` from a unit as parsed JSON.

    A non-zero exit code is reported through amulet.raise_status.

    :param sentry_unit: Pointer to amulet sentry instance (juju unit)
    :returns: Dict of ceph df output
    """
    cmd = 'sudo ceph df --format=json'
    raw, exit_code = sentry_unit.run(cmd)
    if exit_code != 0:
        failure = ('{} `{}` returned {} '
                   '{}'.format(sentry_unit.info['unit_name'],
                               cmd, exit_code, raw))
        amulet.raise_status(amulet.FAIL, msg=failure)
    parsed = json.loads(raw)
    return parsed
|
595 |
||
596 |
def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
    """Sample one ceph pool from the unit's ``ceph df`` output.

    ``pool_id`` is used as an index into the 'pools' list returned by
    ``get_ceph_df``.

    :param sentry_unit: Pointer to amulet sentry instance (juju unit)
    :param pool_id: Ceph pool ID
    :returns: Tuple of pool name, object count, kb disk space used
    """
    pool = self.get_ceph_df(sentry_unit)['pools'][pool_id]
    pool_name = pool['name']
    stats = pool['stats']
    obj_count = stats['objects']
    kb_used = stats['kb_used']
    summary = ('Ceph {} pool (ID {}): {} objects, '
               '{} kb used'.format(pool_name, pool_id,
                                   obj_count, kb_used))
    self.log.debug(summary)
    return pool_name, obj_count, kb_used
|
613 |
||
614 |
def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
    """Check three ceph pool samples taken before adding, after adding
    and after deleting items that affect the sampled pool attribute
    (such as object count or kb used).

    The second sample must be greater than the first, and the third
    must be less than the second.

    :param samples: List containing 3 data samples
    :param sample_type: String for logging and usage context
    :returns: None if successful, Failure message otherwise
    """
    baseline, after_create, after_delete = 0, 1, 2
    unexpected = (samples[after_create] <= samples[baseline] or
                  samples[after_delete] >= samples[after_create])
    if not unexpected:
        self.log.debug('Ceph {} samples (OK): '
                       '{}'.format(sample_type, samples))
        return None

    return ('Ceph {} samples ({}) '
            'unexpected.'.format(sample_type, samples))
|
634 |
||
635 |
# rabbitmq/amqp specific helpers:
|
|
636 |
||
637 |
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
    """Block until the rabbitmq-server units report, through their
    extended status message, that they are ready and clustered.

    An optional initial sleep comes first; it is likely necessary
    after a config change, because the status message may not switch
    to non-ready instantly.

    :param deployment: deployment object exposing
                       ``_auto_wait_for_status``
    :param init_sleep: seconds to sleep before waiting (skipped if falsy)
    :param timeout: timeout passed through to the status wait
    """
    if init_sleep:
        time.sleep(init_sleep)

    ready_pattern = re.compile('^Unit is ready and clustered$')
    wait_options = {'message': ready_pattern,
                    'timeout': timeout,
                    'include_only': ['rabbitmq-server']}
    deployment._auto_wait_for_status(**wait_options)
|
650 |
||
651 |
def add_rmq_test_user(self, sentry_units,
                      username="testuser1", password="changeme"):
    """Add a test user via the first rmq juju unit, check connection as
    the new user against all sentry units.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: None if successful. Raise on error.
    """
    self.log.debug('Adding rmq user ({})...'.format(username))

    first_unit = sentry_units[0]

    # Check that user does not already exist.  Compare against the first
    # whitespace-delimited field of each output line rather than doing a
    # substring search, so that e.g. "testuser1" is not mistaken for an
    # existing "testuser10".
    output, _ = self.run_cmd_unit(first_unit, 'rabbitmqctl list_users')
    existing_users = [fields[0] for fields in
                      (line.split() for line in str(output).splitlines())
                      if fields]
    if username in existing_users:
        self.log.warning('User ({}) already exists, returning '
                         'gracefully.'.format(username))
        return

    perms = '".*" ".*" ".*"'
    cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
            'rabbitmqctl set_permissions {} {}'.format(username, perms)]

    # Add user via first unit
    for cmd in cmds:
        self.run_cmd_unit(first_unit, cmd)

    # Check connection against every sentry unit (including the first)
    self.log.debug('Checking user connect against units...')
    for sentry_unit in sentry_units:
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
                                               username=username,
                                               password=password)
        connection.close()
|
686 |
||
687 |
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
    """Delete a rabbitmq user via the first rmq juju unit.

    :param sentry_units: list of sentry unit pointers
    :param username: amqp user name, default to testuser1
    :returns: None if successful or no such user.
    """
    self.log.debug('Deleting rmq user ({})...'.format(username))

    first_unit = sentry_units[0]

    # Check that the user exists.  Compare against the first
    # whitespace-delimited field of each output line rather than doing a
    # substring search, so that a longer name containing ``username``
    # (e.g. "testuser10") does not count as a match.
    output, _ = self.run_cmd_unit(first_unit, 'rabbitmqctl list_users')
    existing_users = [fields[0] for fields in
                      (line.split() for line in str(output).splitlines())
                      if fields]
    if username not in existing_users:
        self.log.warning('User ({}) does not exist, returning '
                         'gracefully.'.format(username))
        return

    # Delete the user
    cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
    self.run_cmd_unit(first_unit, cmd_user_del)
|
709 |
||
710 |
def get_rmq_cluster_status(self, sentry_unit):
    """Run the rabbitmq cluster status command on a unit and return
    everything it printed.

    :param sentry_unit: sentry unit
    :returns: String containing console output of cluster status command
    """
    output, _ = self.run_cmd_unit(sentry_unit, 'rabbitmqctl cluster_status')
    unit_name = sentry_unit.info['unit_name']
    self.log.debug('{} cluster_status:\n{}'.format(unit_name, output))
    return str(output)
|
722 |
||
723 |
def get_rmq_cluster_running_nodes(self, sentry_unit):
    """Parse rabbitmqctl cluster_status output string, return list of
    running rabbitmq cluster nodes.

    :param sentry_unit: sentry unit
    :returns: List containing node names of running nodes; empty list if
              no ``{running_nodes,[...]}`` tuple is found in the output
    """
    # NOTE(beisner): rabbitmqctl cluster_status output is not
    # json-parsable, so pull the running_nodes list out with a regex and
    # split it by hand.  Matching on the closing "]}" (rather than "]},")
    # also works when running_nodes is the last tuple in the output.
    str_stat = self.get_rmq_cluster_status(sentry_unit)
    match = re.search(r'\{running_nodes,\s*\[(.*?)\]\}', str_stat, re.DOTALL)
    if not match:
        return []

    # Node names may be wrapped across lines and may or may not be quoted.
    candidates = (item.strip().strip('\'"')
                  for item in match.group(1).split(','))
    return [node for node in candidates if node]
|
741 |
||
742 |
def validate_rmq_cluster_running_nodes(self, sentry_units):
    """Confirm that every rmq unit's hostname shows up in the
    cluster_status running nodes reported by every unit.

    :param sentry_units: list of sentry unit pointers (all rmq units)
    :returns: None if successful, otherwise return error message
    """
    host_names = self.get_unit_hostnames(sentry_units)
    failures = []

    # Ask each unit in turn which nodes it believes are running.
    for reporter in sentry_units:
        reporter_name = reporter.info['unit_name']
        running_nodes = self.get_rmq_cluster_running_nodes(reporter)

        # Every unit must appear in that reporter's running nodes.
        for member in sentry_units:
            member_host = host_names[member.info['unit_name']]
            member_node = 'rabbit@{}'.format(member_host)
            if member_node in running_nodes:
                continue
            failures.append('Cluster member check failed on {}: {} not '
                            'in {}\n'.format(reporter_name,
                                             member_node,
                                             running_nodes))

    if not failures:
        return None
    return ''.join(failures)
|
771 |
||
772 |
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
    """Check a single juju rmq unit for ssl and port in the config file.

    The check is textual: ssl counts as enabled when the string 'ssl'
    appears in /etc/rabbitmq/rabbitmq.config, and a port counts as
    configured when its string form appears in the same file.

    :param sentry_unit: sentry unit pointer
    :param port: optional port that must also appear in the config;
                 when falsy only the ssl check is performed
    :returns: True if ssl (and the port, when given) is found in the
              config file, otherwise False
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    conf_file = '/etc/rabbitmq/rabbitmq.config'
    conf_contents = str(self.file_contents_safe(sentry_unit,
                                                conf_file, max_wait=16))

    if 'ssl' not in conf_contents:
        self.log.debug('SSL not enabled @{}:{} '
                       '({})'.format(host, port, unit_name))
        return False

    # Port explicitly checked in config.  The port is only searched for
    # when one was actually requested.
    if port and str(port) not in conf_contents:
        self.log.debug('SSL is enabled @{} but not on port {} '
                       '({})'.format(host, port, unit_name))
        return False

    # Either the requested port was found, or no port was requested
    # (useful when checking that ssl is disabled).
    self.log.debug('SSL is enabled @{}:{} '
                   '({})'.format(host, port, unit_name))
    return True
|
806 |
||
807 |
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
    """Verify that ssl is enabled on each of the rmq juju sentry units.

    Units are checked in order and checking stops at the first unit
    where ssl is not enabled.

    :param sentry_units: list of all rmq sentry units
    :param port: optional ssl port override to validate
    :returns: None if successful, otherwise return error message
    """
    offender = next(
        (unit for unit in sentry_units
         if not self.rmq_ssl_is_enabled_on_unit(unit, port=port)),
        None)
    if offender is None:
        return None
    return ('Unexpected condition: ssl is disabled on unit '
            '({})'.format(offender.info['unit_name']))
|
819 |
||
820 |
def validate_rmq_ssl_disabled_units(self, sentry_units):
    """Verify that ssl is disabled on each listed rmq juju sentry unit.

    Units are checked in order and checking stops at the first unit
    where ssl is found to be enabled.

    :param sentry_units: list of all rmq sentry units
    :returns: None if successful, otherwise return error message
    """
    offender = next(
        (unit for unit in sentry_units
         if self.rmq_ssl_is_enabled_on_unit(unit)),
        None)
    if offender is None:
        return None
    return ('Unexpected condition: ssl is enabled on unit '
            '({})'.format(offender.info['unit_name']))
|
831 |
||
832 |
def configure_rmq_ssl_on(self, sentry_units, deployment,
                         port=None, max_wait=60):
    """Switch the ssl charm config option on, optionally with a
    non-default ssl port, then confirm ssl is enabled on every unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param port: amqp port, use defaults if None
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful. Raise on error.
    """
    self.log.debug('Setting ssl charm config option: on')

    # Enable RMQ SSL; the port key is only sent when a port was given.
    charm_config = {'ssl': 'on'}
    if port:
        charm_config['ssl_port'] = port
    deployment.d.configure('rabbitmq-server', charm_config)

    # Wait for unit status
    self.rmq_wait_for_cluster(deployment)

    # Confirm, re-checking every 4 seconds while a failure message is
    # still being returned and the attempt budget is not used up.
    attempt = 0
    failure = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
    while failure and attempt < (max_wait / 4):
        time.sleep(4)
        self.log.debug('Attempt {}: {}'.format(attempt, failure))
        failure = self.validate_rmq_ssl_enabled_units(sentry_units,
                                                      port=port)
        attempt += 1

    if failure:
        amulet.raise_status(amulet.FAIL, failure)
|
867 |
||
868 |
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
    """Switch the ssl charm config option off, then confirm ssl is
    disabled on every unit.

    :param sentry_units: list of sentry units
    :param deployment: amulet deployment object pointer
    :param max_wait: maximum time to wait in seconds to confirm
    :returns: None if successful. Raise on error.
    """
    self.log.debug('Setting ssl charm config option: off')

    # Disable RMQ SSL
    deployment.d.configure('rabbitmq-server', {'ssl': 'off'})

    # Wait for unit status
    self.rmq_wait_for_cluster(deployment)

    # Confirm, re-checking every 4 seconds while a failure message is
    # still being returned and the attempt budget is not used up.
    attempt = 0
    failure = self.validate_rmq_ssl_disabled_units(sentry_units)
    while failure and attempt < (max_wait / 4):
        time.sleep(4)
        self.log.debug('Attempt {}: {}'.format(attempt, failure))
        failure = self.validate_rmq_ssl_disabled_units(sentry_units)
        attempt += 1

    if failure:
        amulet.raise_status(amulet.FAIL, failure)
|
897 |
||
898 |
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
                         port=None, fatal=True,
                         username="testuser1", password="changeme"):
    """Establish and return a pika amqp connection to the rabbitmq service
    running on a rmq juju unit.

    :param sentry_unit: sentry unit pointer
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :param fatal: boolean, default to True (raises on connect error)
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: pika amqp connection pointer or None if failed and non-fatal
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    # Default port logic if port is not specified
    if not port:
        port = 5671 if ssl else 5672

    self.log.debug('Connecting to amqp on {}:{} ({}) as '
                   '{}...'.format(host, port, unit_name, username))

    try:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=host, port=port,
                                               credentials=credentials,
                                               ssl=ssl,
                                               connection_attempts=3,
                                               retry_delay=5,
                                               socket_timeout=1)
        connection = pika.BlockingConnection(parameters)

        # Explicit check instead of ``assert`` so it still runs under
        # ``python -O`` and yields a meaningful failure message.
        product = connection.server_properties['product']
        if product != 'RabbitMQ':
            # Do not leave the connection open when rejecting the server.
            connection.close()
            raise ValueError('unexpected server product: '
                             '{}'.format(product))

        self.log.debug('Connect OK')
        return connection
    except Exception as e:
        # Any failure while connecting is reported the same way; whether
        # it is fatal is the caller's choice.
        msg = ('amqp connection failed to {}:{} as '
               '{} ({})'.format(host, port, username, str(e)))
        if fatal:
            amulet.raise_status(amulet.FAIL, msg)
        else:
            self.log.warning(msg)
            return None
|
944 |
||
945 |
def publish_amqp_message_by_unit(self, sentry_unit, message,
                                 queue="test", ssl=False,
                                 username="testuser1",
                                 password="changeme",
                                 port=None):
    """Publish an amqp message to a rmq juju unit.

    The connection is always closed before returning, including when
    declaring the queue or publishing raises.

    :param sentry_unit: sentry unit pointer
    :param message: amqp message string
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: None. Raises exception if publish failed.
    """
    self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
                                                                message))
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)

    # NOTE(beisner): extra debug here re: pika hang potential:
    #   https://github.com/pika/pika/issues/297
    #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
    try:
        self.log.debug('Defining channel...')
        channel = connection.channel()
        self.log.debug('Declaring queue...')
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
        self.log.debug('Publishing message...')
        channel.basic_publish(exchange='', routing_key=queue, body=message)
        self.log.debug('Closing channel...')
        channel.close()
    finally:
        # Release the connection even if a channel operation failed.
        self.log.debug('Closing connection...')
        connection.close()
|
981 |
||
982 |
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
                             username="testuser1",
                             password="changeme",
                             ssl=False, port=None):
    """Get an amqp message from a rmq juju unit.

    The connection is always closed before this method returns or
    raises, including when no message was available.

    :param sentry_unit: sentry unit pointer
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: amqp message body as string.  Raise if get fails.
    """
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)
    try:
        channel = connection.channel()
        method_frame, _, body = channel.basic_get(queue)

        if method_frame:
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
                                                                         body))
            # Acknowledge so the broker does not redeliver the message.
            channel.basic_ack(method_frame.delivery_tag)
            channel.close()
            return body
    finally:
        # Release the connection on success and on failure alike.
        connection.close()

    msg = 'No message retrieved.'
    amulet.raise_status(amulet.FAIL, msg)