123
def create_pool(service, name, replicas=3):
129
def update_pool(client, pool, settings):
130
cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
131
for k, v in six.iteritems(settings):
138
def create_pool(service, name, replicas=3, pg_num=None):
124
139
"""Create a new RADOS pool."""
125
140
if pool_exists(service, name):
126
141
log("Ceph pool {} already exists, skipping creation".format(name),
130
# Calculate the number of placement groups based
131
# on upstream recommended best practices.
132
osds = get_osds(service)
134
pgnum = (len(osds) * 100 // replicas)
136
# NOTE(james-page): Default to 200 for older ceph versions
137
# which don't support OSD query from cli
140
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
143
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
146
# Calculate the number of placement groups based
147
# on upstream recommended best practices.
148
osds = get_osds(service)
150
pg_num = (len(osds) * 100 // replicas)
152
# NOTE(james-page): Default to 200 for older ceph versions
153
# which don't support OSD query from cli
156
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
159
update_pool(service, name, settings={'size': str(replicas)})
148
162
def delete_pool(service, name):
412
415
The API is versioned and defaults to version 1.
414
def __init__(self, api_version=1):
417
def __init__(self, api_version=1, request_id=None):
415
418
self.api_version = api_version
420
self.request_id = request_id
422
self.request_id = str(uuid.uuid1())
418
def add_op_create_pool(self, name, replica_count=3):
425
def add_op_create_pool(self, name, replica_count=3, pg_num=None):
426
"""Adds an operation to create a pool.
428
@param pg_num setting: optional setting. If not provided, this value
429
will be calculated by the broker based on how many OSDs are in the
430
cluster at the time of creation. Note that, if provided, this value
431
will be capped at the current available maximum.
419
433
self.ops.append({'op': 'create-pool', 'name': name,
420
'replicas': replica_count})
434
'replicas': replica_count, 'pg_num': pg_num})
436
def set_ops(self, ops):
437
"""Set request ops to provided value.
439
Useful for injecting ops that come from a previous request
440
to allow comparisons to ensure validity.
423
445
def request(self):
424
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
446
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
447
'request-id': self.request_id})
449
def _ops_equal(self, other):
450
if len(self.ops) == len(other.ops):
451
for req_no in range(0, len(self.ops)):
452
for key in ['replicas', 'name', 'op', 'pg_num']:
453
if self.ops[req_no].get(key) != other.ops[req_no].get(key):
459
def __eq__(self, other):
460
if not isinstance(other, self.__class__):
462
if self.api_version == other.api_version and \
463
self._ops_equal(other):
468
def __ne__(self, other):
469
return not self.__eq__(other)
427
472
class CephBrokerRsp(object):
432
477
The API is versioned and defaults to version 1.
434
480
def __init__(self, encoded_rsp):
435
481
self.api_version = None
436
482
self.rsp = json.loads(encoded_rsp)
485
def request_id(self):
486
return self.rsp.get('request-id')
439
489
def exit_code(self):
440
490
return self.rsp.get('exit-code')
443
493
def exit_msg(self):
444
494
return self.rsp.get('stderr')
497
# Ceph Broker Conversation:
498
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
499
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
500
# unique id so that the client can identity which CephBrokerRsp is associated
501
# with the request. Ceph will also respond to each client unit individually
502
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
503
# via key broker-rsp-glance-0
505
# To use this the charm can just do something like:
507
# from charmhelpers.contrib.storage.linux.ceph import (
508
# send_request_if_needed,
509
# is_request_complete,
513
# @hooks.hook('ceph-relation-changed')
514
# def ceph_changed():
515
# rq = CephBrokerRq()
516
# rq.add_op_create_pool(name='poolname', replica_count=3)
518
# if is_request_complete(rq):
519
# <Request complete actions>
521
# send_request_if_needed(get_ceph_request())
523
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
524
# of glance having sent a request to ceph which ceph has successfully processed
528
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
529
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
530
# 'ceph-public-address': '10.5.44.103',
531
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
532
# 'private-address': '10.5.44.103',
535
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
536
# '"ops": [{"replicas": 3, "name": "glance", '
537
# '"op": "create-pool"}]}'),
538
# 'private-address': '10.5.44.109',
542
def get_previous_request(rid):
543
"""Return the last ceph broker request sent on a given relation
545
@param rid: Relation id to query for request
548
broker_req = relation_get(attribute='broker_req', rid=rid,
551
request_data = json.loads(broker_req)
552
request = CephBrokerRq(api_version=request_data['api-version'],
553
request_id=request_data['request-id'])
554
request.set_ops(request_data['ops'])
559
def get_request_states(request, relation='ceph'):
560
"""Return a dict of requests per relation id with their corresponding
563
This allows a charm, which has a request for ceph, to see whether there is
564
an equivalent request already being processed and if so what state that
567
@param request: A CephBrokerRq object
571
for rid in relation_ids(relation):
573
previous_request = get_previous_request(rid)
574
if request == previous_request:
576
complete = is_request_complete_for_rid(previous_request, rid)
583
'complete': complete,
589
def is_request_sent(request, relation='ceph'):
590
"""Check to see if a functionally equivalent request has already been sent
592
Returns True if a similair request has been sent
594
@param request: A CephBrokerRq object
596
states = get_request_states(request, relation=relation)
597
for rid in states.keys():
598
if not states[rid]['sent']:
604
def is_request_complete(request, relation='ceph'):
605
"""Check to see if a functionally equivalent request has already been
608
Returns True if a similair request has been completed
610
@param request: A CephBrokerRq object
612
states = get_request_states(request, relation=relation)
613
for rid in states.keys():
614
if not states[rid]['complete']:
620
def is_request_complete_for_rid(request, rid):
621
"""Check if a given request has been completed on the given relation
623
@param request: A CephBrokerRq object
624
@param rid: Relation ID
626
broker_key = get_broker_rsp_key()
627
for unit in related_units(rid):
628
rdata = relation_get(rid=rid, unit=unit)
629
if rdata.get(broker_key):
630
rsp = CephBrokerRsp(rdata.get(broker_key))
631
if rsp.request_id == request.request_id:
632
if not rsp.exit_code:
635
# The remote unit sent no reply targeted at this unit so either the
636
# remote ceph cluster does not support unit targeted replies or it
637
# has not processed our request yet.
638
if rdata.get('broker_rsp'):
639
request_data = json.loads(rdata['broker_rsp'])
640
if request_data.get('request-id'):
641
log('Ignoring legacy broker_rsp without unit key as remote '
642
'service supports unit specific replies', level=DEBUG)
644
log('Using legacy broker_rsp as remote service does not '
645
'supports unit specific replies', level=DEBUG)
646
rsp = CephBrokerRsp(rdata['broker_rsp'])
647
if not rsp.exit_code:
653
def get_broker_rsp_key():
654
"""Return broker response key for this unit
656
This is the key that ceph is going to use to pass request status
657
information back to this unit
659
return 'broker-rsp-' + local_unit().replace('/', '-')
662
def send_request_if_needed(request, relation='ceph'):
663
"""Send broker request if an equivalent request has not already been sent
665
@param request: A CephBrokerRq object
667
if is_request_sent(request, relation=relation):
668
log('Request already sent but not complete, not sending new request',
671
for rid in relation_ids(relation):
672
log('Sending request {}'.format(request.request_id), level=DEBUG)
673
relation_set(relation_id=rid, broker_req=request.request)