67
76
err to syslog = {use_syslog}
68
77
clog to syslog = {use_syslog}
79
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
80
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
83
def validator(value, valid_type, valid_range=None):
85
Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
90
This says I'm testing value=1. It must be an int inclusive in [0,2]
92
:param value: The value to validate
93
:param valid_type: The type that value should be.
94
:param valid_range: A range of values that value can assume.
97
assert isinstance(value, valid_type), "{} is not a {}".format(
100
if valid_range is not None:
101
assert isinstance(valid_range, list), \
102
"valid_range must be a list, was given {}".format(valid_range)
103
# If we're dealing with strings
104
if valid_type is six.string_types:
105
assert value in valid_range, \
106
"{} is not in the list {}".format(value, valid_range)
107
# Integer, float should have a min and max
109
if len(valid_range) != 2:
111
"Invalid valid_range list of {} for {}. "
112
"List must be [min,max]".format(valid_range, value))
113
assert value >= valid_range[0], \
114
"{} is less than minimum allowed value of {}".format(
115
value, valid_range[0])
116
assert value <= valid_range[1], \
117
"{} is greater than maximum allowed value of {}".format(
118
value, valid_range[1])
121
class PoolCreationError(Exception):
123
A custom error to inform the caller that a pool creation failed. Provides an error message
126
def __init__(self, message):
127
super(PoolCreationError, self).__init__(message)
132
An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
133
Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
136
def __init__(self, service, name):
137
self.service = service
140
# Create the pool if it doesn't exist already
141
# To be implemented by subclasses
145
def add_cache_tier(self, cache_pool, mode):
147
Adds a new cache tier to an existing pool.
148
:param cache_pool: six.string_types. The cache tier pool name to add.
149
:param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
152
# Check the input types and values
153
validator(value=cache_pool, valid_type=six.string_types)
154
validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
156
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
157
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
158
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
159
check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
161
def remove_cache_tier(self, cache_pool):
163
Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
164
:param cache_pool: six.string_types. The cache tier pool name to remove.
167
# read-only is easy, writeback is much harder
168
mode = get_cache_mode(self.service, cache_pool)
169
version = ceph_version()
170
if mode == 'readonly':
171
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
172
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
174
elif mode == 'writeback':
175
pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
176
'cache-mode', cache_pool, 'forward']
177
if version >= '10.1':
178
# Jewel added a mandatory flag
179
pool_forward_cmd.append('--yes-i-really-mean-it')
181
check_call(pool_forward_cmd)
182
# Flush the cache and wait for it to return
183
check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
184
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
185
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
187
def get_pgs(self, pool_size):
189
:param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
191
:return: int. The number of pgs to use.
193
validator(value=pool_size, valid_type=int)
194
osd_list = get_osds(self.service)
196
# NOTE(james-page): Default to 200 for older ceph versions
197
# which don't support OSD query from cli
200
osd_list_length = len(osd_list)
201
# Calculate based on Ceph best practices
202
if osd_list_length < 5:
204
elif 5 < osd_list_length < 10:
206
elif 10 < osd_list_length < 50:
209
estimate = (osd_list_length * 100) / pool_size
210
# Return the next nearest power of 2
211
index = bisect.bisect_right(powers_of_two, estimate)
212
return powers_of_two[index]
215
class ReplicatedPool(Pool):
216
def __init__(self, service, name, pg_num=None, replicas=2):
217
super(ReplicatedPool, self).__init__(service=service, name=name)
218
self.replicas = replicas
220
self.pg_num = self.get_pgs(self.replicas)
225
if not pool_exists(self.service, self.name):
227
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
228
self.name, str(self.pg_num)]
231
# Set the pool replica size
232
update_pool(client=self.service,
234
settings={'size': str(self.replicas)})
235
except CalledProcessError:
239
# Default jerasure erasure coded pool
240
class ErasurePool(Pool):
241
def __init__(self, service, name, erasure_code_profile="default"):
242
super(ErasurePool, self).__init__(service=service, name=name)
243
self.erasure_code_profile = erasure_code_profile
246
if not pool_exists(self.service, self.name):
247
# Try to find the erasure profile information so we can properly size the pgs
248
erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
251
if erasure_profile is None:
252
log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
254
raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
255
if 'k' not in erasure_profile or 'm' not in erasure_profile:
257
log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
259
raise PoolCreationError(
260
message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
262
pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
264
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
265
'erasure', self.erasure_code_profile]
268
except CalledProcessError:
271
"""Get an existing erasure code profile if it already exists.
272
Returns json formatted output"""
275
def get_mon_map(service):
277
Returns the current monitor map.
278
:param service: six.string_types. The Ceph user name to run the command under
279
:return: json string. :raise: ValueError if the monmap fails to parse.
280
Also raises CalledProcessError if our ceph command fails
283
mon_status = check_output(
284
['ceph', '--id', service,
285
'mon_status', '--format=json'])
287
return json.loads(mon_status)
288
except ValueError as v:
289
log("Unable to parse mon_status json: {}. Error: {}".format(
290
mon_status, v.message))
292
except CalledProcessError as e:
293
log("mon_status command failed with message: {}".format(
298
def hash_monitor_names(service):
300
Uses the get_mon_map() function to get information about the monitor
302
Hash the name of each monitor. Return a sorted list of monitor hashes
303
in an ascending order.
304
:param service: six.string_types. The Ceph user name to run the command under
305
:rtype : dict. json dict of monitor name, ip address and rank
307
'name': 'ip-172-31-13-165',
309
'addr': '172.31.13.165:6789/0'}
313
monitor_list = get_mon_map(service=service)
314
if monitor_list['monmap']['mons']:
315
for mon in monitor_list['monmap']['mons']:
317
hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
318
return sorted(hash_list)
321
except (ValueError, CalledProcessError):
325
def monitor_key_delete(service, key):
327
Delete a key and value pair from the monitor cluster
328
:param service: six.string_types. The Ceph user name to run the command under
329
Deletes a key value pair on the monitor cluster.
330
:param key: six.string_types. The key to delete.
334
['ceph', '--id', service,
335
'config-key', 'del', str(key)])
336
except CalledProcessError as e:
337
log("Monitor config-key put failed with message: {}".format(
342
def monitor_key_set(service, key, value):
344
Sets a key value pair on the monitor cluster.
345
:param service: six.string_types. The Ceph user name to run the command under
346
:param key: six.string_types. The key to set.
347
:param value: The value to set. This will be converted to a string
352
['ceph', '--id', service,
353
'config-key', 'put', str(key), str(value)])
354
except CalledProcessError as e:
355
log("Monitor config-key put failed with message: {}".format(
360
def monitor_key_get(service, key):
362
Gets the value of an existing key in the monitor cluster.
363
:param service: six.string_types. The Ceph user name to run the command under
364
:param key: six.string_types. The key to search for.
365
:return: Returns the value of that key or None if not found.
368
output = check_output(
369
['ceph', '--id', service,
370
'config-key', 'get', str(key)])
372
except CalledProcessError as e:
373
log("Monitor config-key get failed with message: {}".format(
378
def monitor_key_exists(service, key):
380
Searches for the existence of a key in the monitor cluster.
381
:param service: six.string_types. The Ceph user name to run the command under
382
:param key: six.string_types. The key to search for
383
:return: Returns True if the key exists, False if not and raises an
384
exception if an unknown error occurs. :raise: CalledProcessError if
385
an unknown error occurs
389
['ceph', '--id', service,
390
'config-key', 'exists', str(key)])
391
# I can return true here regardless because Ceph returns
392
# ENOENT if the key wasn't found
394
except CalledProcessError as e:
395
if e.returncode == errno.ENOENT:
398
log("Unknown error from ceph config-get exists: {} {}".format(
399
e.returncode, e.output))
403
def get_erasure_profile(service, name):
405
:param service: six.string_types. The Ceph user name to run the command under
410
out = check_output(['ceph', '--id', service,
411
'osd', 'erasure-code-profile', 'get',
412
name, '--format=json'])
413
return json.loads(out)
414
except (CalledProcessError, OSError, ValueError):
418
def pool_set(service, pool_name, key, value):
420
Sets a value for a RADOS pool in ceph.
421
:param service: six.string_types. The Ceph user name to run the command under
422
:param pool_name: six.string_types
423
:param key: six.string_types
425
:return: None. Can raise CalledProcessError
427
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
430
except CalledProcessError:
434
def snapshot_pool(service, pool_name, snapshot_name):
436
Snapshots a RADOS pool in ceph.
437
:param service: six.string_types. The Ceph user name to run the command under
438
:param pool_name: six.string_types
439
:param snapshot_name: six.string_types
440
:return: None. Can raise CalledProcessError
442
cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
445
except CalledProcessError:
449
def remove_pool_snapshot(service, pool_name, snapshot_name):
451
Remove a snapshot from a RADOS pool in ceph.
452
:param service: six.string_types. The Ceph user name to run the command under
453
:param pool_name: six.string_types
454
:param snapshot_name: six.string_types
455
:return: None. Can raise CalledProcessError
457
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
460
except CalledProcessError:
464
# max_bytes should be an int or long
465
def set_pool_quota(service, pool_name, max_bytes):
467
:param service: six.string_types. The Ceph user name to run the command under
468
:param pool_name: six.string_types
469
:param max_bytes: int or long
470
:return: None. Can raise CalledProcessError
472
# Set a byte quota on a RADOS pool in ceph.
473
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
474
'max_bytes', str(max_bytes)]
477
except CalledProcessError:
481
def remove_pool_quota(service, pool_name):
483
Set a byte quota on a RADOS pool in ceph.
484
:param service: six.string_types. The Ceph user name to run the command under
485
:param pool_name: six.string_types
486
:return: None. Can raise CalledProcessError
488
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
491
except CalledProcessError:
495
def remove_erasure_profile(service, profile_name):
497
Create a new erasure code profile if one does not already exist for it. Updates
498
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
500
:param service: six.string_types. The Ceph user name to run the command under
501
:param profile_name: six.string_types
502
:return: None. Can raise CalledProcessError
504
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
508
except CalledProcessError:
512
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
513
failure_domain='host',
514
data_chunks=2, coding_chunks=1,
515
locality=None, durability_estimator=None):
517
Create a new erasure code profile if one does not already exist for it. Updates
518
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
520
:param service: six.string_types. The Ceph user name to run the command under
521
:param profile_name: six.string_types
522
:param erasure_plugin_name: six.string_types
523
:param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
524
'room', 'root', 'row'])
525
:param data_chunks: int
526
:param coding_chunks: int
528
:param durability_estimator: int
529
:return: None. Can raise CalledProcessError
531
# Ensure this failure_domain is allowed by Ceph
532
validator(failure_domain, six.string_types,
533
['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
535
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
536
'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
537
'ruleset_failure_domain=' + failure_domain]
538
if locality is not None and durability_estimator is not None:
539
raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
541
# Add plugin specific information
542
if locality is not None:
543
# For local erasure codes
544
cmd.append('l=' + str(locality))
545
if durability_estimator is not None:
546
# For Shec erasure codes
547
cmd.append('c=' + str(durability_estimator))
549
if erasure_profile_exists(service, profile_name):
550
cmd.append('--force')
554
except CalledProcessError:
558
def rename_pool(service, old_name, new_name):
560
Rename a Ceph pool from old_name to new_name
561
:param service: six.string_types. The Ceph user name to run the command under
562
:param old_name: six.string_types
563
:param new_name: six.string_types
566
validator(value=old_name, valid_type=six.string_types)
567
validator(value=new_name, valid_type=six.string_types)
569
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
573
def erasure_profile_exists(service, name):
575
Check to see if an Erasure code profile already exists.
576
:param service: six.string_types. The Ceph user name to run the command under
577
:param name: six.string_types
580
validator(value=name, valid_type=six.string_types)
582
check_call(['ceph', '--id', service,
583
'osd', 'erasure-code-profile', 'get',
586
except CalledProcessError:
590
def get_cache_mode(service, pool_name):
592
Find the current caching mode of the pool_name given.
593
:param service: six.string_types. The Ceph user name to run the command under
594
:param pool_name: six.string_types
597
validator(value=service, valid_type=six.string_types)
598
validator(value=pool_name, valid_type=six.string_types)
599
out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
601
osd_json = json.loads(out)
602
for pool in osd_json['pools']:
603
if pool['pool_name'] == pool_name:
604
return pool['cache_mode']
610
def pool_exists(service, name):
611
"""Check to see if a RADOS pool already exists."""
613
out = check_output(['rados', '--id', service,
614
'lspools']).decode('UTF-8')
615
except CalledProcessError:
618
return name in out.split()
621
def get_osds(service):
622
"""Return a list of all Ceph Object Storage Daemons currently in the
625
version = ceph_version()
626
if version and version >= '0.56':
627
return json.loads(check_output(['ceph', '--id', service,
629
'--format=json']).decode('UTF-8'))
432
1010
The API is versioned and defaults to version 1.
434
1013
def __init__(self, encoded_rsp):
435
1014
self.api_version = None
436
1015
self.rsp = json.loads(encoded_rsp)
1018
def request_id(self):
1019
return self.rsp.get('request-id')
439
1022
def exit_code(self):
440
1023
return self.rsp.get('exit-code')
443
1026
def exit_msg(self):
444
1027
return self.rsp.get('stderr')
1030
# Ceph Broker Conversation:
1031
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1032
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1033
# unique id so that the client can identity which CephBrokerRsp is associated
1034
# with the request. Ceph will also respond to each client unit individually
1035
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1036
# via key broker-rsp-glance-0
1038
# To use this the charm can just do something like:
1040
# from charmhelpers.contrib.storage.linux.ceph import (
1041
# send_request_if_needed,
1042
# is_request_complete,
1046
# @hooks.hook('ceph-relation-changed')
1047
# def ceph_changed():
1048
# rq = CephBrokerRq()
1049
# rq.add_op_create_pool(name='poolname', replica_count=3)
1051
# if is_request_complete(rq):
1052
# <Request complete actions>
1054
# send_request_if_needed(get_ceph_request())
1056
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1057
# of glance having sent a request to ceph which ceph has successfully processed
1061
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1062
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1063
# 'ceph-public-address': '10.5.44.103',
1064
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1065
# 'private-address': '10.5.44.103',
1068
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1069
# '"ops": [{"replicas": 3, "name": "glance", '
1070
# '"op": "create-pool"}]}'),
1071
# 'private-address': '10.5.44.109',
1075
def get_previous_request(rid):
1076
"""Return the last ceph broker request sent on a given relation
1078
@param rid: Relation id to query for request
1081
broker_req = relation_get(attribute='broker_req', rid=rid,
1084
request_data = json.loads(broker_req)
1085
request = CephBrokerRq(api_version=request_data['api-version'],
1086
request_id=request_data['request-id'])
1087
request.set_ops(request_data['ops'])
1092
def get_request_states(request, relation='ceph'):
1093
"""Return a dict of requests per relation id with their corresponding
1096
This allows a charm, which has a request for ceph, to see whether there is
1097
an equivalent request already being processed and if so what state that
1100
@param request: A CephBrokerRq object
1104
for rid in relation_ids(relation):
1106
previous_request = get_previous_request(rid)
1107
if request == previous_request:
1109
complete = is_request_complete_for_rid(previous_request, rid)
1116
'complete': complete,
1122
def is_request_sent(request, relation='ceph'):
1123
"""Check to see if a functionally equivalent request has already been sent
1125
Returns True if a similair request has been sent
1127
@param request: A CephBrokerRq object
1129
states = get_request_states(request, relation=relation)
1130
for rid in states.keys():
1131
if not states[rid]['sent']:
1137
def is_request_complete(request, relation='ceph'):
1138
"""Check to see if a functionally equivalent request has already been
1141
Returns True if a similair request has been completed
1143
@param request: A CephBrokerRq object
1145
states = get_request_states(request, relation=relation)
1146
for rid in states.keys():
1147
if not states[rid]['complete']:
1153
def is_request_complete_for_rid(request, rid):
1154
"""Check if a given request has been completed on the given relation
1156
@param request: A CephBrokerRq object
1157
@param rid: Relation ID
1159
broker_key = get_broker_rsp_key()
1160
for unit in related_units(rid):
1161
rdata = relation_get(rid=rid, unit=unit)
1162
if rdata.get(broker_key):
1163
rsp = CephBrokerRsp(rdata.get(broker_key))
1164
if rsp.request_id == request.request_id:
1165
if not rsp.exit_code:
1168
# The remote unit sent no reply targeted at this unit so either the
1169
# remote ceph cluster does not support unit targeted replies or it
1170
# has not processed our request yet.
1171
if rdata.get('broker_rsp'):
1172
request_data = json.loads(rdata['broker_rsp'])
1173
if request_data.get('request-id'):
1174
log('Ignoring legacy broker_rsp without unit key as remote '
1175
'service supports unit specific replies', level=DEBUG)
1177
log('Using legacy broker_rsp as remote service does not '
1178
'supports unit specific replies', level=DEBUG)
1179
rsp = CephBrokerRsp(rdata['broker_rsp'])
1180
if not rsp.exit_code:
1186
def get_broker_rsp_key():
1187
"""Return broker response key for this unit
1189
This is the key that ceph is going to use to pass request status
1190
information back to this unit
1192
return 'broker-rsp-' + local_unit().replace('/', '-')
1195
def send_request_if_needed(request, relation='ceph'):
1196
"""Send broker request if an equivalent request has not already been sent
1198
@param request: A CephBrokerRq object
1200
if is_request_sent(request, relation=relation):
1201
log('Request already sent but not complete, not sending new request',
1204
for rid in relation_ids(relation):
1205
log('Sending request {}'.format(request.request_id), level=DEBUG)
1206
relation_set(relation_id=rid, broker_req=request.request)