412
406
The API is versioned and defaults to version 1.
414
def __init__(self, api_version=1):
408
def __init__(self, api_version=1, request_id=None):
415
409
self.api_version = api_version
411
self.request_id = request_id
413
self.request_id = str(uuid.uuid1())
418
416
def add_op_create_pool(self, name, replica_count=3):
419
417
self.ops.append({'op': 'create-pool', 'name': name,
420
418
'replicas': replica_count})
420
def set_ops(self, ops):
421
"""Set request ops to provided value.
423
Useful for injecting ops that come from a previous request
424
to allow comparisons to ensure validity.
423
429
def request(self):
424
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
430
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
431
'request-id': self.request_id})
433
def _ops_equal(self, other):
434
if len(self.ops) == len(other.ops):
435
for req_no in range(0, len(self.ops)):
436
for key in ['replicas', 'name', 'op']:
437
if self.ops[req_no][key] != other.ops[req_no][key]:
443
def __eq__(self, other):
444
if not isinstance(other, self.__class__):
446
if self.api_version == other.api_version and \
447
self._ops_equal(other):
452
def __ne__(self, other):
453
return not self.__eq__(other)
427
456
class CephBrokerRsp(object):
432
461
The API is versioned and defaults to version 1.
434
464
def __init__(self, encoded_rsp):
435
465
self.api_version = None
436
466
self.rsp = json.loads(encoded_rsp)
469
def request_id(self):
470
return self.rsp.get('request-id')
439
473
def exit_code(self):
440
474
return self.rsp.get('exit-code')
443
477
def exit_msg(self):
444
478
return self.rsp.get('stderr')
481
# Ceph Broker Conversation:
482
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
483
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
484
# unique id so that the client can identify which CephBrokerRsp is associated
485
# with the request. Ceph will also respond to each client unit individually
486
# creating a response key per client unit, e.g. glance/0 will get a CephBrokerRsp
487
# via key broker-rsp-glance-0
489
# To use this the charm can just do something like:
491
# from charmhelpers.contrib.storage.linux.ceph import (
492
# send_request_if_needed,
493
# is_request_complete,
497
# @hooks.hook('ceph-relation-changed')
498
# def ceph_changed():
499
# rq = CephBrokerRq()
500
# rq.add_op_create_pool(name='poolname', replica_count=3)
502
# if is_request_complete(rq):
503
# <Request complete actions>
505
# send_request_if_needed(rq)
507
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
508
# of glance having sent a request to ceph which ceph has successfully processed
512
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
513
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
514
# 'ceph-public-address': '10.5.44.103',
515
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
516
# 'private-address': '10.5.44.103',
519
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
520
# '"ops": [{"replicas": 3, "name": "glance", '
521
# '"op": "create-pool"}]}'),
522
# 'private-address': '10.5.44.109',
526
def get_previous_request(rid):
527
"""Return the last ceph broker request sent on a given relation
529
@param rid: Relation id to query for request
532
broker_req = relation_get(attribute='broker_req', rid=rid,
535
request_data = json.loads(broker_req)
536
request = CephBrokerRq(api_version=request_data['api-version'],
537
request_id=request_data['request-id'])
538
request.set_ops(request_data['ops'])
543
def get_request_states(request):
544
"""Return a dict of requests per relation id with their corresponding
547
This allows a charm, which has a request for ceph, to see whether there is
548
an equivalent request already being processed and if so what state that
551
@param request: A CephBrokerRq object
555
for rid in relation_ids('ceph'):
557
previous_request = get_previous_request(rid)
558
if request == previous_request:
560
complete = is_request_complete_for_rid(previous_request, rid)
567
'complete': complete,
573
def is_request_sent(request):
574
"""Check to see if a functionally equivalent request has already been sent
576
Returns True if a similar request has been sent
578
@param request: A CephBrokerRq object
580
states = get_request_states(request)
581
for rid in states.keys():
582
if not states[rid]['sent']:
588
def is_request_complete(request):
589
"""Check to see if a functionally equivalent request has already been
592
Returns True if a similar request has been completed
594
@param request: A CephBrokerRq object
596
states = get_request_states(request)
597
for rid in states.keys():
598
if not states[rid]['complete']:
604
def is_request_complete_for_rid(request, rid):
605
"""Check if a given request has been completed on the given relation
607
@param request: A CephBrokerRq object
608
@param rid: Relation ID
610
broker_key = get_broker_rsp_key()
611
for unit in related_units(rid):
612
rdata = relation_get(rid=rid, unit=unit)
613
if rdata.get(broker_key):
614
rsp = CephBrokerRsp(rdata.get(broker_key))
615
if rsp.request_id == request.request_id:
616
if not rsp.exit_code:
619
# The remote unit sent no reply targeted at this unit so either the
620
# remote ceph cluster does not support unit targeted replies or it
621
# has not processed our request yet.
622
if rdata.get('broker_rsp'):
623
request_data = json.loads(rdata['broker_rsp'])
624
if request_data.get('request-id'):
625
log('Ignoring legacy broker_rsp without unit key as remote '
626
'service supports unit specific replies', level=DEBUG)
628
log('Using legacy broker_rsp as remote service does not '
629
'supports unit specific replies', level=DEBUG)
630
rsp = CephBrokerRsp(rdata['broker_rsp'])
631
if not rsp.exit_code:
637
def get_broker_rsp_key():
638
"""Return broker response key for this unit
640
This is the key that ceph is going to use to pass request status
641
information back to this unit
643
return 'broker-rsp-' + local_unit().replace('/', '-')
646
def send_request_if_needed(request):
647
"""Send broker request if an equivalent request has not already been sent
649
@param request: A CephBrokerRq object
651
if is_request_sent(request):
652
log('Request already sent but not complete, not sending new request',
655
for rid in relation_ids('ceph'):
656
log('Sending request {}'.format(request.request_id), level=DEBUG)
657
relation_set(relation_id=rid, broker_req=request.request)