67
76
err to syslog = {use_syslog}
68
77
clog to syslog = {use_syslog}
79
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
# Ascending table of candidate placement-group counts: every power of two
# from 2**13 (8192) through 2**23 (8388608). A generator expression is used
# so no helper name leaks into the module namespace on Python 2.
powers_of_two = list(2 ** exponent for exponent in range(13, 24))
83
def validator(value, valid_type, valid_range=None):
    """
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
    Example input:
        validator(value=1,
                  valid_type=int,
                  valid_range=[0, 2])
    This says I'm testing value=1. It must be an int inclusive in [0,2]

    :param value: The value to validate
    :param valid_type: The type that value should be.
    :param valid_range: A range of values that value can assume. When
        valid_type is six.string_types this is the list of permitted values;
        for any other type it must be a two element [min, max] list and both
        bounds are inclusive.
    :return: None
    :raise: AssertionError if value has the wrong type, valid_range is not a
        list, or value lies outside valid_range.
    :raise: ValueError if a non-string valid_range does not have exactly two
        elements.
    """
    # The checks raise AssertionError explicitly instead of using the assert
    # statement: assert statements are removed when Python runs with -O, which
    # would silently disable all validation. The exception type is unchanged
    # for callers.
    if not isinstance(value, valid_type):
        raise AssertionError("{} is not a {}".format(value, valid_type))
    if valid_range is None:
        return
    if not isinstance(valid_range, list):
        raise AssertionError(
            "valid_range must be a list, was given {}".format(valid_range))
    # If we're dealing with strings
    if valid_type is six.string_types:
        if value not in valid_range:
            raise AssertionError(
                "{} is not in the list {}".format(value, valid_range))
        return
    # Integer, float should have a min and max
    if len(valid_range) != 2:
        raise ValueError(
            "Invalid valid_range list of {} for {}. "
            "List must be [min,max]".format(valid_range, value))
    if not value >= valid_range[0]:
        raise AssertionError(
            "{} is less than minimum allowed value of {}".format(
                value, valid_range[0]))
    if not value <= valid_range[1]:
        raise AssertionError(
            "{} is greater than maximum allowed value of {}".format(
                value, valid_range[1]))
121
class PoolCreationError(Exception):
    """
    Raised to inform the caller that creating a pool failed. The message
    given to the constructor describes the failure.
    """

    def __init__(self, message):
        # Forward the message to Exception so it is available through
        # str(error) and error.args.
        super(PoolCreationError, self).__init__(message)
132
class Pool(object):
    """
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
    Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
    """

    def __init__(self, service, name):
        """
        :param service: six.string_types. The Ceph user name to run the command under
        :param name: six.string_types. The name of the pool.
        """
        self.service = service
        self.name = name

    # Create the pool if it doesn't exist already
    # To be implemented by subclasses
    def create(self):
        pass

    def add_cache_tier(self, cache_pool, mode):
        """
        Adds a new cache tier to an existing pool.
        :param cache_pool: six.string_types. The cache tier pool name to add.
        :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
        :return: None. Can raise AssertionError (invalid arguments) or CalledProcessError
        """
        # Check the input types and values
        validator(value=cache_pool, valid_type=six.string_types)
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])

        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])

    def remove_cache_tier(self, cache_pool):
        """
        Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
        :param cache_pool: six.string_types. The cache tier pool name to remove.
        :return: None. Can raise CalledProcessError
        """
        # read-only is easy, writeback is much harder
        mode = get_cache_mode(self.service, cache_pool)
        if mode == 'readonly':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

        elif mode == 'writeback':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
            # Flush the cache and wait for it to return
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

    def get_pgs(self, pool_size):
        """
        Work out how many placement groups a new pool should be created with.
        :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
            erasure coded pools
        :return: int. The number of pgs to use.
        """
        validator(value=pool_size, valid_type=int)
        osd_list = get_osds(self.service)
        if not osd_list:
            # NOTE(james-page): Default to 200 for older ceph versions
            # which don't support OSD query from cli
            return 200

        osd_list_length = len(osd_list)
        # Calculate based on Ceph best practices
        # The tiers are half-open so every OSD count lands in exactly one of
        # them. Previously clusters of exactly 5 or 10 OSDs matched none of
        # the small-cluster tiers and fell through to the large-cluster
        # estimate, which never yields fewer than 8192 pgs.
        # NOTE(review): the per-tier values 128/512/4096 follow Ceph's
        # placement-group preselection guidance - confirm against the
        # deployed defaults.
        if osd_list_length < 5:
            return 128
        elif osd_list_length < 10:
            return 512
        elif osd_list_length < 50:
            return 4096
        else:
            estimate = (osd_list_length * 100) / pool_size
            # Return the next nearest power of 2
            index = bisect.bisect_right(powers_of_two, estimate)
            # Clamp to the largest table entry rather than raising IndexError
            # when the estimate is beyond the end of powers_of_two.
            index = min(index, len(powers_of_two) - 1)
            return powers_of_two[index]
208
class ReplicatedPool(Pool):
    """
    Pool created with `ceph osd pool create <name> <pg_num>`. In this class
    replicas is only used to size pg_num when no explicit pg_num is given.
    """

    def __init__(self, service, name, pg_num=None, replicas=2):
        super(ReplicatedPool, self).__init__(service=service, name=name)
        self.replicas = replicas
        # An explicit pg_num wins; otherwise derive one from the replica count.
        self.pg_num = self.get_pgs(self.replicas) if pg_num is None else pg_num

    def create(self):
        """
        Create the pool unless a pool with this name already exists.
        :return: None. Can raise CalledProcessError
        """
        if pool_exists(self.service, self.name):
            return
        # Create it
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'create',
                    self.name, str(self.pg_num)])
228
# Default jerasure erasure coded pool
229
class ErasurePool(Pool):
    """
    Erasure coded pool. The number of placement groups is sized from the k and m
    values of the named erasure code profile.
    """

    def __init__(self, service, name, erasure_code_profile="default"):
        super(ErasurePool, self).__init__(service=service, name=name)
        self.erasure_code_profile = erasure_code_profile

    def create(self):
        """
        Create the pool unless a pool with this name already exists.
        :return: None. Raises PoolCreationError when the erasure code profile cannot be found or lacks k/m.
            Can raise CalledProcessError
        """
        if pool_exists(self.service, self.name):
            return

        # Try to find the erasure profile information so we can properly size the pgs
        profile_name = self.erasure_code_profile
        erasure_profile = get_erasure_profile(service=self.service, name=profile_name)

        # Check for errors
        if erasure_profile is None:
            log(message='Failed to discover erasure_profile named={}'.format(profile_name),
                level=ERROR)
            raise PoolCreationError(message='unable to find erasure profile {}'.format(profile_name))
        if not ('k' in erasure_profile and 'm' in erasure_profile):
            log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
                level=ERROR)
            raise PoolCreationError(
                message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))

        chunk_total = int(erasure_profile['k']) + int(erasure_profile['m'])
        pgs = str(self.get_pgs(chunk_total))
        # Create it; the pg count is passed twice (the two numeric arguments of pool create)
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, pgs, pgs,
                    'erasure', profile_name])
260
"""Get an existing erasure code profile if it already exists.
261
Returns json formatted output"""
264
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: The decoded (json.loads) output of `ceph mon_status --format=json`.
    :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(
            ['ceph', '--id', service,
             'mon_status', '--format=json'])
    except CalledProcessError as e:
        # Format the exception itself: exceptions have no .message attribute
        # on Python 3, so reading it here would raise AttributeError and hide
        # the real failure.
        log("mon_status command failed with message: {}".format(e))
        raise
    try:
        return json.loads(mon_status)
    except ValueError as v:
        log("Unable to parse mon_status json: {}. Error: {}".format(
            mon_status, v))
        raise
287
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster and hashes (SHA-224 of the UTF-8 encoded name) each monitor name.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: list of hex digest strings sorted in ascending order, or None
        when the monitor map lists no monitors.
    :raise: ValueError or CalledProcessError, passed through from get_mon_map()

    Each entry under ['monmap']['mons'] is a dict of which only 'name' is
    read here, for example:
        {'name': 'ip-172-31-13-165',
         'addr': '172.31.13.165:6789/0'}
    """
    monitors = get_mon_map(service=service)['monmap']['mons']
    if not monitors:
        return None
    return sorted(
        hashlib.sha224(mon['name'].encode('utf-8')).hexdigest()
        for mon in monitors)
314
def monitor_key_delete(service, key):
    """
    Delete a key and value pair from the monitor cluster
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to delete.
    :return: None. Logs the command output and re-raises CalledProcessError on failure
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'del', str(key)])
    except CalledProcessError as e:
        # The message names the 'del' operation; it previously said 'put',
        # which pointed at the wrong command when reading logs.
        log("Monitor config-key del failed with message: {}".format(
            e.output))
        raise
331
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to set.
    :param value: The value to set. This will be converted to a string
        before setting
    :return: None. Logs the command output and re-raises CalledProcessError on failure
    """
    command = ['ceph', '--id', service,
               'config-key', 'put', str(key), str(value)]
    try:
        check_output(command)
    except CalledProcessError as err:
        failure = "Monitor config-key put failed with message: {}".format(
            err.output)
        log(failure)
        raise
349
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for.
    :return: The output of `ceph config-key get`, or None when the command
        fails (the failure output is logged).
    """
    command = ['ceph', '--id', service,
               'config-key', 'get', str(key)]
    try:
        return check_output(command)
    except CalledProcessError as err:
        failure = "Monitor config-key get failed with message: {}".format(
            err.output)
        log(failure)
        return None
367
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for
    :return: True if the key exists, False if the command exits with ENOENT.
    :raise: CalledProcessError for any other exit status (it is logged first)
    """
    command = ['ceph', '--id', service,
               'config-key', 'exists', str(key)]
    try:
        check_call(command)
    except CalledProcessError as err:
        if err.returncode != errno.ENOENT:
            log("Unknown error from ceph config-get exists: {} {}".format(
                err.returncode, err.output))
            raise
        return False
    # A zero exit status is enough to answer True because Ceph returns
    # ENOENT if the key wasn't found
    return True
392
def get_erasure_profile(service, name):
    """
    Fetch an erasure code profile with `ceph osd erasure-code-profile get`.
    :param service: six.string_types. The Ceph user name to run the command under
    :param name: The name of the erasure code profile to look up.
    :return: The decoded JSON output of the command, or None when the command
        fails, cannot be executed, or its output is not valid JSON.
    """
    command = ['ceph', '--id', service,
               'osd', 'erasure-code-profile', 'get',
               name, '--format=json']
    try:
        profile = json.loads(check_output(command))
    except (CalledProcessError, OSError, ValueError):
        return None
    return profile
407
def pool_set(service, pool_name, key, value):
    """
    Sets a value for a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param key: six.string_types
    :param value: The value to set. It is converted to a string before being
        passed to the ceph command line.
    :return: None. Can raise CalledProcessError
    """
    # Command arguments must be strings; converting here lets callers pass
    # numeric settings (e.g. size=3) without subprocess raising TypeError.
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, str(value)]
    check_call(cmd)
423
def snapshot_pool(service, pool_name, snapshot_name):
    """
    Snapshots a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    # A failing command surfaces to the caller as CalledProcessError.
    check_call(['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name])
438
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    # A failing command surfaces to the caller as CalledProcessError.
    check_call(['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name])
453
# max_bytes should be an int or long
454
def set_pool_quota(service, pool_name, max_bytes):
    """
    Set a byte quota on a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param max_bytes: int or long
    :return: None. Can raise CalledProcessError
    """
    quota = str(max_bytes)
    check_call(['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
                'max_bytes', quota])
470
def remove_pool_quota(service, pool_name):
    """
    Clear the byte quota on a RADOS pool in ceph by setting its max_bytes quota to 0.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    check_call(['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'])
484
def remove_erasure_profile(service, profile_name):
    """
    Remove an erasure code profile with `ceph osd erasure-code-profile rm`.
    Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
    for more details
    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    check_call(['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
                profile_name])
501
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
                           failure_domain='host',
                           data_chunks=2, coding_chunks=1,
                           locality=None, durability_estimator=None):
    """
    Create a new erasure code profile if one does not already exist for it. Updates
    the profile if it exists (the command is then run with --force). Please see
    http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
    for more details
    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :param erasure_plugin_name: six.string_types
    :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
        'room', 'root', 'row'])
    :param data_chunks: int. Passed to ceph as k
    :param coding_chunks: int. Passed to ceph as m
    :param locality: int. Passed to ceph as l; mutually exclusive with durability_estimator
    :param durability_estimator: int. Passed to ceph as c; mutually exclusive with locality
    :return: None. Can raise CalledProcessError
    :raise: ValueError if both locality and durability_estimator are given
    """
    # Ensure this failure_domain is allowed by Ceph
    allowed_domains = ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod',
                       'rack', 'region', 'room', 'root', 'row']
    validator(failure_domain, six.string_types, allowed_domains)

    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name]
    cmd.extend([
        'plugin=' + erasure_plugin_name,
        'k=' + str(data_chunks),
        'm=' + str(coding_chunks),
        'ruleset_failure_domain=' + failure_domain,
    ])
    if not (locality is None or durability_estimator is None):
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")

    # Add plugin specific information: l is for local erasure codes,
    # c is for Shec erasure codes
    for option, setting in (('l', locality), ('c', durability_estimator)):
        if setting is not None:
            cmd.append(option + '=' + str(setting))

    if erasure_profile_exists(service, profile_name):
        cmd.append('--force')

    check_call(cmd)
547
def rename_pool(service, old_name, new_name):
    """
    Rename a Ceph pool from old_name to new_name
    :param service: six.string_types. The Ceph user name to run the command under
    :param old_name: six.string_types
    :param new_name: six.string_types
    :return: None. Can raise AssertionError (non-string names) or CalledProcessError
    """
    for pool_name in (old_name, new_name):
        validator(value=pool_name, valid_type=six.string_types)

    check_call(['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name])
562
def erasure_profile_exists(service, name):
    """
    Check to see if an Erasure code profile already exists.
    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types
    :return: True when `ceph osd erasure-code-profile get <name>` succeeds,
        False when it exits with a non-zero status.
    """
    validator(value=name, valid_type=six.string_types)
    command = ['ceph', '--id', service,
               'osd', 'erasure-code-profile', 'get',
               name]
    try:
        check_call(command)
    except CalledProcessError:
        return False
    return True
579
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: The 'cache_mode' entry of the first pool in `ceph osd dump` whose
        'pool_name' matches, or None if no pool matches.
    :raise: ValueError if the osd dump output is not valid JSON. Can also
        raise CalledProcessError
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    pools = json.loads(out)['pools']
    matching_modes = (pool['cache_mode'] for pool in pools
                      if pool['pool_name'] == pool_name)
    return next(matching_modes, None)
599
def pool_exists(service, name):
    """Check to see if a RADOS pool already exists.

    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types. The pool name to look for.
    :return: True if name is one of the whitespace-separated entries printed
        by `rados lspools`, False if it is not or if the command fails.
    """
    try:
        out = check_output(['rados', '--id', service,
                            'lspools']).decode('UTF-8')
    except CalledProcessError:
        return False

    # Compare against whole entries: a plain substring test on the raw output
    # would report pool 'foo' as existing when only 'foobar' does.
    return name in out.split()
610
def get_osds(service):
    """Return a list of all Ceph Object Storage Daemons currently in the
    cluster.

    Returns None when ceph_version() reports no version or one that compares
    below '0.56' (the comparison is a plain string comparison).
    """
    version = ceph_version()
    if not (version and version >= '0.56'):
        return None
    raw_listing = check_output(['ceph', '--id', service,
                                'osd', 'ls',
                                '--format=json'])
    return json.loads(raw_listing.decode('UTF-8'))
432
999
The API is versioned and defaults to version 1.
434
1002
def __init__(self, encoded_rsp):
    # api_version starts out unset; nothing in this initializer assigns it a value.
    self.api_version = None
    # Decode the JSON text of the response once; the accessors read from self.rsp.
    self.rsp = json.loads(encoded_rsp)
1007
def request_id(self):
    # The 'request-id' field of the decoded response, or None if it is absent.
    return self.rsp.get('request-id')
439
1011
def exit_code(self):
    # The 'exit-code' field of the decoded response, or None if it is absent.
    return self.rsp.get('exit-code')
443
1015
def exit_msg(self):
    # The message is carried in the response's 'stderr' field; None if it is absent.
    return self.rsp.get('stderr')
1019
# Ceph Broker Conversation:
1020
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1021
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1022
# unique id so that the client can identify which CephBrokerRsp is associated
1023
# with the request. Ceph will also respond to each client unit individually
1024
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1025
# via key broker-rsp-glance-0
1027
# To use this the charm can just do something like:
1029
# from charmhelpers.contrib.storage.linux.ceph import (
1030
# send_request_if_needed,
1031
# is_request_complete,
1035
# @hooks.hook('ceph-relation-changed')
1036
# def ceph_changed():
1037
# rq = CephBrokerRq()
1038
# rq.add_op_create_pool(name='poolname', replica_count=3)
1040
# if is_request_complete(rq):
1041
# <Request complete actions>
1043
# send_request_if_needed(get_ceph_request())
1045
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1046
# of glance having sent a request to ceph which ceph has successfully processed
1050
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1051
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1052
# 'ceph-public-address': '10.5.44.103',
1053
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1054
# 'private-address': '10.5.44.103',
1057
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1058
# '"ops": [{"replicas": 3, "name": "glance", '
1059
# '"op": "create-pool"}]}'),
1060
# 'private-address': '10.5.44.109',
1064
def get_previous_request(rid):
    """Return the last ceph broker request sent on a given relation

    Returns None when this unit has no 'broker_req' set on the relation.

    @param rid: Relation id to query for request
    """
    broker_req = relation_get(attribute='broker_req', rid=rid,
                              unit=local_unit())
    if not broker_req:
        return None

    # Rebuild the request object from its JSON form.
    request_data = json.loads(broker_req)
    request = CephBrokerRq(api_version=request_data['api-version'],
                           request_id=request_data['request-id'])
    request.set_ops(request_data['ops'])
    return request
1081
def get_request_states(request, relation='ceph'):
    """Return a dict of requests per relation id with their corresponding
       completion state.

    This allows a charm, which has a request for ceph, to see whether there is
    an equivalent request already being processed and if so what state that
    request is in.

    The result maps each relation id to {'sent': bool, 'complete': ...}.

    @param request: A CephBrokerRq object
    """
    requests = {}
    for rid in relation_ids(relation):
        previous_request = get_previous_request(rid)
        # The request counts as sent on this relation when it compares equal
        # to the request previously sent there.
        sent = bool(request == previous_request)
        if sent:
            complete = is_request_complete_for_rid(previous_request, rid)
        else:
            complete = False

        requests[rid] = {
            'sent': sent,
            'complete': complete,
        }

    return requests
1111
def is_request_sent(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been sent

    Returns True if a similar request has been sent on every relation id
    (and therefore also when there are no relation ids at all).

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for state in states.values():
        if not state['sent']:
            return False

    return True
1126
def is_request_complete(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been
    completed

    Returns True if a similar request has been completed on every relation id
    (and therefore also when there are no relation ids at all).

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for state in states.values():
        if not state['complete']:
            return False

    return True
1142
def is_request_complete_for_rid(request, rid):
    """Check if a given request has been completed on the given relation

    Returns True as soon as one related unit reports a falsy exit code for the
    request, and False if no unit does.

    @param request: A CephBrokerRq object
    @param rid: Relation ID
    """
    broker_key = get_broker_rsp_key()
    for unit in related_units(rid):
        rdata = relation_get(rid=rid, unit=unit)
        if rdata.get(broker_key):
            # Reply addressed to this unit: it only counts when it answers
            # this request id and its exit code is falsy (0 or missing).
            rsp = CephBrokerRsp(rdata.get(broker_key))
            if rsp.request_id == request.request_id:
                if not rsp.exit_code:
                    return True
        else:
            # The remote unit sent no reply targeted at this unit so either the
            # remote ceph cluster does not support unit targeted replies or it
            # has not processed our request yet.
            if rdata.get('broker_rsp'):
                request_data = json.loads(rdata['broker_rsp'])
                # A 'request-id' in the legacy key is taken to mean the remote
                # side supports unit specific replies, so the legacy reply is
                # ignored; otherwise the legacy reply is used as the response.
                if request_data.get('request-id'):
                    log('Ignoring legacy broker_rsp without unit key as remote '
                        'service supports unit specific replies', level=DEBUG)
                else:
                    log('Using legacy broker_rsp as remote service does not '
                        'supports unit specific replies', level=DEBUG)
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
                    if not rsp.exit_code:
                        return True

    return False
1175
def get_broker_rsp_key():
    """Return broker response key for this unit

    This is the key that ceph is going to use to pass request status
    information back to this unit. It is 'broker-rsp-' followed by the local
    unit name with every '/' replaced by '-'.
    """
    unit_suffix = local_unit().replace('/', '-')
    return 'broker-rsp-' + unit_suffix
1184
def send_request_if_needed(request, relation='ceph'):
    """Send broker request if an equivalent request has not already been sent

    The request is set as 'broker_req' on every relation id of the relation.

    @param request: A CephBrokerRq object
    """
    if is_request_sent(request, relation=relation):
        log('Request already sent but not complete, not sending new request',
            level=DEBUG)
        return

    for rid in relation_ids(relation):
        log('Sending request {}'.format(request.request_id), level=DEBUG)
        relation_set(relation_id=rid, broker_req=request.request)