~gnuoy/charms/trusty/odl-controller/amulet

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Liam Young
  • Date: 2015-11-03 17:23:59 UTC
  • Revision ID: liam.young@canonical.com-20151103172359-74a8n5a0mo05iv07
First stab at amulet

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
import shutil
29
29
import json
30
30
import time
 
31
import uuid
31
32
 
32
33
from subprocess import (
33
34
    check_call,
35
36
    CalledProcessError,
36
37
)
37
38
from charmhelpers.core.hookenv import (
 
39
    local_unit,
38
40
    relation_get,
39
41
    relation_ids,
 
42
    relation_set,
40
43
    related_units,
41
44
    log,
42
45
    DEBUG,
56
59
    apt_install,
57
60
)
58
61
 
 
62
from charmhelpers.core.kernel import modprobe
 
63
 
59
64
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
60
65
KEYFILE = '/etc/ceph/ceph.client.{}.key'
61
66
 
288
293
    os.chown(data_src_dst, uid, gid)
289
294
 
290
295
 
291
 
# TODO: re-use
292
 
def modprobe(module):
293
 
    """Load a kernel module and configure for auto-load on reboot."""
294
 
    log('Loading kernel module', level=INFO)
295
 
    cmd = ['modprobe', module]
296
 
    check_call(cmd)
297
 
    with open('/etc/modules', 'r+') as modules:
298
 
        if module not in modules.read():
299
 
            modules.write(module)
300
 
 
301
 
 
302
296
def copy_files(src, dst, symlinks=False, ignore=None):
303
297
    """Copy files from src to dst."""
304
298
    for item in os.listdir(src):
411
405
 
412
406
    The API is versioned and defaults to version 1.
413
407
    """
414
 
    def __init__(self, api_version=1):
 
408
    def __init__(self, api_version=1, request_id=None):
415
409
        self.api_version = api_version
 
410
        if request_id:
 
411
            self.request_id = request_id
 
412
        else:
 
413
            self.request_id = str(uuid.uuid1())
416
414
        self.ops = []
417
415
 
418
416
    def add_op_create_pool(self, name, replica_count=3):
419
417
        self.ops.append({'op': 'create-pool', 'name': name,
420
418
                         'replicas': replica_count})
421
419
 
 
420
    def set_ops(self, ops):
 
421
        """Set request ops to provided value.
 
422
 
 
423
        Useful for injecting ops that come from a previous request
 
424
        to allow comparisons to ensure validity.
 
425
        """
 
426
        self.ops = ops
 
427
 
422
428
    @property
423
429
    def request(self):
424
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
430
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
431
                           'request-id': self.request_id})
 
432
 
 
433
    def _ops_equal(self, other):
 
434
        if len(self.ops) == len(other.ops):
 
435
            for req_no in range(0, len(self.ops)):
 
436
                for key in ['replicas', 'name', 'op']:
 
437
                    if self.ops[req_no][key] != other.ops[req_no][key]:
 
438
                        return False
 
439
        else:
 
440
            return False
 
441
        return True
 
442
 
 
443
    def __eq__(self, other):
 
444
        if not isinstance(other, self.__class__):
 
445
            return False
 
446
        if self.api_version == other.api_version and \
 
447
                self._ops_equal(other):
 
448
            return True
 
449
        else:
 
450
            return False
 
451
 
 
452
    def __ne__(self, other):
 
453
        return not self.__eq__(other)
425
454
 
426
455
 
427
456
class CephBrokerRsp(object):
431
460
 
432
461
    The API is versioned and defaults to version 1.
433
462
    """
 
463
 
434
464
    def __init__(self, encoded_rsp):
435
465
        self.api_version = None
436
466
        self.rsp = json.loads(encoded_rsp)
437
467
 
438
468
    @property
 
469
    def request_id(self):
 
470
        return self.rsp.get('request-id')
 
471
 
 
472
    @property
439
473
    def exit_code(self):
440
474
        return self.rsp.get('exit-code')
441
475
 
442
476
    @property
443
477
    def exit_msg(self):
444
478
        return self.rsp.get('stderr')
 
479
 
 
480
 
 
481
# Ceph Broker Conversation:
 
482
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
483
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
484
# unique id so that the client can identity which CephBrokerRsp is associated
 
485
# with the request. Ceph will also respond to each client unit individually
 
486
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
487
# via key broker-rsp-glance-0
 
488
#
 
489
# To use this the charm can just do something like:
 
490
#
 
491
# from charmhelpers.contrib.storage.linux.ceph import (
 
492
#     send_request_if_needed,
 
493
#     is_request_complete,
 
494
#     CephBrokerRq,
 
495
# )
 
496
#
 
497
# @hooks.hook('ceph-relation-changed')
 
498
# def ceph_changed():
 
499
#     rq = CephBrokerRq()
 
500
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
501
#
 
502
#     if is_request_complete(rq):
 
503
#         <Request complete actions>
 
504
#     else:
 
505
#         send_request_if_needed(get_ceph_request())
 
506
#
 
507
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
508
# of glance having sent a request to ceph which ceph has successfully processed
 
509
#  'ceph:8': {
 
510
#      'ceph/0': {
 
511
#          'auth': 'cephx',
 
512
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
513
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
514
#          'ceph-public-address': '10.5.44.103',
 
515
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
516
#          'private-address': '10.5.44.103',
 
517
#      },
 
518
#      'glance/0': {
 
519
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
520
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
521
#                         '"op": "create-pool"}]}'),
 
522
#          'private-address': '10.5.44.109',
 
523
#      },
 
524
#  }
 
525
 
 
526
def get_previous_request(rid):
 
527
    """Return the last ceph broker request sent on a given relation
 
528
 
 
529
    @param rid: Relation id to query for request
 
530
    """
 
531
    request = None
 
532
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
533
                              unit=local_unit())
 
534
    if broker_req:
 
535
        request_data = json.loads(broker_req)
 
536
        request = CephBrokerRq(api_version=request_data['api-version'],
 
537
                               request_id=request_data['request-id'])
 
538
        request.set_ops(request_data['ops'])
 
539
 
 
540
    return request
 
541
 
 
542
 
 
543
def get_request_states(request):
 
544
    """Return a dict of requests per relation id with their corresponding
 
545
       completion state.
 
546
 
 
547
    This allows a charm, which has a request for ceph, to see whether there is
 
548
    an equivalent request already being processed and if so what state that
 
549
    request is in.
 
550
 
 
551
    @param request: A CephBrokerRq object
 
552
    """
 
553
    complete = []
 
554
    requests = {}
 
555
    for rid in relation_ids('ceph'):
 
556
        complete = False
 
557
        previous_request = get_previous_request(rid)
 
558
        if request == previous_request:
 
559
            sent = True
 
560
            complete = is_request_complete_for_rid(previous_request, rid)
 
561
        else:
 
562
            sent = False
 
563
            complete = False
 
564
 
 
565
        requests[rid] = {
 
566
            'sent': sent,
 
567
            'complete': complete,
 
568
        }
 
569
 
 
570
    return requests
 
571
 
 
572
 
 
573
def is_request_sent(request):
 
574
    """Check to see if a functionally equivalent request has already been sent
 
575
 
 
576
    Returns True if a similair request has been sent
 
577
 
 
578
    @param request: A CephBrokerRq object
 
579
    """
 
580
    states = get_request_states(request)
 
581
    for rid in states.keys():
 
582
        if not states[rid]['sent']:
 
583
            return False
 
584
 
 
585
    return True
 
586
 
 
587
 
 
588
def is_request_complete(request):
 
589
    """Check to see if a functionally equivalent request has already been
 
590
    completed
 
591
 
 
592
    Returns True if a similair request has been completed
 
593
 
 
594
    @param request: A CephBrokerRq object
 
595
    """
 
596
    states = get_request_states(request)
 
597
    for rid in states.keys():
 
598
        if not states[rid]['complete']:
 
599
            return False
 
600
 
 
601
    return True
 
602
 
 
603
 
 
604
def is_request_complete_for_rid(request, rid):
 
605
    """Check if a given request has been completed on the given relation
 
606
 
 
607
    @param request: A CephBrokerRq object
 
608
    @param rid: Relation ID
 
609
    """
 
610
    broker_key = get_broker_rsp_key()
 
611
    for unit in related_units(rid):
 
612
        rdata = relation_get(rid=rid, unit=unit)
 
613
        if rdata.get(broker_key):
 
614
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
615
            if rsp.request_id == request.request_id:
 
616
                if not rsp.exit_code:
 
617
                    return True
 
618
        else:
 
619
            # The remote unit sent no reply targeted at this unit so either the
 
620
            # remote ceph cluster does not support unit targeted replies or it
 
621
            # has not processed our request yet.
 
622
            if rdata.get('broker_rsp'):
 
623
                request_data = json.loads(rdata['broker_rsp'])
 
624
                if request_data.get('request-id'):
 
625
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
626
                        'service supports unit specific replies', level=DEBUG)
 
627
                else:
 
628
                    log('Using legacy broker_rsp as remote service does not '
 
629
                        'supports unit specific replies', level=DEBUG)
 
630
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
631
                    if not rsp.exit_code:
 
632
                        return True
 
633
 
 
634
    return False
 
635
 
 
636
 
 
637
def get_broker_rsp_key():
 
638
    """Return broker response key for this unit
 
639
 
 
640
    This is the key that ceph is going to use to pass request status
 
641
    information back to this unit
 
642
    """
 
643
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
644
 
 
645
 
 
646
def send_request_if_needed(request):
 
647
    """Send broker request if an equivalent request has not already been sent
 
648
 
 
649
    @param request: A CephBrokerRq object
 
650
    """
 
651
    if is_request_sent(request):
 
652
        log('Request already sent but not complete, not sending new request',
 
653
            level=DEBUG)
 
654
    else:
 
655
        for rid in relation_ids('ceph'):
 
656
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
657
            relation_set(relation_id=rid, broker_req=request.request)