# Copyright 2014-2015 Canonical Limited.
#
# This file is part of charm-helpers.
#
# charm-helpers is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3 as
# published by the Free Software Foundation.
#
# charm-helpers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.

#
# Copyright 2012 Canonical Ltd.
#
# This file is sourced from lp:openstack-charm-helpers
#
# Authors:
#  James Page <james.page@ubuntu.com>
#  Adam Gandelman <adamg@ubuntu.com>
#
import bisect
import errno
import hashlib
import json
import os
import shutil
import time
import uuid

import six

from subprocess import (
    check_call,
    check_output,
    CalledProcessError,
)
from charmhelpers.core.hookenv import (
    local_unit,
    relation_get,
    relation_ids,
    relation_set,
    related_units,
    log,
    DEBUG,
    INFO,
    WARNING,
    ERROR,
)
from charmhelpers.core.host import (
    mount,
    mounts,
    service_start,
    service_stop,
    service_running,
    umount,
)
from charmhelpers.fetch import (
    apt_install,
)
from charmhelpers.core.kernel import modprobe
# Templates for the per-service keyring/key file locations on disk.
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'

# Minimal ceph.conf template written by configure(); the 'keyring' line was
# lost in a previous edit and is restored here so configure() can format it.
CEPH_CONF = """[global]
auth supported = {auth}
keyring = {keyring}
mon host = {mon_hosts}
log to syslog = {use_syslog}
err to syslog = {use_syslog}
clog to syslog = {use_syslog}
"""

# Candidate placement-group counts used by Pool.get_pgs() for large clusters.
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
def validator(value, valid_type, valid_range=None):
    """Validate a pool setting value against a type and optional range.

    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values

    Example input:
        validator(value=1,
                  valid_type=int,
                  valid_range=[0, 2])
    This says I'm testing value=1. It must be an int inclusive in [0,2]

    :param value: The value to validate
    :param valid_type: The type that value should be.
    :param valid_range: A range of values that value can assume.
        For strings: a list of allowed values; for numbers: [min, max].
    :raises AssertionError: if the value fails validation.
    :raises ValueError: if a numeric valid_range is not a [min, max] pair.
    """
    assert isinstance(value, valid_type), "{} is not a {}".format(
        value, valid_type)
    if valid_range is not None:
        assert isinstance(valid_range, list), \
            "valid_range must be a list, was given {}".format(valid_range)
        # If we're dealing with strings
        if valid_type is six.string_types:
            assert value in valid_range, \
                "{} is not in the list {}".format(value, valid_range)
        # Integer, float should have a min and max
        else:
            if len(valid_range) != 2:
                raise ValueError(
                    "Invalid valid_range list of {} for {}. "
                    "List must be [min,max]".format(valid_range, value))
            assert value >= valid_range[0], \
                "{} is less than minimum allowed value of {}".format(
                    value, valid_range[0])
            assert value <= valid_range[1], \
                "{} is greater than maximum allowed value of {}".format(
                    value, valid_range[1])
class PoolCreationError(Exception):
    """
    A custom error to inform the caller that a pool creation failed.  Provides an error message
    """

    def __init__(self, message):
        super(PoolCreationError, self).__init__(message)
class Pool(object):
    """
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
    Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
    """

    def __init__(self, service, name):
        # service: the Ceph user (cephx id) used to run ceph commands
        self.service = service
        self.name = name

    # Create the pool if it doesn't exist already
    # To be implemented by subclasses
    def create(self):
        pass

    def add_cache_tier(self, cache_pool, mode):
        """
        Adds a new cache tier to an existing pool.
        :param cache_pool: six.string_types. The cache tier pool name to add.
        :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
        :return: None
        """
        # Check the input types and values
        validator(value=cache_pool, valid_type=six.string_types)
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])

        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])

    def remove_cache_tier(self, cache_pool):
        """
        Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
        :param cache_pool: six.string_types. The cache tier pool name to remove.
        :return: None
        """
        # read-only is easy, writeback is much harder
        mode = get_cache_mode(self.service, cache_pool)
        if mode == 'readonly':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
        elif mode == 'writeback':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
            # Flush the cache and wait for it to return
            check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

    def get_pgs(self, pool_size):
        """Return a placement-group count sized to the current cluster.

        :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
            erasure coded pools
        :return: int. The number of pgs to use.
        """
        validator(value=pool_size, valid_type=int)
        osd_list = get_osds(self.service)
        if not osd_list:
            # NOTE(james-page): Default to 200 for older ceph versions
            # which don't support OSD query from cli
            return 200

        osd_list_length = len(osd_list)
        # Calculate based on Ceph best practices
        if osd_list_length < 5:
            return 128
        elif 5 < osd_list_length < 10:
            return 512
        elif 10 < osd_list_length < 50:
            return 4096
        else:
            estimate = (osd_list_length * 100) / pool_size
            # Return the next nearest power of 2
            index = bisect.bisect_right(powers_of_two, estimate)
            return powers_of_two[index]
class ReplicatedPool(Pool):
    """A replicated Ceph pool; data is stored as N full copies (replicas)."""

    def __init__(self, service, name, pg_num=None, replicas=2):
        super(ReplicatedPool, self).__init__(service=service, name=name)
        self.replicas = replicas
        if pg_num is None:
            # Size the pg count from the cluster when not given explicitly.
            self.pg_num = self.get_pgs(self.replicas)
        else:
            self.pg_num = pg_num

    def create(self):
        # Create the pool if it doesn't exist already
        if not pool_exists(self.service, self.name):
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
                   self.name, str(self.pg_num)]
            try:
                check_call(cmd)
                # Set the pool replica count to match self.replicas
                update_pool(client=self.service,
                            pool=self.name,
                            settings={'size': str(self.replicas)})
            except CalledProcessError:
                raise
# Default jerasure erasure coded pool
class ErasurePool(Pool):
    """An erasure-coded Ceph pool; data is stored as K data + M coding chunks."""

    def __init__(self, service, name, erasure_code_profile="default"):
        super(ErasurePool, self).__init__(service=service, name=name)
        self.erasure_code_profile = erasure_code_profile

    def create(self):
        if not pool_exists(self.service, self.name):
            # Try to find the erasure profile information so we can properly size the pgs
            erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)

            # Check for errors
            if erasure_profile is None:
                log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
                    level=ERROR)
                raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
                log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
                    level=ERROR)
                raise PoolCreationError(
                    message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))

            # Size the pg count from the total chunk count (K + M)
            pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
                   'erasure', self.erasure_code_profile]
            try:
                check_call(cmd)
            except CalledProcessError:
                raise

    # NOTE(review): orphaned module-level docstring from the original file,
    # retained as a comment: "Get an existing erasure code profile if it
    # already exists.  Returns json formatted output"
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string. :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(
            ['ceph', '--id', service,
             'mon_status', '--format=json'])
        try:
            return json.loads(mon_status)
        except ValueError as v:
            # NOTE: str(exc) instead of exc.message — .message does not exist
            # on Python 3 exceptions and would raise AttributeError here.
            log("Unable to parse mon_status json: {}. Error: {}".format(
                mon_status, str(v)))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}".format(
            str(e)))
        raise
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor. Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict. json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise
def monitor_key_delete(service, key):
    """
    Deletes a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to delete.
    :raises CalledProcessError: if the ceph command fails.
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'del', str(key)])
    except CalledProcessError as e:
        # Fixed log text: this is a 'del', the original message said 'put'.
        log("Monitor config-key del failed with message: {}".format(
            e.output))
        raise
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to set.
    :param value: The value to set. This will be converted to a string
        before setting
    :raises CalledProcessError: if the ceph command fails.
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)])
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        # Best-effort getter: a missing key is reported as None, not raised.
        return None
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for
    :return: Returns True if the key exists, False if not and raises an
        exception if an unknown error occurs. :raise: CalledProcessError if
        an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service,
             'config-key', 'exists', str(key)])
        # I can return true here regardless because Ceph returns
        # ENOENT if the key wasn't found
        return True
    except CalledProcessError as e:
        if e.returncode == errno.ENOENT:
            return False
        else:
            log("Unknown error from ceph config-get exists: {} {}".format(
                e.returncode, e.output))
            raise
def get_erasure_profile(service, name):
    """Fetch an erasure-code profile from Ceph as a parsed dict.

    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types. The profile name to look up.
    :return: dict parsed from the ceph JSON output, or None when the profile
        does not exist, the binary is unavailable, or the output is unparsable.
    """
    try:
        out = check_output(['ceph', '--id', service,
                            'osd', 'erasure-code-profile', 'get',
                            name, '--format=json'])
        return json.loads(out)
    except (CalledProcessError, OSError, ValueError):
        return None
def pool_set(service, pool_name, key, value):
    """
    Sets a value for a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param key: six.string_types
    :param value: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
    check_call(cmd)
def snapshot_pool(service, pool_name, snapshot_name):
    """
    Snapshots a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
    check_call(cmd)
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
    check_call(cmd)
# max_bytes should be an int or long
def set_pool_quota(service, pool_name, max_bytes):
    """
    Set a byte quota on a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param max_bytes: int or long
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
           'max_bytes', str(max_bytes)]
    check_call(cmd)
def remove_pool_quota(service, pool_name):
    """
    Remove the byte quota on a RADOS pool in ceph (sets max_bytes to 0).
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    # A max_bytes of '0' disables the quota.
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
    check_call(cmd)
def remove_erasure_profile(service, profile_name):
    """
    Remove an erasure code profile.  (The original docstring was copy-pasted
    from create_erasure_profile and wrongly described creation.)
    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
           profile_name]
    check_call(cmd)
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
                           failure_domain='host',
                           data_chunks=2, coding_chunks=1,
                           locality=None, durability_estimator=None):
    """
    Create a new erasure code profile if one does not already exist for it. Updates
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
    for more details
    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :param erasure_plugin_name: six.string_types
    :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
        'room', 'root', 'row'])
    :param data_chunks: int
    :param coding_chunks: int
    :param locality: int (LRC plugin only)
    :param durability_estimator: int (SHEC plugin only)
    :return: None. Can raise CalledProcessError or ValueError
    """
    # Ensure this failure_domain is allowed by Ceph
    validator(failure_domain, six.string_types,
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])

    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
           'ruleset_failure_domain=' + failure_domain]
    # l (locality) and c (durability estimator) belong to different plugins
    # and are mutually exclusive.
    if locality is not None and durability_estimator is not None:
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")

    # Add plugin specific information
    if locality is not None:
        # For local erasure codes
        cmd.append('l=' + str(locality))
    if durability_estimator is not None:
        # For Shec erasure codes
        cmd.append('c=' + str(durability_estimator))

    # Updating an existing profile requires --force.
    if erasure_profile_exists(service, profile_name):
        cmd.append('--force')

    check_call(cmd)
def rename_pool(service, old_name, new_name):
    """
    Rename a Ceph pool from old_name to new_name
    :param service: six.string_types. The Ceph user name to run the command under
    :param old_name: six.string_types
    :param new_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    validator(value=old_name, valid_type=six.string_types)
    validator(value=new_name, valid_type=six.string_types)
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
    check_call(cmd)
def erasure_profile_exists(service, name):
    """
    Check to see if an Erasure code profile already exists.
    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types
    :return: bool. True if it exists, False otherwise.
    """
    validator(value=name, valid_type=six.string_types)
    try:
        check_call(['ceph', '--id', service,
                    'osd', 'erasure-code-profile', 'get',
                    name])
        return True
    except CalledProcessError:
        # 'get' fails when the profile does not exist.
        return False
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: int or None. The current cache mode of the pool, or None when the
        pool is not present in the osd dump.
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
def pool_exists(service, name):
    """Check to see if a RADOS pool already exists."""
    try:
        out = check_output(['rados', '--id', service,
                            'lspools']).decode('UTF-8')
    except CalledProcessError:
        return False

    # Match whole pool names only: a bare substring test would wrongly
    # report pool 'foo' as existing when only 'foobar' exists.
    return name in out.split()
def get_osds(service):
    """Return a list of all Ceph Object Storage Daemons currently in the
    cluster, or None when the local ceph does not support OSD listing.
    """
    version = ceph_version()
    # 'osd ls' with JSON output was added in ceph 0.56.
    if version and version >= '0.56':
        return json.loads(check_output(['ceph', '--id', service,
                                        'osd', 'ls',
                                        '--format=json']).decode('UTF-8'))

    return None
"""Basic Ceph client installation."""
625
ceph_dir = "/etc/ceph"
626
if not os.path.exists(ceph_dir):
629
apt_install('ceph-common', fatal=True)
632
def rbd_exists(service, pool, rbd_img):
    """Check to see if a RADOS block device exists."""
    try:
        out = check_output(['rbd', 'list', '--id',
                            service, '--pool', pool]).decode('UTF-8')
    except CalledProcessError:
        return False

    return rbd_img in out
def create_rbd_image(service, pool, image, sizemb):
    """Create a new RADOS block device of sizemb megabytes in the given pool."""
    cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
           '--pool', pool]
    check_call(cmd)
def update_pool(client, pool, settings):
    """Apply a dict of key/value settings to a pool via 'ceph osd pool set'."""
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
    for k, v in six.iteritems(settings):
        cmd.append(k)
        cmd.append(v)

    check_call(cmd)
def create_pool(service, name, replicas=3, pg_num=None):
    """Create a new RADOS pool, no-op if the pool already exists."""
    if pool_exists(service, name):
        log("Ceph pool {} already exists, skipping creation".format(name),
            level=WARNING)
        return

    if not pg_num:
        # Calculate the number of placement groups based
        # on upstream recommended best practices.
        osds = get_osds(service)
        if osds:
            pg_num = (len(osds) * 100 // replicas)
        else:
            # NOTE(james-page): Default to 200 for older ceph versions
            # which don't support OSD query from cli
            pg_num = 200

    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
    check_call(cmd)

    update_pool(service, name, settings={'size': str(replicas)})
def delete_pool(service, name):
    """Delete a RADOS pool from ceph."""
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
           '--yes-i-really-really-mean-it']
    check_call(cmd)
def _keyfile_path(service):
    """Return the on-disk key file path for the given Ceph service name."""
    return KEYFILE.format(service)
def _keyring_path(service):
    """Return the on-disk keyring path for the given Ceph service name."""
    return KEYRING.format(service)
def create_keyring(service, key):
    """Create a new Ceph keyring containing key, no-op when one exists."""
    keyring = _keyring_path(service)
    if os.path.exists(keyring):
        log('Ceph keyring exists at %s.' % keyring, level=WARNING)
        return

    cmd = ['ceph-authtool', keyring, '--create-keyring',
           '--name=client.{}'.format(service), '--add-key={}'.format(key)]
    check_call(cmd)
    log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
def delete_keyring(service):
    """Delete an existing Ceph keyring, no-op when it is absent."""
    keyring = _keyring_path(service)
    if not os.path.exists(keyring):
        log('Keyring does not exist at %s' % keyring, level=WARNING)
        return

    os.remove(keyring)
    log('Deleted ring at %s.' % keyring, level=INFO)
def create_key_file(service, key):
    """Create a file containing key, no-op when the file already exists."""
    keyfile = _keyfile_path(service)
    if os.path.exists(keyfile):
        log('Keyfile exists at %s.' % keyfile, level=WARNING)
        return

    with open(keyfile, 'w') as fd:
        fd.write(key)

    log('Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes(relation='ceph'):
    """Query named relation to determine current nodes.

    :return: list of the 'private-address' of every related unit.
    """
    hosts = []
    for r_id in relation_ids(relation):
        for unit in related_units(r_id):
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))

    return hosts
def configure(service, key, auth, use_syslog):
    """Perform basic configuration of Ceph: keyring, key file, ceph.conf
    and the rbd kernel module."""
    create_keyring(service, key)
    create_key_file(service, key)
    hosts = get_ceph_nodes()
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
        ceph_conf.write(CEPH_CONF.format(auth=auth,
                                         keyring=_keyring_path(service),
                                         mon_hosts=",".join(map(str, hosts)),
                                         use_syslog=use_syslog))
    modprobe('rbd')
def image_mapped(name):
    """Determine whether a RADOS block device is mapped locally."""
    try:
        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
    except CalledProcessError:
        return False

    return name in out
def map_block_storage(service, pool, image):
    """Map a RADOS block device for local use."""
    cmd = [
        'rbd',
        'map',
        '{}/{}'.format(pool, image),
        '--user',
        service,
        '--secret',
        _keyfile_path(service),
    ]
    check_call(cmd)
def filesystem_mounted(fs):
    """Determine whether a filesytem is already mounted."""
    return fs in [f for f, m in mounts()]
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Waits up to `timeout` seconds for the device node to appear before
    raising IOError(ENOENT).
    """
    count = 0
    # NOTE: errno.ENOENT, not os.errno.ENOENT — the os.errno alias was
    # removed in Python 3.7 and was never a documented API.
    e_noent = errno.ENOENT
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(e_noent, os.strerror(e_noent), blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)
    else:
        log('Formatting block device %s as filesystem %s.' %
            (blk_device, fstype), level=INFO)
        check_call(['mkfs', '-t', fstype, blk_device])
def place_data_on_block_device(blk_device, data_src_dst):
    """Migrate data in data_src_dst to blk_device and then remount."""
    # mount block device into /mnt
    mount(blk_device, '/mnt')
    # copy data to /mnt
    copy_files(data_src_dst, '/mnt')
    # umount block device
    umount('/mnt')
    # Grab user/group ID's from original source
    _dir = os.stat(data_src_dst)
    uid = _dir.st_uid
    gid = _dir.st_gid
    # re-mount where the data should originally be
    # TODO: persist is currently a NO-OP in core.host
    mount(blk_device, data_src_dst, persist=True)
    # ensure original ownership of new mount.
    os.chown(data_src_dst, uid, gid)
def copy_files(src, dst, symlinks=False, ignore=None):
    """Copy files from src to dst.

    Directories are copied recursively via shutil.copytree; regular files
    with shutil.copy2 (preserves metadata).
    """
    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            shutil.copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
                        blk_device, fstype, system_services=None,
                        replicas=3):
    """NOTE: This function must only be called from a single service unit for
    the same rbd_img otherwise data loss will occur.

    Ensures given pool and RBD image exists, is mapped to a block device,
    and the device is formatted and mounted at the given mount_point.

    If formatting a device for the first time, data existing at mount_point
    will be migrated to the RBD device before being re-mounted.

    All services listed in system_services will be stopped prior to data
    migration and restarted when complete.
    """
    # Avoid a mutable default argument ([]); None is the idiomatic sentinel.
    if system_services is None:
        system_services = []

    # Ensure pool, RBD image, RBD mappings are in place.
    if not pool_exists(service, pool):
        log('Creating new pool {}.'.format(pool), level=INFO)
        create_pool(service, pool, replicas=replicas)

    if not rbd_exists(service, pool, rbd_img):
        log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
        create_rbd_image(service, pool, rbd_img, sizemb)

    if not image_mapped(rbd_img):
        log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
            level=INFO)
        map_block_storage(service, pool, rbd_img)

    # make file system
    # TODO: What happens if for whatever reason this is run again and
    # the data is already in the rbd device and/or is mounted??
    # When it is mounted already, it will fail to make the fs
    # XXX: This is really sketchy!  Need to at least add an fstab entry
    # otherwise this hook will blow away existing data if its executed
    # after a reboot.
    if not filesystem_mounted(mount_point):
        make_filesystem(blk_device, fstype)

        for svc in system_services:
            if service_running(svc):
                log('Stopping services {} prior to migrating data.'
                    .format(svc), level=DEBUG)
                service_stop(svc)

        place_data_on_block_device(blk_device, mount_point)

        for svc in system_services:
            log('Starting service {} after migrating data.'
                .format(svc), level=DEBUG)
            service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
    """Ensures a ceph keyring is created for a named service and optionally
    ensures user and group ownership.

    Returns False if no ceph key is available in relation state.
    """
    key = None
    for rid in relation_ids(relation):
        for unit in related_units(rid):
            key = relation_get('key', rid=rid, unit=unit)
            if key:
                break

    if not key:
        return False

    create_keyring(service=service, key=key)
    keyring = _keyring_path(service)
    if user and group:
        check_call(['chown', '%s.%s' % (user, group), keyring])

    return True
"""Retrieve the local version of ceph."""
916
if os.path.exists('/usr/bin/ceph'):
918
output = check_output(cmd).decode('US-ASCII')
919
output = output.split()
928
class CephBrokerRq(object):
    """Ceph broker request.

    Multiple operations can be added to a request and sent to the Ceph broker
    to be executed.

    Request is json-encoded for sending over the wire.

    The API is versioned and defaults to version 1.
    """

    def __init__(self, api_version=1, request_id=None):
        self.api_version = api_version
        if request_id:
            self.request_id = request_id
        else:
            # Generate a unique id so responses can be correlated.
            self.request_id = str(uuid.uuid1())
        self.ops = []

    def add_op_create_pool(self, name, replica_count=3, pg_num=None):
        """Adds an operation to create a pool.

        @param pg_num setting:  optional setting. If not provided, this value
        will be calculated by the broker based on how many OSDs are in the
        cluster at the time of creation. Note that, if provided, this value
        will be capped at the current available maximum.
        """
        self.ops.append({'op': 'create-pool', 'name': name,
                         'replicas': replica_count, 'pg_num': pg_num})

    def set_ops(self, ops):
        """Set request ops to provided value.

        Useful for injecting ops that come from a previous request
        to allow comparisons to ensure validity.
        """
        self.ops = ops

    @property
    def request(self):
        # JSON wire format of the whole request.
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
                           'request-id': self.request_id})

    def _ops_equal(self, other):
        # Two requests are op-equal when each op matches on the known keys;
        # request ids are deliberately excluded from the comparison.
        if len(self.ops) == len(other.ops):
            for req_no in range(0, len(self.ops)):
                for key in ['replicas', 'name', 'op', 'pg_num']:
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
                        return False
        else:
            return False
        return True

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        if self.api_version == other.api_version and \
                self._ops_equal(other):
            return True
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)
class CephBrokerRsp(object):
    """Ceph broker response.

    Response is json-decoded and contents provided as methods/properties.

    The API is versioned and defaults to version 1.
    """

    def __init__(self, encoded_rsp):
        self.api_version = None
        self.rsp = json.loads(encoded_rsp)

    @property
    def request_id(self):
        return self.rsp.get('request-id')

    @property
    def exit_code(self):
        return self.rsp.get('exit-code')

    @property
    def exit_msg(self):
        return self.rsp.get('stderr')
# Ceph Broker Conversation:
1020
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1021
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1022
# unique id so that the client can identity which CephBrokerRsp is associated
1023
# with the request. Ceph will also respond to each client unit individually
1024
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1025
# via key broker-rsp-glance-0
1027
# To use this the charm can just do something like:
1029
# from charmhelpers.contrib.storage.linux.ceph import (
1030
# send_request_if_needed,
1031
# is_request_complete,
1035
# @hooks.hook('ceph-relation-changed')
1036
# def ceph_changed():
1037
# rq = CephBrokerRq()
1038
# rq.add_op_create_pool(name='poolname', replica_count=3)
1040
# if is_request_complete(rq):
1041
# <Request complete actions>
1043
# send_request_if_needed(get_ceph_request())
1045
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1046
# of glance having sent a request to ceph which ceph has successfully processed
1050
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1051
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1052
# 'ceph-public-address': '10.5.44.103',
1053
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1054
# 'private-address': '10.5.44.103',
1057
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1058
# '"ops": [{"replicas": 3, "name": "glance", '
1059
# '"op": "create-pool"}]}'),
1060
# 'private-address': '10.5.44.109',
1064
def get_previous_request(rid):
    """Return the last ceph broker request sent on a given relation

    @param rid: Relation id to query for request
    @return: CephBrokerRq reconstructed from relation data, or None when no
        request has been sent on this relation.
    """
    request = None
    broker_req = relation_get(attribute='broker_req', rid=rid,
                              unit=local_unit())
    if broker_req:
        request_data = json.loads(broker_req)
        request = CephBrokerRq(api_version=request_data['api-version'],
                               request_id=request_data['request-id'])
        request.set_ops(request_data['ops'])

    return request
def get_request_states(request, relation='ceph'):
    """Return a dict of requests per relation id with their corresponding
       completion state.

    This allows a charm, which has a request for ceph, to see whether there is
    an equivalent request already being processed and if so what state that
    request is in.

    @param request: A CephBrokerRq object
    @return: dict keyed by relation id: {'sent': bool, 'complete': bool}
    """
    requests = {}
    for rid in relation_ids(relation):
        previous_request = get_previous_request(rid)
        if request == previous_request:
            sent = True
            complete = is_request_complete_for_rid(previous_request, rid)
        else:
            sent = False
            complete = False

        requests[rid] = {
            'sent': sent,
            'complete': complete,
        }

    return requests
def is_request_sent(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been sent

    Returns True if a similar request has been sent on every relation.

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['sent']:
            return False

    return True
def is_request_complete(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been
    completed

    Returns True if a similar request has been completed on every relation.

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['complete']:
            return False

    return True
def is_request_complete_for_rid(request, rid):
    """Check if a given request has been completed on the given relation

    @param request: A CephBrokerRq object
    @param rid: Relation ID
    @return: True when a matching response with a zero exit code is found.
    """
    broker_key = get_broker_rsp_key()
    for unit in related_units(rid):
        rdata = relation_get(rid=rid, unit=unit)
        if rdata.get(broker_key):
            # Unit-targeted reply path.
            rsp = CephBrokerRsp(rdata.get(broker_key))
            if rsp.request_id == request.request_id:
                if not rsp.exit_code:
                    return True
        else:
            # The remote unit sent no reply targeted at this unit so either the
            # remote ceph cluster does not support unit targeted replies or it
            # has not processed our request yet.
            if rdata.get('broker_rsp'):
                request_data = json.loads(rdata['broker_rsp'])
                if request_data.get('request-id'):
                    log('Ignoring legacy broker_rsp without unit key as remote '
                        'service supports unit specific replies', level=DEBUG)
                else:
                    log('Using legacy broker_rsp as remote service does not '
                        'supports unit specific replies', level=DEBUG)
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
                    if not rsp.exit_code:
                        return True

    return False
def get_broker_rsp_key():
    """Return broker response key for this unit

    This is the key that ceph is going to use to pass request status
    information back to this unit
    """
    # e.g. unit 'glance/0' -> 'broker-rsp-glance-0'
    return 'broker-rsp-' + local_unit().replace('/', '-')
def send_request_if_needed(request, relation='ceph'):
    """Send broker request if an equivalent request has not already been sent

    @param request: A CephBrokerRq object
    """
    if is_request_sent(request, relation=relation):
        log('Request already sent but not complete, not sending new request',
            level=DEBUG)
    else:
        for rid in relation_ids(relation):
            log('Sending request {}'.format(request.request_id), level=DEBUG)
            relation_set(relation_id=rid, broker_req=request.request)