1
# Copyright 2014-2015 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
18
# Copyright 2012 Canonical Ltd.
20
# This file is sourced from lp:openstack-charm-helpers
23
# James Page <james.page@ubuntu.com>
24
# Adam Gandelman <adamg@ubuntu.com>
33
from subprocess import (
38
from charmhelpers.core.hookenv import (
50
from charmhelpers.core.host import (
58
from charmhelpers.fetch import (
62
from charmhelpers.core.kernel import modprobe
64
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
65
KEYFILE = '/etc/ceph/ceph.client.{}.key'
67
CEPH_CONF = """[global]
68
auth supported = {auth}
70
mon host = {mon_hosts}
71
log to syslog = {use_syslog}
72
err to syslog = {use_syslog}
73
clog to syslog = {use_syslog}
78
"""Basic Ceph client installation."""
79
ceph_dir = "/etc/ceph"
80
if not os.path.exists(ceph_dir):
83
apt_install('ceph-common', fatal=True)
86
def rbd_exists(service, pool, rbd_img):
87
"""Check to see if a RADOS block device exists."""
89
out = check_output(['rbd', 'list', '--id',
90
service, '--pool', pool]).decode('UTF-8')
91
except CalledProcessError:
97
def create_rbd_image(service, pool, image, sizemb):
98
"""Create a new RADOS block device."""
99
cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
104
def pool_exists(service, name):
105
"""Check to see if a RADOS pool already exists."""
107
out = check_output(['rados', '--id', service,
108
'lspools']).decode('UTF-8')
109
except CalledProcessError:
115
def get_osds(service):
116
"""Return a list of all Ceph Object Storage Daemons currently in the
119
version = ceph_version()
120
if version and version >= '0.56':
121
return json.loads(check_output(['ceph', '--id', service,
123
'--format=json']).decode('UTF-8'))
128
def create_pool(service, name, replicas=3):
129
"""Create a new RADOS pool."""
130
if pool_exists(service, name):
131
log("Ceph pool {} already exists, skipping creation".format(name),
135
# Calculate the number of placement groups based
136
# on upstream recommended best practices.
137
osds = get_osds(service)
139
pgnum = (len(osds) * 100 // replicas)
141
# NOTE(james-page): Default to 200 for older ceph versions
142
# which don't support OSD query from cli
145
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
148
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
153
def delete_pool(service, name):
154
"""Delete a RADOS pool from ceph."""
155
cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
156
'--yes-i-really-really-mean-it']
160
def _keyfile_path(service):
    """Return the key file path for the named service.

    The path is produced by substituting ``service`` into the module-level
    KEYFILE template.
    """
    path = KEYFILE.format(service)
    return path
164
def _keyring_path(service):
    """Return the keyring path for the named service.

    The path is produced by substituting ``service`` into the module-level
    KEYRING template.
    """
    path = KEYRING.format(service)
    return path
168
def create_keyring(service, key):
169
"""Create a new Ceph keyring containing key."""
170
keyring = _keyring_path(service)
171
if os.path.exists(keyring):
172
log('Ceph keyring exists at %s.' % keyring, level=WARNING)
175
cmd = ['ceph-authtool', keyring, '--create-keyring',
176
'--name=client.{}'.format(service), '--add-key={}'.format(key)]
178
log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
181
def delete_keyring(service):
182
"""Delete an existing Ceph keyring."""
183
keyring = _keyring_path(service)
184
if not os.path.exists(keyring):
185
log('Keyring does not exist at %s' % keyring, level=WARNING)
189
log('Deleted ring at %s.' % keyring, level=INFO)
192
def create_key_file(service, key):
193
"""Create a file containing key."""
194
keyfile = _keyfile_path(service)
195
if os.path.exists(keyfile):
196
log('Keyfile exists at %s.' % keyfile, level=WARNING)
199
with open(keyfile, 'w') as fd:
202
log('Created new keyfile at %s.' % keyfile, level=INFO)
205
def get_ceph_nodes():
206
"""Query named relation 'ceph' to determine current nodes."""
208
for r_id in relation_ids('ceph'):
209
for unit in related_units(r_id):
210
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
215
def configure(service, key, auth, use_syslog):
216
"""Perform basic configuration of Ceph."""
217
create_keyring(service, key)
218
create_key_file(service, key)
219
hosts = get_ceph_nodes()
220
with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
221
ceph_conf.write(CEPH_CONF.format(auth=auth,
222
keyring=_keyring_path(service),
223
mon_hosts=",".join(map(str, hosts)),
224
use_syslog=use_syslog))
228
def image_mapped(name):
229
"""Determine whether a RADOS block device is mapped locally."""
231
out = check_output(['rbd', 'showmapped']).decode('UTF-8')
232
except CalledProcessError:
238
def map_block_storage(service, pool, image):
239
"""Map a RADOS block device for local use."""
243
'{}/{}'.format(pool, image),
247
_keyfile_path(service),
252
def filesystem_mounted(fs):
    """Determine whether a filesystem is already mounted.

    Returns True when ``fs`` equals the first element of any pair returned
    by mounts(), False otherwise.
    """
    # Only the first element of each pair is compared; the second is ignored.
    first_fields = [first for first, _second in mounts()]
    return fs in first_fields
257
def make_filesystem(blk_device, fstype='ext4', timeout=10):
258
"""Make a new filesystem on the specified block device."""
260
e_noent = os.errno.ENOENT
261
while not os.path.exists(blk_device):
263
log('Gave up waiting on block device %s' % blk_device,
265
raise IOError(e_noent, os.strerror(e_noent), blk_device)
267
log('Waiting for block device %s to appear' % blk_device,
272
log('Formatting block device %s as filesystem %s.' %
273
(blk_device, fstype), level=INFO)
274
check_call(['mkfs', '-t', fstype, blk_device])
277
def place_data_on_block_device(blk_device, data_src_dst):
278
"""Migrate data in data_src_dst to blk_device and then remount."""
279
# mount block device into /mnt
280
mount(blk_device, '/mnt')
282
copy_files(data_src_dst, '/mnt')
283
# umount block device
285
# Grab user/group ID's from original source
286
_dir = os.stat(data_src_dst)
289
# re-mount where the data should originally be
290
# TODO: persist is currently a NO-OP in core.host
291
mount(blk_device, data_src_dst, persist=True)
292
# ensure original ownership of new mount.
293
os.chown(data_src_dst, uid, gid)
296
def copy_files(src, dst, symlinks=False, ignore=None):
297
"""Copy files from src to dst."""
298
for item in os.listdir(src):
299
s = os.path.join(src, item)
300
d = os.path.join(dst, item)
302
shutil.copytree(s, d, symlinks, ignore)
307
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
308
blk_device, fstype, system_services=[],
310
"""NOTE: This function must only be called from a single service unit for
311
the same rbd_img otherwise data loss will occur.
313
Ensures given pool and RBD image exists, is mapped to a block device,
314
and the device is formatted and mounted at the given mount_point.
316
If formatting a device for the first time, data existing at mount_point
317
will be migrated to the RBD device before being re-mounted.
319
All services listed in system_services will be stopped prior to data
320
migration and restarted when complete.
322
# Ensure pool, RBD image, RBD mappings are in place.
323
if not pool_exists(service, pool):
324
log('Creating new pool {}.'.format(pool), level=INFO)
325
create_pool(service, pool, replicas=replicas)
327
if not rbd_exists(service, pool, rbd_img):
328
log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
329
create_rbd_image(service, pool, rbd_img, sizemb)
331
if not image_mapped(rbd_img):
332
log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
334
map_block_storage(service, pool, rbd_img)
337
# TODO: What happens if for whatever reason this is run again and
338
# the data is already in the rbd device and/or is mounted??
339
# When it is mounted already, it will fail to make the fs
340
# XXX: This is really sketchy! Need to at least add an fstab entry
341
# otherwise this hook will blow away existing data if its executed
343
if not filesystem_mounted(mount_point):
344
make_filesystem(blk_device, fstype)
346
for svc in system_services:
347
if service_running(svc):
348
log('Stopping services {} prior to migrating data.'
349
.format(svc), level=DEBUG)
352
place_data_on_block_device(blk_device, mount_point)
354
for svc in system_services:
355
log('Starting service {} after migrating data.'
356
.format(svc), level=DEBUG)
360
def ensure_ceph_keyring(service, user=None, group=None):
361
"""Ensures a ceph keyring is created for a named service and optionally
362
ensures user and group ownership.
364
Returns False if no ceph key is available in relation state.
367
for rid in relation_ids('ceph'):
368
for unit in related_units(rid):
369
key = relation_get('key', rid=rid, unit=unit)
376
create_keyring(service=service, key=key)
377
keyring = _keyring_path(service)
379
check_call(['chown', '%s.%s' % (user, group), keyring])
385
"""Retrieve the local version of ceph."""
386
if os.path.exists('/usr/bin/ceph'):
388
output = check_output(cmd).decode('US-ASCII')
389
output = output.split()
398
class CephBrokerRq(object):
399
"""Ceph broker request.
401
Multiple operations can be added to a request and sent to the Ceph broker
404
Request is json-encoded for sending over the wire.
406
The API is versioned and defaults to version 1.
408
def __init__(self, api_version=1, request_id=None):
409
self.api_version = api_version
411
self.request_id = request_id
413
self.request_id = str(uuid.uuid1())
416
def add_op_create_pool(self, name, replica_count=3):
    """Append a 'create-pool' operation to this request's list of ops.

    @param name: Value stored under the op's 'name' key
    @param replica_count: Value stored under the op's 'replicas' key
    """
    create_pool_op = dict(op='create-pool', name=name,
                          replicas=replica_count)
    self.ops.append(create_pool_op)
420
def set_ops(self, ops):
421
"""Set request ops to provided value.
423
Useful for injecting ops that come from a previous request
424
to allow comparisons to ensure validity.
430
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
431
'request-id': self.request_id})
433
def _ops_equal(self, other):
434
if len(self.ops) == len(other.ops):
435
for req_no in range(0, len(self.ops)):
436
for key in ['replicas', 'name', 'op']:
437
if self.ops[req_no][key] != other.ops[req_no][key]:
443
def __eq__(self, other):
444
if not isinstance(other, self.__class__):
446
if self.api_version == other.api_version and \
447
self._ops_equal(other):
452
def __ne__(self, other):
453
return not self.__eq__(other)
456
class CephBrokerRsp(object):
457
"""Ceph broker response.
459
Response is json-decoded and contents provided as methods/properties.
461
The API is versioned and defaults to version 1.
464
def __init__(self, encoded_rsp):
465
self.api_version = None
466
self.rsp = json.loads(encoded_rsp)
469
def request_id(self):
470
return self.rsp.get('request-id')
474
return self.rsp.get('exit-code')
478
return self.rsp.get('stderr')
481
# Ceph Broker Conversation:
482
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
483
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
484
# unique id so that the client can identify which CephBrokerRsp is associated
485
# with the request. Ceph will also respond to each client unit individually
486
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
487
# via key broker-rsp-glance-0
489
# To use this the charm can just do something like:
491
# from charmhelpers.contrib.storage.linux.ceph import (
492
# send_request_if_needed,
493
# is_request_complete,
497
# @hooks.hook('ceph-relation-changed')
498
# def ceph_changed():
499
# rq = CephBrokerRq()
500
# rq.add_op_create_pool(name='poolname', replica_count=3)
502
# if is_request_complete(rq):
503
# <Request complete actions>
505
# send_request_if_needed(rq)
507
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
508
# of glance having sent a request to ceph which ceph has successfully processed
512
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
513
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
514
# 'ceph-public-address': '10.5.44.103',
515
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
516
# 'private-address': '10.5.44.103',
519
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
520
# '"ops": [{"replicas": 3, "name": "glance", '
521
# '"op": "create-pool"}]}'),
522
# 'private-address': '10.5.44.109',
526
def get_previous_request(rid):
527
"""Return the last ceph broker request sent on a given relation
529
@param rid: Relation id to query for request
532
broker_req = relation_get(attribute='broker_req', rid=rid,
535
request_data = json.loads(broker_req)
536
request = CephBrokerRq(api_version=request_data['api-version'],
537
request_id=request_data['request-id'])
538
request.set_ops(request_data['ops'])
543
def get_request_states(request):
544
"""Return a dict of requests per relation id with their corresponding
547
This allows a charm, which has a request for ceph, to see whether there is
548
an equivalent request already being processed and if so what state that
551
@param request: A CephBrokerRq object
555
for rid in relation_ids('ceph'):
557
previous_request = get_previous_request(rid)
558
if request == previous_request:
560
complete = is_request_complete_for_rid(previous_request, rid)
567
'complete': complete,
573
def is_request_sent(request):
574
"""Check to see if a functionally equivalent request has already been sent
576
Returns True if a similair request has been sent
578
@param request: A CephBrokerRq object
580
states = get_request_states(request)
581
for rid in states.keys():
582
if not states[rid]['sent']:
588
def is_request_complete(request):
589
"""Check to see if a functionally equivalent request has already been
592
Returns True if a similair request has been completed
594
@param request: A CephBrokerRq object
596
states = get_request_states(request)
597
for rid in states.keys():
598
if not states[rid]['complete']:
604
def is_request_complete_for_rid(request, rid):
605
"""Check if a given request has been completed on the given relation
607
@param request: A CephBrokerRq object
608
@param rid: Relation ID
610
broker_key = get_broker_rsp_key()
611
for unit in related_units(rid):
612
rdata = relation_get(rid=rid, unit=unit)
613
if rdata.get(broker_key):
614
rsp = CephBrokerRsp(rdata.get(broker_key))
615
if rsp.request_id == request.request_id:
616
if not rsp.exit_code:
619
# The remote unit sent no reply targeted at this unit so either the
620
# remote ceph cluster does not support unit targeted replies or it
621
# has not processed our request yet.
622
if rdata.get('broker_rsp'):
623
request_data = json.loads(rdata['broker_rsp'])
624
if request_data.get('request-id'):
625
log('Ignoring legacy broker_rsp without unit key as remote '
626
'service supports unit specific replies', level=DEBUG)
628
log('Using legacy broker_rsp as remote service does not '
629
'supports unit specific replies', level=DEBUG)
630
rsp = CephBrokerRsp(rdata['broker_rsp'])
631
if not rsp.exit_code:
637
def get_broker_rsp_key():
    """Return broker response key for this unit

    This is the key that ceph is going to use to pass request status
    information back to this unit. It is 'broker-rsp-' followed by the
    local unit name with every '/' replaced by '-'.
    """
    unit_suffix = local_unit().replace('/', '-')
    return 'broker-rsp-' + unit_suffix
646
def send_request_if_needed(request):
647
"""Send broker request if an equivalent request has not already been sent
649
@param request: A CephBrokerRq object
651
if is_request_sent(request):
652
log('Request already sent but not complete, not sending new request',
655
for rid in relation_ids('ceph'):
656
log('Sending request {}'.format(request.request_id), level=DEBUG)
657
relation_set(relation_id=rid, broker_req=request.request)