~openstack-charmers-next/charms/trusty/cisco-vpp/trunk

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Liam Young
  • Date: 2015-12-02 10:33:20 UTC
  • mfrom: (115.1.9 dhcp)
  • Revision ID: liam.young@canonical.com-20151202103320-0cjg1te17yovqyu3
[gnuoy, r=james-page] Adds support for serving dhcp and metadata requests to guests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
 
27
27
import os
28
28
import shutil
 
29
import six
29
30
import json
30
31
import time
 
32
import uuid
31
33
 
32
34
from subprocess import (
33
35
    check_call,
35
37
    CalledProcessError,
36
38
)
37
39
from charmhelpers.core.hookenv import (
 
40
    local_unit,
38
41
    relation_get,
39
42
    relation_ids,
 
43
    relation_set,
40
44
    related_units,
41
45
    log,
42
46
    DEBUG,
56
60
    apt_install,
57
61
)
58
62
 
 
63
from charmhelpers.core.kernel import modprobe
 
64
 
59
65
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
60
66
KEYFILE = '/etc/ceph/ceph.client.{}.key'
61
67
 
62
68
CEPH_CONF = """[global]
63
 
 auth supported = {auth}
64
 
 keyring = {keyring}
65
 
 mon host = {mon_hosts}
66
 
 log to syslog = {use_syslog}
67
 
 err to syslog = {use_syslog}
68
 
 clog to syslog = {use_syslog}
 
69
auth supported = {auth}
 
70
keyring = {keyring}
 
71
mon host = {mon_hosts}
 
72
log to syslog = {use_syslog}
 
73
err to syslog = {use_syslog}
 
74
clog to syslog = {use_syslog}
69
75
"""
70
76
 
71
77
 
120
126
    return None
121
127
 
122
128
 
123
 
def create_pool(service, name, replicas=3):
 
129
def update_pool(client, pool, settings):
 
130
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
 
131
    for k, v in six.iteritems(settings):
 
132
        cmd.append(k)
 
133
        cmd.append(v)
 
134
 
 
135
    check_call(cmd)
 
136
 
 
137
 
 
138
def create_pool(service, name, replicas=3, pg_num=None):
124
139
    """Create a new RADOS pool."""
125
140
    if pool_exists(service, name):
126
141
        log("Ceph pool {} already exists, skipping creation".format(name),
127
142
            level=WARNING)
128
143
        return
129
144
 
130
 
    # Calculate the number of placement groups based
131
 
    # on upstream recommended best practices.
132
 
    osds = get_osds(service)
133
 
    if osds:
134
 
        pgnum = (len(osds) * 100 // replicas)
135
 
    else:
136
 
        # NOTE(james-page): Default to 200 for older ceph versions
137
 
        # which don't support OSD query from cli
138
 
        pgnum = 200
139
 
 
140
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
141
 
    check_call(cmd)
142
 
 
143
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
144
 
           str(replicas)]
145
 
    check_call(cmd)
 
145
    if not pg_num:
 
146
        # Calculate the number of placement groups based
 
147
        # on upstream recommended best practices.
 
148
        osds = get_osds(service)
 
149
        if osds:
 
150
            pg_num = (len(osds) * 100 // replicas)
 
151
        else:
 
152
            # NOTE(james-page): Default to 200 for older ceph versions
 
153
            # which don't support OSD query from cli
 
154
            pg_num = 200
 
155
 
 
156
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
 
157
    check_call(cmd)
 
158
 
 
159
    update_pool(service, name, settings={'size': str(replicas)})
146
160
 
147
161
 
148
162
def delete_pool(service, name):
197
211
    log('Created new keyfile at %s.' % keyfile, level=INFO)
198
212
 
199
213
 
200
 
def get_ceph_nodes():
201
 
    """Query named relation 'ceph' to determine current nodes."""
 
214
def get_ceph_nodes(relation='ceph'):
 
215
    """Query named relation to determine current nodes."""
202
216
    hosts = []
203
 
    for r_id in relation_ids('ceph'):
 
217
    for r_id in relation_ids(relation):
204
218
        for unit in related_units(r_id):
205
219
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
206
220
 
288
302
    os.chown(data_src_dst, uid, gid)
289
303
 
290
304
 
291
 
# TODO: re-use
292
 
def modprobe(module):
293
 
    """Load a kernel module and configure for auto-load on reboot."""
294
 
    log('Loading kernel module', level=INFO)
295
 
    cmd = ['modprobe', module]
296
 
    check_call(cmd)
297
 
    with open('/etc/modules', 'r+') as modules:
298
 
        if module not in modules.read():
299
 
            modules.write(module)
300
 
 
301
 
 
302
305
def copy_files(src, dst, symlinks=False, ignore=None):
303
306
    """Copy files from src to dst."""
304
307
    for item in os.listdir(src):
363
366
            service_start(svc)
364
367
 
365
368
 
366
 
def ensure_ceph_keyring(service, user=None, group=None):
 
369
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
367
370
    """Ensures a ceph keyring is created for a named service and optionally
368
371
    ensures user and group ownership.
369
372
 
370
373
    Returns False if no ceph key is available in relation state.
371
374
    """
372
375
    key = None
373
 
    for rid in relation_ids('ceph'):
 
376
    for rid in relation_ids(relation):
374
377
        for unit in related_units(rid):
375
378
            key = relation_get('key', rid=rid, unit=unit)
376
379
            if key:
411
414
 
412
415
    The API is versioned and defaults to version 1.
413
416
    """
414
 
    def __init__(self, api_version=1):
 
417
    def __init__(self, api_version=1, request_id=None):
415
418
        self.api_version = api_version
 
419
        if request_id:
 
420
            self.request_id = request_id
 
421
        else:
 
422
            self.request_id = str(uuid.uuid1())
416
423
        self.ops = []
417
424
 
418
 
    def add_op_create_pool(self, name, replica_count=3):
 
425
    def add_op_create_pool(self, name, replica_count=3, pg_num=None):
 
426
        """Adds an operation to create a pool.
 
427
 
 
428
        @param pg_num setting:  optional setting. If not provided, this value
 
429
        will be calculated by the broker based on how many OSDs are in the
 
430
        cluster at the time of creation. Note that, if provided, this value
 
431
        will be capped at the current available maximum.
 
432
        """
419
433
        self.ops.append({'op': 'create-pool', 'name': name,
420
 
                         'replicas': replica_count})
 
434
                         'replicas': replica_count, 'pg_num': pg_num})
 
435
 
 
436
    def set_ops(self, ops):
 
437
        """Set request ops to provided value.
 
438
 
 
439
        Useful for injecting ops that come from a previous request
 
440
        to allow comparisons to ensure validity.
 
441
        """
 
442
        self.ops = ops
421
443
 
422
444
    @property
423
445
    def request(self):
424
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
446
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
447
                           'request-id': self.request_id})
 
448
 
 
449
    def _ops_equal(self, other):
 
450
        if len(self.ops) == len(other.ops):
 
451
            for req_no in range(0, len(self.ops)):
 
452
                for key in ['replicas', 'name', 'op', 'pg_num']:
 
453
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
 
454
                        return False
 
455
        else:
 
456
            return False
 
457
        return True
 
458
 
 
459
    def __eq__(self, other):
 
460
        if not isinstance(other, self.__class__):
 
461
            return False
 
462
        if self.api_version == other.api_version and \
 
463
                self._ops_equal(other):
 
464
            return True
 
465
        else:
 
466
            return False
 
467
 
 
468
    def __ne__(self, other):
 
469
        return not self.__eq__(other)
425
470
 
426
471
 
427
472
class CephBrokerRsp(object):
431
476
 
432
477
    The API is versioned and defaults to version 1.
433
478
    """
 
479
 
434
480
    def __init__(self, encoded_rsp):
435
481
        self.api_version = None
436
482
        self.rsp = json.loads(encoded_rsp)
437
483
 
438
484
    @property
 
485
    def request_id(self):
 
486
        return self.rsp.get('request-id')
 
487
 
 
488
    @property
439
489
    def exit_code(self):
440
490
        return self.rsp.get('exit-code')
441
491
 
442
492
    @property
443
493
    def exit_msg(self):
444
494
        return self.rsp.get('stderr')
 
495
 
 
496
 
 
497
# Ceph Broker Conversation:
 
498
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
499
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
500
# unique id so that the client can identity which CephBrokerRsp is associated
 
501
# with the request. Ceph will also respond to each client unit individually
 
502
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
503
# via key broker-rsp-glance-0
 
504
#
 
505
# To use this the charm can just do something like:
 
506
#
 
507
# from charmhelpers.contrib.storage.linux.ceph import (
 
508
#     send_request_if_needed,
 
509
#     is_request_complete,
 
510
#     CephBrokerRq,
 
511
# )
 
512
#
 
513
# @hooks.hook('ceph-relation-changed')
 
514
# def ceph_changed():
 
515
#     rq = CephBrokerRq()
 
516
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
517
#
 
518
#     if is_request_complete(rq):
 
519
#         <Request complete actions>
 
520
#     else:
 
521
#         send_request_if_needed(get_ceph_request())
 
522
#
 
523
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
524
# of glance having sent a request to ceph which ceph has successfully processed
 
525
#  'ceph:8': {
 
526
#      'ceph/0': {
 
527
#          'auth': 'cephx',
 
528
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
529
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
530
#          'ceph-public-address': '10.5.44.103',
 
531
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
532
#          'private-address': '10.5.44.103',
 
533
#      },
 
534
#      'glance/0': {
 
535
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
536
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
537
#                         '"op": "create-pool"}]}'),
 
538
#          'private-address': '10.5.44.109',
 
539
#      },
 
540
#  }
 
541
 
 
542
def get_previous_request(rid):
 
543
    """Return the last ceph broker request sent on a given relation
 
544
 
 
545
    @param rid: Relation id to query for request
 
546
    """
 
547
    request = None
 
548
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
549
                              unit=local_unit())
 
550
    if broker_req:
 
551
        request_data = json.loads(broker_req)
 
552
        request = CephBrokerRq(api_version=request_data['api-version'],
 
553
                               request_id=request_data['request-id'])
 
554
        request.set_ops(request_data['ops'])
 
555
 
 
556
    return request
 
557
 
 
558
 
 
559
def get_request_states(request, relation='ceph'):
 
560
    """Return a dict of requests per relation id with their corresponding
 
561
       completion state.
 
562
 
 
563
    This allows a charm, which has a request for ceph, to see whether there is
 
564
    an equivalent request already being processed and if so what state that
 
565
    request is in.
 
566
 
 
567
    @param request: A CephBrokerRq object
 
568
    """
 
569
    complete = []
 
570
    requests = {}
 
571
    for rid in relation_ids(relation):
 
572
        complete = False
 
573
        previous_request = get_previous_request(rid)
 
574
        if request == previous_request:
 
575
            sent = True
 
576
            complete = is_request_complete_for_rid(previous_request, rid)
 
577
        else:
 
578
            sent = False
 
579
            complete = False
 
580
 
 
581
        requests[rid] = {
 
582
            'sent': sent,
 
583
            'complete': complete,
 
584
        }
 
585
 
 
586
    return requests
 
587
 
 
588
 
 
589
def is_request_sent(request, relation='ceph'):
 
590
    """Check to see if a functionally equivalent request has already been sent
 
591
 
 
592
    Returns True if a similair request has been sent
 
593
 
 
594
    @param request: A CephBrokerRq object
 
595
    """
 
596
    states = get_request_states(request, relation=relation)
 
597
    for rid in states.keys():
 
598
        if not states[rid]['sent']:
 
599
            return False
 
600
 
 
601
    return True
 
602
 
 
603
 
 
604
def is_request_complete(request, relation='ceph'):
 
605
    """Check to see if a functionally equivalent request has already been
 
606
    completed
 
607
 
 
608
    Returns True if a similair request has been completed
 
609
 
 
610
    @param request: A CephBrokerRq object
 
611
    """
 
612
    states = get_request_states(request, relation=relation)
 
613
    for rid in states.keys():
 
614
        if not states[rid]['complete']:
 
615
            return False
 
616
 
 
617
    return True
 
618
 
 
619
 
 
620
def is_request_complete_for_rid(request, rid):
 
621
    """Check if a given request has been completed on the given relation
 
622
 
 
623
    @param request: A CephBrokerRq object
 
624
    @param rid: Relation ID
 
625
    """
 
626
    broker_key = get_broker_rsp_key()
 
627
    for unit in related_units(rid):
 
628
        rdata = relation_get(rid=rid, unit=unit)
 
629
        if rdata.get(broker_key):
 
630
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
631
            if rsp.request_id == request.request_id:
 
632
                if not rsp.exit_code:
 
633
                    return True
 
634
        else:
 
635
            # The remote unit sent no reply targeted at this unit so either the
 
636
            # remote ceph cluster does not support unit targeted replies or it
 
637
            # has not processed our request yet.
 
638
            if rdata.get('broker_rsp'):
 
639
                request_data = json.loads(rdata['broker_rsp'])
 
640
                if request_data.get('request-id'):
 
641
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
642
                        'service supports unit specific replies', level=DEBUG)
 
643
                else:
 
644
                    log('Using legacy broker_rsp as remote service does not '
 
645
                        'supports unit specific replies', level=DEBUG)
 
646
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
647
                    if not rsp.exit_code:
 
648
                        return True
 
649
 
 
650
    return False
 
651
 
 
652
 
 
653
def get_broker_rsp_key():
 
654
    """Return broker response key for this unit
 
655
 
 
656
    This is the key that ceph is going to use to pass request status
 
657
    information back to this unit
 
658
    """
 
659
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
660
 
 
661
 
 
662
def send_request_if_needed(request, relation='ceph'):
 
663
    """Send broker request if an equivalent request has not already been sent
 
664
 
 
665
    @param request: A CephBrokerRq object
 
666
    """
 
667
    if is_request_sent(request, relation=relation):
 
668
        log('Request already sent but not complete, not sending new request',
 
669
            level=DEBUG)
 
670
    else:
 
671
        for rid in relation_ids(relation):
 
672
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
673
            relation_set(relation_id=rid, broker_req=request.request)