68
78
clog to syslog = {use_syslog}
81
# The number of placement groups per OSD to target for placement group
# calculations. This number is chosen as 100 due to the ceph PG Calc
# documentation recommending to choose 100 for clusters which are not
# expected to increase in the foreseeable future. Since the majority of the
# calculations are done on deployment, target the case of non-expanding
# clusters as the default.
DEFAULT_PGS_PER_OSD_TARGET = 100
# Default share of cluster data assumed for a pool when the caller does not
# provide a weight (percentage, i.e. 10%).
DEFAULT_POOL_WEIGHT = 10.0
# Floor for the computed placement-group count so that even tiny OSD
# configurations get some data distribution.
DEFAULT_MINIMUM_PGS = 2
93
def validator(value, valid_type, valid_range=None):
    """
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values

    Example input:
        validator(value=1,
                  valid_type=int,
                  valid_range=[0, 2])

    This says I'm testing value=1. It must be an int inclusive in [0,2]

    :param value: The value to validate
    :param valid_type: The type that value should be.
    :param valid_range: A range of values that value can assume.
        For strings this is a list of allowed values; for numbers a
        two-element [min, max] list.
    :return: None. Raises AssertionError on validation failure and
        ValueError on a malformed valid_range.
    """
    assert isinstance(value, valid_type), "{} is not a {}".format(
        value, valid_type)
    if valid_range is not None:
        assert isinstance(valid_range, list), \
            "valid_range must be a list, was given {}".format(valid_range)
        # If we're dealing with strings
        if valid_type is six.string_types:
            assert value in valid_range, \
                "{} is not in the list {}".format(value, valid_range)
        # Integer, float should have a min and max
        else:
            if len(valid_range) != 2:
                raise ValueError(
                    "Invalid valid_range list of {} for {}. "
                    "List must be [min,max]".format(valid_range, value))
            assert value >= valid_range[0], \
                "{} is less than minimum allowed value of {}".format(
                    value, valid_range[0])
            assert value <= valid_range[1], \
                "{} is greater than maximum allowed value of {}".format(
                    value, valid_range[1])
131
class PoolCreationError(Exception):
    """
    A custom error to inform the caller that a pool creation failed. Provides an error message
    """

    def __init__(self, message):
        super(PoolCreationError, self).__init__(message)
142
class Pool(object):
    """
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
    Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
    """

    def __init__(self, service, name):
        # service: the Ceph user name used with 'ceph --id' for all commands
        self.service = service
        # name: the RADOS pool name this object manages
        self.name = name

    # Create the pool if it doesn't exist already
    # To be implemented by subclasses
    def create(self):
        pass
155
def add_cache_tier(self, cache_pool, mode):
157
Adds a new cache tier to an existing pool.
158
:param cache_pool: six.string_types. The cache tier pool name to add.
159
:param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
162
# Check the input types and values
163
validator(value=cache_pool, valid_type=six.string_types)
164
validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
166
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
167
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
168
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
169
check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
171
def remove_cache_tier(self, cache_pool):
173
Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
174
:param cache_pool: six.string_types. The cache tier pool name to remove.
177
# read-only is easy, writeback is much harder
178
mode = get_cache_mode(self.service, cache_pool)
179
version = ceph_version()
180
if mode == 'readonly':
181
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
182
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
184
elif mode == 'writeback':
185
pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
186
'cache-mode', cache_pool, 'forward']
187
if version >= '10.1':
188
# Jewel added a mandatory flag
189
pool_forward_cmd.append('--yes-i-really-mean-it')
191
check_call(pool_forward_cmd)
192
# Flush the cache and wait for it to return
193
check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
194
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
195
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
197
def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT):
198
"""Return the number of placement groups to use when creating the pool.
200
Returns the number of placement groups which should be specified when
201
creating the pool. This is based upon the calculation guidelines
202
provided by the Ceph Placement Group Calculator (located online at
203
http://ceph.com/pgcalc/).
205
The number of placement groups are calculated using the following:
207
(Target PGs per OSD) * (OSD #) * (%Data)
208
----------------------------------------
211
Per the upstream guidelines, the OSD # should really be considered
212
based on the number of OSDs which are eligible to be selected by the
213
pool. Since the pool creation doesn't specify any of CRUSH set rules,
214
the default rule will be dependent upon the type of pool being
215
created (replicated or erasure).
217
This code makes no attempt to determine the number of OSDs which can be
218
selected for the specific rule, rather it is left to the user to tune
219
in the form of 'expected-osd-count' config option.
221
:param pool_size: int. pool_size is either the number of replicas for
222
replicated pools or the K+M sum for erasure coded pools
223
:param percent_data: float. the percentage of data that is expected to
224
be contained in the pool for the specific OSD set. Default value
225
is to assume 10% of the data is for this pool, which is a
226
relatively low % of the data but allows for the pg_num to be
227
increased. NOTE: the default is primarily to handle the scenario
228
where related charms requiring pools has not been upgraded to
229
include an update to indicate their relative usage of the pools.
230
:return: int. The number of pgs to use.
233
# Note: This calculation follows the approach that is provided
234
# by the Ceph PG Calculator located at http://ceph.com/pgcalc/.
235
validator(value=pool_size, valid_type=int)
237
# Ensure that percent data is set to something - even with a default
238
# it can be set to None, which would wreak havoc below.
239
if percent_data is None:
240
percent_data = DEFAULT_POOL_WEIGHT
242
# If the expected-osd-count is specified, then use the max between
243
# the expected-osd-count and the actual osd_count
244
osd_list = get_osds(self.service)
245
expected = config('expected-osd-count') or 0
248
osd_count = max(expected, len(osd_list))
250
# Log a message to provide some insight if the calculations claim
251
# to be off because someone is setting the expected count and
252
# there are more OSDs in reality. Try to make a proper guess
253
# based upon the cluster itself.
254
if expected and osd_count != expected:
255
log("Found more OSDs than provided expected count. "
256
"Using the actual count instead", INFO)
258
# Use the expected-osd-count in older ceph versions to allow for
259
# a more accurate pg calculations
262
# NOTE(james-page): Default to 200 for older ceph versions
263
# which don't support OSD query from cli
264
return LEGACY_PG_COUNT
266
percent_data /= 100.0
267
target_pgs_per_osd = config('pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET
268
num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size
270
# NOTE: ensure a sane minimum number of PGS otherwise we don't get any
271
# reasonable data distribution in minimal OSD configurations
272
if num_pg < DEFAULT_MINIMUM_PGS:
273
num_pg = DEFAULT_MINIMUM_PGS
275
# The CRUSH algorithm has a slight optimization for placement groups
276
# with powers of 2 so find the nearest power of 2. If the nearest
277
# power of 2 is more than 25% below the original value, the next
278
# highest value is used. To do this, find the nearest power of 2 such
279
# that 2^n <= num_pg, check to see if its within the 25% tolerance.
280
exponent = math.floor(math.log(num_pg, 2))
281
nearest = 2 ** exponent
282
if (num_pg - nearest) > (num_pg * 0.25):
283
# Choose the next highest power of 2 since the nearest is more
284
# than 25% below the original value.
285
return int(nearest * 2)
290
class ReplicatedPool(Pool):
    """A replicated RADOS pool: each object is stored as `replicas` copies."""

    def __init__(self, service, name, pg_num=None, replicas=2,
                 percent_data=10.0):
        super(ReplicatedPool, self).__init__(service=service, name=name)
        self.replicas = replicas
        if pg_num:
            # Since the number of placement groups were specified, ensure
            # that there aren't too many created.
            max_pgs = self.get_pgs(self.replicas, 100.0)
            self.pg_num = min(pg_num, max_pgs)
        else:
            self.pg_num = self.get_pgs(self.replicas, percent_data)

    def create(self):
        # Create the pool if it doesn't exist already
        if not pool_exists(self.service, self.name):
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
                   self.name, str(self.pg_num)]
            try:
                check_call(cmd)
                # Set the pool replica size
                update_pool(client=self.service,
                            pool=self.name,
                            settings={'size': str(self.replicas)})
            except CalledProcessError:
                raise
318
# Default jerasure erasure coded pool
class ErasurePool(Pool):
    """An erasure-coded RADOS pool sized from its erasure profile (k+m)."""

    def __init__(self, service, name, erasure_code_profile="default",
                 percent_data=10.0):
        super(ErasurePool, self).__init__(service=service, name=name)
        self.erasure_code_profile = erasure_code_profile
        self.percent_data = percent_data

    def create(self):
        if not pool_exists(self.service, self.name):
            # Try to find the erasure profile information in order to properly
            # size the number of placement groups. The size of an erasure
            # coded placement group is calculated as k+m.
            erasure_profile = get_erasure_profile(self.service,
                                                  self.erasure_code_profile)

            # Check for errors
            if erasure_profile is None:
                msg = ("Failed to discover erasure profile named "
                       "{}".format(self.erasure_code_profile))
                log(msg, level=ERROR)
                raise PoolCreationError(msg)
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
                # Error
                msg = ("Unable to find k (data chunks) or m (coding chunks) "
                       "in erasure profile {}".format(erasure_profile))
                log(msg, level=ERROR)
                raise PoolCreationError(msg)

            k = int(erasure_profile['k'])
            m = int(erasure_profile['m'])
            pgs = self.get_pgs(k + m, self.percent_data)
            # Create it
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
                   self.name, str(pgs), str(pgs),
                   'erasure', self.erasure_code_profile]
            try:
                check_call(cmd)
            except CalledProcessError:
                raise

    """Get an existing erasure code profile if it already exists.
       Returns json formatted output"""
363
def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string. :raise: ValueError if the monmap fails to parse.
      Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(
            ['ceph', '--id', service,
             'mon_status', '--format=json'])
        try:
            return json.loads(mon_status)
        except ValueError as v:
            # NOTE: use str(v) - the py2-only .message attribute would raise
            # AttributeError on python3 and mask the real parse error.
            log("Unable to parse mon_status json: {}. Error: {}".format(
                mon_status, str(v)))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}".format(
            str(e)))
        raise
386
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor. Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict. json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise
413
def monitor_key_delete(service, key):
    """
    Delete a key and value pair from the monitor cluster
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to delete.
    :return: None. Raises CalledProcessError on failure.
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'del', str(key)])
    except CalledProcessError as e:
        # NOTE: message corrected from "put" - this is the delete path.
        log("Monitor config-key del failed with message: {}".format(
            e.output))
        raise
430
def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to set.
    :param value: The value to set. This will be converted to a string
        before setting
    :return: None. Raises CalledProcessError on failure.
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise
448
def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)])
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        return None
466
def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for
    :return: Returns True if the key exists, False if not and raises an
        exception if an unknown error occurs. :raise: CalledProcessError if
        an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service,
             'config-key', 'exists', str(key)])
        # I can return true here regardless because Ceph returns
        # ENOENT if the key wasn't found
        return True
    except CalledProcessError as e:
        if e.returncode == errno.ENOENT:
            return False
        else:
            log("Unknown error from ceph config-get exists: {} {}".format(
                e.returncode, e.output))
            raise
491
def get_erasure_profile(service, name):
    """
    Fetch an erasure code profile from Ceph and return it decoded from json.
    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types. The profile name to look up.
    :return: dict of profile settings, or None if the profile does not exist
        or the output cannot be retrieved/parsed.
    """
    try:
        out = check_output(['ceph', '--id', service,
                            'osd', 'erasure-code-profile', 'get',
                            name, '--format=json'])
        return json.loads(out)
    except (CalledProcessError, OSError, ValueError):
        return None
506
def pool_set(service, pool_name, key, value):
    """
    Sets a value for a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param key: six.string_types
    :param value: the value to set the pool key to
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
522
def snapshot_pool(service, pool_name, snapshot_name):
    """
    Snapshots a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
537
def remove_pool_snapshot(service, pool_name, snapshot_name):
    """
    Remove a snapshot from a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param snapshot_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
552
# max_bytes should be an int or long
def set_pool_quota(service, pool_name, max_bytes):
    """
    Set a byte quota on a RADOS pool in ceph.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :param max_bytes: int or long
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
           'max_bytes', str(max_bytes)]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
569
def remove_pool_quota(service, pool_name):
    """
    Remove the byte quota on a RADOS pool in ceph (sets max_bytes to 0).
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
583
def remove_erasure_profile(service, profile_name):
    """
    Remove an erasure code profile.
    Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/

    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :return: None. Can raise CalledProcessError
    """
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
           profile_name]
    try:
        check_call(cmd)
    except CalledProcessError:
        raise
600
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
                           failure_domain='host',
                           data_chunks=2, coding_chunks=1,
                           locality=None, durability_estimator=None):
    """
    Create a new erasure code profile if one does not already exist for it. Updates
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/

    :param service: six.string_types. The Ceph user name to run the command under
    :param profile_name: six.string_types
    :param erasure_plugin_name: six.string_types
    :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
        'room', 'root', 'row'])
    :param data_chunks: int
    :param coding_chunks: int
    :param locality: int. The locality (l) for LRC plugins; mutually
        exclusive with durability_estimator.
    :param durability_estimator: int. The durability estimator (c) for the
        shec plugin; mutually exclusive with locality.
    :return: None. Can raise CalledProcessError
    """
    # Ensure this failure_domain is allowed by Ceph
    validator(failure_domain, six.string_types,
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])

    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
           'ruleset_failure_domain=' + failure_domain]
    if locality is not None and durability_estimator is not None:
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")

    # Add plugin specific information
    if locality is not None:
        # For local erasure codes
        cmd.append('l=' + str(locality))
    if durability_estimator is not None:
        # For Shec erasure codes
        cmd.append('c=' + str(durability_estimator))

    if erasure_profile_exists(service, profile_name):
        cmd.append('--force')

    try:
        check_call(cmd)
    except CalledProcessError:
        raise
646
def rename_pool(service, old_name, new_name):
    """
    Rename a Ceph pool from old_name to new_name
    :param service: six.string_types. The Ceph user name to run the command under
    :param old_name: six.string_types
    :param new_name: six.string_types
    :return: None
    """
    validator(value=old_name, valid_type=six.string_types)
    validator(value=new_name, valid_type=six.string_types)

    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
    check_call(cmd)
661
def erasure_profile_exists(service, name):
    """
    Check to see if an Erasure code profile already exists.
    :param service: six.string_types. The Ceph user name to run the command under
    :param name: six.string_types
    :return: True if the profile exists, False otherwise.
    """
    validator(value=name, valid_type=six.string_types)
    try:
        check_call(['ceph', '--id', service,
                    'osd', 'erasure-code-profile', 'get',
                    name])
        return True
    except CalledProcessError:
        return False
678
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.
    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the pool's 'cache_mode' string, or None if the pool is not found.
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
698
def pool_exists(service, name):
    """Check to see if a RADOS pool already exists."""
    try:
        out = check_output(['rados', '--id', service,
                            'lspools']).decode('UTF-8')
    except CalledProcessError:
        return False

    return name in out.split()
709
def get_osds(service):
    """Return a list of all Ceph Object Storage Daemons currently in the
    cluster, or None when the installed ceph does not support the CLI query.
    """
    version = ceph_version()
    # 'osd ls' with json output is only available from ceph 0.56 onwards
    if version and version >= '0.56':
        return json.loads(check_output(['ceph', '--id', service,
                                        'osd', 'ls',
                                        '--format=json']).decode('UTF-8'))

    return None
73
723
"""Basic Ceph client installation."""
432
1104
class CephBrokerRsp(object):
    """Ceph broker response.

    Response is json-decoded and contents provided as methods/properties.

    The API is versioned and defaults to version 1.
    """

    def __init__(self, encoded_rsp):
        # api_version is currently unused on responses; kept for parity
        # with CephBrokerRq.
        self.api_version = None
        self.rsp = json.loads(encoded_rsp)

    @property
    def request_id(self):
        return self.rsp.get('request-id')

    @property
    def exit_code(self):
        return self.rsp.get('exit-code')

    @property
    def exit_msg(self):
        return self.rsp.get('stderr')
1124
# Ceph Broker Conversation:
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
# unique id so that the client can identity which CephBrokerRsp is associated
# with the request. Ceph will also respond to each client unit individually
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
# via key broker-rsp-glance-0
#
# To use this the charm can just do something like:
#
# from charmhelpers.contrib.storage.linux.ceph import (
#     send_request_if_needed,
#     is_request_complete,
#     CephBrokerRq,
# )
#
# @hooks.hook('ceph-relation-changed')
# def ceph_changed():
#     rq = CephBrokerRq()
#     rq.add_op_create_pool(name='poolname', replica_count=3)
#
#     if is_request_complete(rq):
#         <Request complete actions>
#     else:
#         send_request_if_needed(get_ceph_request())
#
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
# of glance having sent a request to ceph which ceph has successfully processed
#  'ceph:8': {
#      'ceph/0': {
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
#          'ceph-public-address': '10.5.44.103',
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
#          'private-address': '10.5.44.103',
#      },
#      'glance/0': {
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
#                         '"ops": [{"replicas": 3, "name": "glance", '
#                         '"op": "create-pool"}]}'),
#          'private-address': '10.5.44.109',
#      },
#  }
1169
def get_previous_request(rid):
    """Return the last ceph broker request sent on a given relation

    @param rid: Relation id to query for request
    @return: CephBrokerRq instance, or None if no request has been sent
    """
    request = None
    broker_req = relation_get(attribute='broker_req', rid=rid,
                              unit=local_unit())
    if broker_req:
        request_data = json.loads(broker_req)
        request = CephBrokerRq(api_version=request_data['api-version'],
                               request_id=request_data['request-id'])
        request.set_ops(request_data['ops'])

    return request
1186
def get_request_states(request, relation='ceph'):
    """Return a dict of requests per relation id with their corresponding
       completion state.

    This allows a charm, which has a request for ceph, to see whether there is
    an equivalent request already being processed and if so what state that
    request is in.

    @param request: A CephBrokerRq object
    @param relation: name of the relation to inspect (default 'ceph')
    @return: dict keyed by relation id with 'sent' and 'complete' booleans
    """
    requests = {}
    for rid in relation_ids(relation):
        previous_request = get_previous_request(rid)
        if request == previous_request:
            sent = True
            complete = is_request_complete_for_rid(previous_request, rid)
        else:
            sent = False
            complete = False

        requests[rid] = {
            'sent': sent,
            'complete': complete,
        }

    return requests
1216
def is_request_sent(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been sent

    Returns True if a similar request has been sent on every relation id.

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['sent']:
            return False

    return True
1231
def is_request_complete(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been
       completed

    Returns True if a similar request has been completed on every relation id.

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['complete']:
            return False

    return True
1247
def is_request_complete_for_rid(request, rid):
    """Check if a given request has been completed on the given relation

    @param request: A CephBrokerRq object
    @param rid: Relation ID
    """
    broker_key = get_broker_rsp_key()
    for unit in related_units(rid):
        rdata = relation_get(rid=rid, unit=unit)
        if rdata.get(broker_key):
            rsp = CephBrokerRsp(rdata.get(broker_key))
            if rsp.request_id == request.request_id:
                # exit-code of 0 (falsy) means success
                if not rsp.exit_code:
                    return True
        else:
            # The remote unit sent no reply targeted at this unit so either the
            # remote ceph cluster does not support unit targeted replies or it
            # has not processed our request yet.
            if rdata.get('broker_rsp'):
                request_data = json.loads(rdata['broker_rsp'])
                if request_data.get('request-id'):
                    log('Ignoring legacy broker_rsp without unit key as remote '
                        'service supports unit specific replies', level=DEBUG)
                else:
                    log('Using legacy broker_rsp as remote service does not '
                        'supports unit specific replies', level=DEBUG)
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
                    if not rsp.exit_code:
                        return True

    return False
1280
def get_broker_rsp_key():
    """Return broker response key for this unit

    This is the key that ceph is going to use to pass request status
    information back to this unit
    """
    # e.g. unit 'glance/0' -> 'broker-rsp-glance-0'
    return 'broker-rsp-' + local_unit().replace('/', '-')
1289
def send_request_if_needed(request, relation='ceph'):
    """Send broker request if an equivalent request has not already been sent

    @param request: A CephBrokerRq object
    @param relation: name of the relation to send the request on
    """
    if is_request_sent(request, relation=relation):
        log('Request already sent but not complete, not sending new request',
            level=DEBUG)
    else:
        for rid in relation_ids(relation):
            log('Sending request {}'.format(request.request_id), level=DEBUG)
            relation_set(relation_id=rid, broker_req=request.request)
1303
class CephConfContext(object):
1304
"""Ceph config (ceph.conf) context.
1306
Supports user-provided Ceph configuration settings. Use can provide a
1307
dictionary as the value for the config-flags charm option containing
1308
Ceph configuration settings keyede by their section in ceph.conf.
1310
def __init__(self, permitted_sections=None):
1311
self.permitted_sections = permitted_sections or []
1314
conf = config('config-flags')
1318
conf = config_flags_parser(conf)
1319
if type(conf) != dict:
1320
log("Provided config-flags is not a dictionary - ignoring",
1324
permitted = self.permitted_sections
1326
diff = set(conf.keys()).difference(set(permitted))
1328
log("Config-flags contains invalid keys '%s' - they will be "
1329
"ignored" % (', '.join(diff)), level=WARNING)
1333
if permitted and key not in permitted:
1334
log("Ignoring key '%s'" % key, level=WARNING)
1337
ceph_conf[key] = conf[key]