~objectif-libre/charms/trusty/cloudkitty/trunk

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Luka Peschke
  • Date: 2016-11-07 14:54:24 UTC
  • Revision ID: luka.peschke@epitech.eu-20161107145424-3tcerejh6444ih3r
This charm is now using CloudKitty's mitaka release

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright 2014-2015 Canonical Limited.
2
2
#
3
 
# This file is part of charm-helpers.
4
 
#
5
 
# charm-helpers is free software: you can redistribute it and/or modify
6
 
# it under the terms of the GNU Lesser General Public License version 3 as
7
 
# published by the Free Software Foundation.
8
 
#
9
 
# charm-helpers is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU Lesser General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU Lesser General Public License
15
 
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#  http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
16
14
 
17
15
#
18
16
# Copyright 2012 Canonical Ltd.
24
22
#  Adam Gandelman <adamg@ubuntu.com>
25
23
#
26
24
 
 
25
import errno
 
26
import hashlib
 
27
import math
 
28
import six
 
29
 
27
30
import os
28
31
import shutil
29
32
import json
30
33
import time
 
34
import uuid
31
35
 
32
36
from subprocess import (
33
37
    check_call,
35
39
    CalledProcessError,
36
40
)
37
41
from charmhelpers.core.hookenv import (
 
42
    config,
 
43
    local_unit,
38
44
    relation_get,
39
45
    relation_ids,
 
46
    relation_set,
40
47
    related_units,
41
48
    log,
42
49
    DEBUG,
56
63
    apt_install,
57
64
)
58
65
 
 
66
from charmhelpers.core.kernel import modprobe
 
67
from charmhelpers.contrib.openstack.utils import config_flags_parser
 
68
 
59
69
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
60
70
KEYFILE = '/etc/ceph/ceph.client.{}.key'
61
71
 
68
78
clog to syslog = {use_syslog}
69
79
"""
70
80
 
 
81
# The number of placement groups per OSD to target for placement group
 
82
# calculations. This number is chosen as 100 due to the ceph PG Calc
 
83
# documentation recommending to choose 100 for clusters which are not
 
84
# expected to increase in the foreseeable future. Since the majority of the
 
85
# calculations are done on deployment, target the case of non-expanding
 
86
# clusters as the default.
 
87
DEFAULT_PGS_PER_OSD_TARGET = 100
 
88
DEFAULT_POOL_WEIGHT = 10.0
 
89
LEGACY_PG_COUNT = 200
 
90
DEFAULT_MINIMUM_PGS = 2
 
91
 
 
92
 
 
93
def validator(value, valid_type, valid_range=None):
 
94
    """
 
95
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
 
96
    Example input:
 
97
        validator(value=1,
 
98
                  valid_type=int,
 
99
                  valid_range=[0, 2])
 
100
    This says I'm testing value=1.  It must be an int inclusive in [0,2]
 
101
 
 
102
    :param value: The value to validate
 
103
    :param valid_type: The type that value should be.
 
104
    :param valid_range: A range of values that value can assume.
 
105
    :return:
 
106
    """
 
107
    assert isinstance(value, valid_type), "{} is not a {}".format(
 
108
        value,
 
109
        valid_type)
 
110
    if valid_range is not None:
 
111
        assert isinstance(valid_range, list), \
 
112
            "valid_range must be a list, was given {}".format(valid_range)
 
113
        # If we're dealing with strings
 
114
        if valid_type is six.string_types:
 
115
            assert value in valid_range, \
 
116
                "{} is not in the list {}".format(value, valid_range)
 
117
        # Integer, float should have a min and max
 
118
        else:
 
119
            if len(valid_range) != 2:
 
120
                raise ValueError(
 
121
                    "Invalid valid_range list of {} for {}.  "
 
122
                    "List must be [min,max]".format(valid_range, value))
 
123
            assert value >= valid_range[0], \
 
124
                "{} is less than minimum allowed value of {}".format(
 
125
                    value, valid_range[0])
 
126
            assert value <= valid_range[1], \
 
127
                "{} is greater than maximum allowed value of {}".format(
 
128
                    value, valid_range[1])
 
129
 
 
130
 
 
131
class PoolCreationError(Exception):
 
132
    """
 
133
    A custom error to inform the caller that a pool creation failed.  Provides an error message
 
134
    """
 
135
 
 
136
    def __init__(self, message):
 
137
        super(PoolCreationError, self).__init__(message)
 
138
 
 
139
 
 
140
class Pool(object):
 
141
    """
 
142
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
 
143
    Do not call create() on this base class as it will not do anything.  Instantiate a child class and call create().
 
144
    """
 
145
 
 
146
    def __init__(self, service, name):
 
147
        self.service = service
 
148
        self.name = name
 
149
 
 
150
    # Create the pool if it doesn't exist already
 
151
    # To be implemented by subclasses
 
152
    def create(self):
 
153
        pass
 
154
 
 
155
    def add_cache_tier(self, cache_pool, mode):
 
156
        """
 
157
        Adds a new cache tier to an existing pool.
 
158
        :param cache_pool: six.string_types.  The cache tier pool name to add.
 
159
        :param mode: six.string_types. The caching mode to use for this pool.  valid range = ["readonly", "writeback"]
 
160
        :return: None
 
161
        """
 
162
        # Check the input types and values
 
163
        validator(value=cache_pool, valid_type=six.string_types)
 
164
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
 
165
 
 
166
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
 
167
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
 
168
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
 
169
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
 
170
 
 
171
    def remove_cache_tier(self, cache_pool):
 
172
        """
 
173
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
 
174
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
 
175
        :return: None
 
176
        """
 
177
        # read-only is easy, writeback is much harder
 
178
        mode = get_cache_mode(self.service, cache_pool)
 
179
        version = ceph_version()
 
180
        if mode == 'readonly':
 
181
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
 
182
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
183
 
 
184
        elif mode == 'writeback':
 
185
            pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
 
186
                                'cache-mode', cache_pool, 'forward']
 
187
            if version >= '10.1':
 
188
                # Jewel added a mandatory flag
 
189
                pool_forward_cmd.append('--yes-i-really-mean-it')
 
190
 
 
191
            check_call(pool_forward_cmd)
 
192
            # Flush the cache and wait for it to return
 
193
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
 
194
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
 
195
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
196
 
 
197
    def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT):
 
198
        """Return the number of placement groups to use when creating the pool.
 
199
 
 
200
        Returns the number of placement groups which should be specified when
 
201
        creating the pool. This is based upon the calculation guidelines
 
202
        provided by the Ceph Placement Group Calculator (located online at
 
203
        http://ceph.com/pgcalc/).
 
204
 
 
205
        The number of placement groups are calculated using the following:
 
206
 
 
207
            (Target PGs per OSD) * (OSD #) * (%Data)
 
208
            ----------------------------------------
 
209
                         (Pool size)
 
210
 
 
211
        Per the upstream guidelines, the OSD # should really be considered
 
212
        based on the number of OSDs which are eligible to be selected by the
 
213
        pool. Since the pool creation doesn't specify any of CRUSH set rules,
 
214
        the default rule will be dependent upon the type of pool being
 
215
        created (replicated or erasure).
 
216
 
 
217
        This code makes no attempt to determine the number of OSDs which can be
 
218
        selected for the specific rule, rather it is left to the user to tune
 
219
        in the form of 'expected-osd-count' config option.
 
220
 
 
221
        :param pool_size: int. pool_size is either the number of replicas for
 
222
            replicated pools or the K+M sum for erasure coded pools
 
223
        :param percent_data: float. the percentage of data that is expected to
 
224
            be contained in the pool for the specific OSD set. Default value
 
225
            is to assume 10% of the data is for this pool, which is a
 
226
            relatively low % of the data but allows for the pg_num to be
 
227
            increased. NOTE: the default is primarily to handle the scenario
 
228
            where related charms requiring pools has not been upgraded to
 
229
            include an update to indicate their relative usage of the pools.
 
230
        :return: int.  The number of pgs to use.
 
231
        """
 
232
 
 
233
        # Note: This calculation follows the approach that is provided
 
234
        # by the Ceph PG Calculator located at http://ceph.com/pgcalc/.
 
235
        validator(value=pool_size, valid_type=int)
 
236
 
 
237
        # Ensure that percent data is set to something - even with a default
 
238
        # it can be set to None, which would wreak havoc below.
 
239
        if percent_data is None:
 
240
            percent_data = DEFAULT_POOL_WEIGHT
 
241
 
 
242
        # If the expected-osd-count is specified, then use the max between
 
243
        # the expected-osd-count and the actual osd_count
 
244
        osd_list = get_osds(self.service)
 
245
        expected = config('expected-osd-count') or 0
 
246
 
 
247
        if osd_list:
 
248
            osd_count = max(expected, len(osd_list))
 
249
 
 
250
            # Log a message to provide some insight if the calculations claim
 
251
            # to be off because someone is setting the expected count and
 
252
            # there are more OSDs in reality. Try to make a proper guess
 
253
            # based upon the cluster itself.
 
254
            if expected and osd_count != expected:
 
255
                log("Found more OSDs than provided expected count. "
 
256
                    "Using the actual count instead", INFO)
 
257
        elif expected:
 
258
            # Use the expected-osd-count in older ceph versions to allow for
 
259
            # a more accurate pg calculations
 
260
            osd_count = expected
 
261
        else:
 
262
            # NOTE(james-page): Default to 200 for older ceph versions
 
263
            # which don't support OSD query from cli
 
264
            return LEGACY_PG_COUNT
 
265
 
 
266
        percent_data /= 100.0
 
267
        target_pgs_per_osd = config('pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET
 
268
        num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size
 
269
 
 
270
        # NOTE: ensure a sane minimum number of PGS otherwise we don't get any
 
271
        #       reasonable data distribution in minimal OSD configurations
 
272
        if num_pg < DEFAULT_MINIMUM_PGS:
 
273
            num_pg = DEFAULT_MINIMUM_PGS
 
274
 
 
275
        # The CRUSH algorithm has a slight optimization for placement groups
 
276
        # with powers of 2 so find the nearest power of 2. If the nearest
 
277
        # power of 2 is more than 25% below the original value, the next
 
278
        # highest value is used. To do this, find the nearest power of 2 such
 
279
        # that 2^n <= num_pg, check to see if its within the 25% tolerance.
 
280
        exponent = math.floor(math.log(num_pg, 2))
 
281
        nearest = 2 ** exponent
 
282
        if (num_pg - nearest) > (num_pg * 0.25):
 
283
            # Choose the next highest power of 2 since the nearest is more
 
284
            # than 25% below the original value.
 
285
            return int(nearest * 2)
 
286
        else:
 
287
            return int(nearest)
 
288
 
 
289
 
 
290
class ReplicatedPool(Pool):
 
291
    def __init__(self, service, name, pg_num=None, replicas=2,
 
292
                 percent_data=10.0):
 
293
        super(ReplicatedPool, self).__init__(service=service, name=name)
 
294
        self.replicas = replicas
 
295
        if pg_num:
 
296
            # Since the number of placement groups were specified, ensure
 
297
            # that there aren't too many created.
 
298
            max_pgs = self.get_pgs(self.replicas, 100.0)
 
299
            self.pg_num = min(pg_num, max_pgs)
 
300
        else:
 
301
            self.pg_num = self.get_pgs(self.replicas, percent_data)
 
302
 
 
303
    def create(self):
 
304
        if not pool_exists(self.service, self.name):
 
305
            # Create it
 
306
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
 
307
                   self.name, str(self.pg_num)]
 
308
            try:
 
309
                check_call(cmd)
 
310
                # Set the pool replica size
 
311
                update_pool(client=self.service,
 
312
                            pool=self.name,
 
313
                            settings={'size': str(self.replicas)})
 
314
            except CalledProcessError:
 
315
                raise
 
316
 
 
317
 
 
318
# Default jerasure erasure coded pool
 
319
class ErasurePool(Pool):
 
320
    def __init__(self, service, name, erasure_code_profile="default",
 
321
                 percent_data=10.0):
 
322
        super(ErasurePool, self).__init__(service=service, name=name)
 
323
        self.erasure_code_profile = erasure_code_profile
 
324
        self.percent_data = percent_data
 
325
 
 
326
    def create(self):
 
327
        if not pool_exists(self.service, self.name):
 
328
            # Try to find the erasure profile information in order to properly
 
329
            # size the number of placement groups. The size of an erasure
 
330
            # coded placement group is calculated as k+m.
 
331
            erasure_profile = get_erasure_profile(self.service,
 
332
                                                  self.erasure_code_profile)
 
333
 
 
334
            # Check for errors
 
335
            if erasure_profile is None:
 
336
                msg = ("Failed to discover erasure profile named "
 
337
                       "{}".format(self.erasure_code_profile))
 
338
                log(msg, level=ERROR)
 
339
                raise PoolCreationError(msg)
 
340
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
 
341
                # Error
 
342
                msg = ("Unable to find k (data chunks) or m (coding chunks) "
 
343
                       "in erasure profile {}".format(erasure_profile))
 
344
                log(msg, level=ERROR)
 
345
                raise PoolCreationError(msg)
 
346
 
 
347
            k = int(erasure_profile['k'])
 
348
            m = int(erasure_profile['m'])
 
349
            pgs = self.get_pgs(k + m, self.percent_data)
 
350
            # Create it
 
351
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
 
352
                   self.name, str(pgs), str(pgs),
 
353
                   'erasure', self.erasure_code_profile]
 
354
            try:
 
355
                check_call(cmd)
 
356
            except CalledProcessError:
 
357
                raise
 
358
 
 
359
    """Get an existing erasure code profile if it already exists.
 
360
       Returns json formatted output"""
 
361
 
 
362
 
 
363
def get_mon_map(service):
 
364
    """
 
365
    Returns the current monitor map.
 
366
    :param service: six.string_types. The Ceph user name to run the command under
 
367
    :return: json string. :raise: ValueError if the monmap fails to parse.
 
368
      Also raises CalledProcessError if our ceph command fails
 
369
    """
 
370
    try:
 
371
        mon_status = check_output(
 
372
            ['ceph', '--id', service,
 
373
             'mon_status', '--format=json'])
 
374
        try:
 
375
            return json.loads(mon_status)
 
376
        except ValueError as v:
 
377
            log("Unable to parse mon_status json: {}. Error: {}".format(
 
378
                mon_status, v.message))
 
379
            raise
 
380
    except CalledProcessError as e:
 
381
        log("mon_status command failed with message: {}".format(
 
382
            e.message))
 
383
        raise
 
384
 
 
385
 
 
386
def hash_monitor_names(service):
 
387
    """
 
388
    Uses the get_mon_map() function to get information about the monitor
 
389
    cluster.
 
390
    Hash the name of each monitor.  Return a sorted list of monitor hashes
 
391
    in an ascending order.
 
392
    :param service: six.string_types. The Ceph user name to run the command under
 
393
    :rtype : dict.   json dict of monitor name, ip address and rank
 
394
    example: {
 
395
        'name': 'ip-172-31-13-165',
 
396
        'rank': 0,
 
397
        'addr': '172.31.13.165:6789/0'}
 
398
    """
 
399
    try:
 
400
        hash_list = []
 
401
        monitor_list = get_mon_map(service=service)
 
402
        if monitor_list['monmap']['mons']:
 
403
            for mon in monitor_list['monmap']['mons']:
 
404
                hash_list.append(
 
405
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
 
406
            return sorted(hash_list)
 
407
        else:
 
408
            return None
 
409
    except (ValueError, CalledProcessError):
 
410
        raise
 
411
 
 
412
 
 
413
def monitor_key_delete(service, key):
 
414
    """
 
415
    Delete a key and value pair from the monitor cluster
 
416
    :param service: six.string_types. The Ceph user name to run the command under
 
417
    Deletes a key value pair on the monitor cluster.
 
418
    :param key: six.string_types.  The key to delete.
 
419
    """
 
420
    try:
 
421
        check_output(
 
422
            ['ceph', '--id', service,
 
423
             'config-key', 'del', str(key)])
 
424
    except CalledProcessError as e:
 
425
        log("Monitor config-key put failed with message: {}".format(
 
426
            e.output))
 
427
        raise
 
428
 
 
429
 
 
430
def monitor_key_set(service, key, value):
 
431
    """
 
432
    Sets a key value pair on the monitor cluster.
 
433
    :param service: six.string_types. The Ceph user name to run the command under
 
434
    :param key: six.string_types.  The key to set.
 
435
    :param value: The value to set.  This will be converted to a string
 
436
        before setting
 
437
    """
 
438
    try:
 
439
        check_output(
 
440
            ['ceph', '--id', service,
 
441
             'config-key', 'put', str(key), str(value)])
 
442
    except CalledProcessError as e:
 
443
        log("Monitor config-key put failed with message: {}".format(
 
444
            e.output))
 
445
        raise
 
446
 
 
447
 
 
448
def monitor_key_get(service, key):
 
449
    """
 
450
    Gets the value of an existing key in the monitor cluster.
 
451
    :param service: six.string_types. The Ceph user name to run the command under
 
452
    :param key: six.string_types.  The key to search for.
 
453
    :return: Returns the value of that key or None if not found.
 
454
    """
 
455
    try:
 
456
        output = check_output(
 
457
            ['ceph', '--id', service,
 
458
             'config-key', 'get', str(key)])
 
459
        return output
 
460
    except CalledProcessError as e:
 
461
        log("Monitor config-key get failed with message: {}".format(
 
462
            e.output))
 
463
        return None
 
464
 
 
465
 
 
466
def monitor_key_exists(service, key):
 
467
    """
 
468
    Searches for the existence of a key in the monitor cluster.
 
469
    :param service: six.string_types. The Ceph user name to run the command under
 
470
    :param key: six.string_types.  The key to search for
 
471
    :return: Returns True if the key exists, False if not and raises an
 
472
     exception if an unknown error occurs. :raise: CalledProcessError if
 
473
     an unknown error occurs
 
474
    """
 
475
    try:
 
476
        check_call(
 
477
            ['ceph', '--id', service,
 
478
             'config-key', 'exists', str(key)])
 
479
        # I can return true here regardless because Ceph returns
 
480
        # ENOENT if the key wasn't found
 
481
        return True
 
482
    except CalledProcessError as e:
 
483
        if e.returncode == errno.ENOENT:
 
484
            return False
 
485
        else:
 
486
            log("Unknown error from ceph config-get exists: {} {}".format(
 
487
                e.returncode, e.output))
 
488
            raise
 
489
 
 
490
 
 
491
def get_erasure_profile(service, name):
 
492
    """
 
493
    :param service: six.string_types. The Ceph user name to run the command under
 
494
    :param name:
 
495
    :return:
 
496
    """
 
497
    try:
 
498
        out = check_output(['ceph', '--id', service,
 
499
                            'osd', 'erasure-code-profile', 'get',
 
500
                            name, '--format=json'])
 
501
        return json.loads(out)
 
502
    except (CalledProcessError, OSError, ValueError):
 
503
        return None
 
504
 
 
505
 
 
506
def pool_set(service, pool_name, key, value):
 
507
    """
 
508
    Sets a value for a RADOS pool in ceph.
 
509
    :param service: six.string_types. The Ceph user name to run the command under
 
510
    :param pool_name: six.string_types
 
511
    :param key: six.string_types
 
512
    :param value:
 
513
    :return: None.  Can raise CalledProcessError
 
514
    """
 
515
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
 
516
    try:
 
517
        check_call(cmd)
 
518
    except CalledProcessError:
 
519
        raise
 
520
 
 
521
 
 
522
def snapshot_pool(service, pool_name, snapshot_name):
 
523
    """
 
524
    Snapshots a RADOS pool in ceph.
 
525
    :param service: six.string_types. The Ceph user name to run the command under
 
526
    :param pool_name: six.string_types
 
527
    :param snapshot_name: six.string_types
 
528
    :return: None.  Can raise CalledProcessError
 
529
    """
 
530
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
 
531
    try:
 
532
        check_call(cmd)
 
533
    except CalledProcessError:
 
534
        raise
 
535
 
 
536
 
 
537
def remove_pool_snapshot(service, pool_name, snapshot_name):
 
538
    """
 
539
    Remove a snapshot from a RADOS pool in ceph.
 
540
    :param service: six.string_types. The Ceph user name to run the command under
 
541
    :param pool_name: six.string_types
 
542
    :param snapshot_name: six.string_types
 
543
    :return: None.  Can raise CalledProcessError
 
544
    """
 
545
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
 
546
    try:
 
547
        check_call(cmd)
 
548
    except CalledProcessError:
 
549
        raise
 
550
 
 
551
 
 
552
# max_bytes should be an int or long
 
553
def set_pool_quota(service, pool_name, max_bytes):
 
554
    """
 
555
    :param service: six.string_types. The Ceph user name to run the command under
 
556
    :param pool_name: six.string_types
 
557
    :param max_bytes: int or long
 
558
    :return: None.  Can raise CalledProcessError
 
559
    """
 
560
    # Set a byte quota on a RADOS pool in ceph.
 
561
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
 
562
           'max_bytes', str(max_bytes)]
 
563
    try:
 
564
        check_call(cmd)
 
565
    except CalledProcessError:
 
566
        raise
 
567
 
 
568
 
 
569
def remove_pool_quota(service, pool_name):
 
570
    """
 
571
    Set a byte quota on a RADOS pool in ceph.
 
572
    :param service: six.string_types. The Ceph user name to run the command under
 
573
    :param pool_name: six.string_types
 
574
    :return: None.  Can raise CalledProcessError
 
575
    """
 
576
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
 
577
    try:
 
578
        check_call(cmd)
 
579
    except CalledProcessError:
 
580
        raise
 
581
 
 
582
 
 
583
def remove_erasure_profile(service, profile_name):
 
584
    """
 
585
    Create a new erasure code profile if one does not already exist for it.  Updates
 
586
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
587
    for more details
 
588
    :param service: six.string_types. The Ceph user name to run the command under
 
589
    :param profile_name: six.string_types
 
590
    :return: None.  Can raise CalledProcessError
 
591
    """
 
592
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
 
593
           profile_name]
 
594
    try:
 
595
        check_call(cmd)
 
596
    except CalledProcessError:
 
597
        raise
 
598
 
 
599
 
 
600
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
 
601
                           failure_domain='host',
 
602
                           data_chunks=2, coding_chunks=1,
 
603
                           locality=None, durability_estimator=None):
 
604
    """
 
605
    Create a new erasure code profile if one does not already exist for it.  Updates
 
606
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
607
    for more details
 
608
    :param service: six.string_types. The Ceph user name to run the command under
 
609
    :param profile_name: six.string_types
 
610
    :param erasure_plugin_name: six.string_types
 
611
    :param failure_domain: six.string_types.  One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
 
612
        'room', 'root', 'row'])
 
613
    :param data_chunks: int
 
614
    :param coding_chunks: int
 
615
    :param locality: int
 
616
    :param durability_estimator: int
 
617
    :return: None.  Can raise CalledProcessError
 
618
    """
 
619
    # Ensure this failure_domain is allowed by Ceph
 
620
    validator(failure_domain, six.string_types,
 
621
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
 
622
 
 
623
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
 
624
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
 
625
           'ruleset_failure_domain=' + failure_domain]
 
626
    if locality is not None and durability_estimator is not None:
 
627
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
 
628
 
 
629
    # Add plugin specific information
 
630
    if locality is not None:
 
631
        # For local erasure codes
 
632
        cmd.append('l=' + str(locality))
 
633
    if durability_estimator is not None:
 
634
        # For Shec erasure codes
 
635
        cmd.append('c=' + str(durability_estimator))
 
636
 
 
637
    if erasure_profile_exists(service, profile_name):
 
638
        cmd.append('--force')
 
639
 
 
640
    try:
 
641
        check_call(cmd)
 
642
    except CalledProcessError:
 
643
        raise
 
644
 
 
645
 
 
646
def rename_pool(service, old_name, new_name):
 
647
    """
 
648
    Rename a Ceph pool from old_name to new_name
 
649
    :param service: six.string_types. The Ceph user name to run the command under
 
650
    :param old_name: six.string_types
 
651
    :param new_name: six.string_types
 
652
    :return: None
 
653
    """
 
654
    validator(value=old_name, valid_type=six.string_types)
 
655
    validator(value=new_name, valid_type=six.string_types)
 
656
 
 
657
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
 
658
    check_call(cmd)
 
659
 
 
660
 
 
661
def erasure_profile_exists(service, name):
 
662
    """
 
663
    Check to see if an Erasure code profile already exists.
 
664
    :param service: six.string_types. The Ceph user name to run the command under
 
665
    :param name: six.string_types
 
666
    :return: int or None
 
667
    """
 
668
    validator(value=name, valid_type=six.string_types)
 
669
    try:
 
670
        check_call(['ceph', '--id', service,
 
671
                    'osd', 'erasure-code-profile', 'get',
 
672
                    name])
 
673
        return True
 
674
    except CalledProcessError:
 
675
        return False
 
676
 
 
677
 
 
678
def get_cache_mode(service, pool_name):
 
679
    """
 
680
    Find the current caching mode of the pool_name given.
 
681
    :param service: six.string_types. The Ceph user name to run the command under
 
682
    :param pool_name: six.string_types
 
683
    :return: int or None
 
684
    """
 
685
    validator(value=service, valid_type=six.string_types)
 
686
    validator(value=pool_name, valid_type=six.string_types)
 
687
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
 
688
    try:
 
689
        osd_json = json.loads(out)
 
690
        for pool in osd_json['pools']:
 
691
            if pool['pool_name'] == pool_name:
 
692
                return pool['cache_mode']
 
693
        return None
 
694
    except ValueError:
 
695
        raise
 
696
 
 
697
 
 
698
def pool_exists(service, name):
 
699
    """Check to see if a RADOS pool already exists."""
 
700
    try:
 
701
        out = check_output(['rados', '--id', service,
 
702
                            'lspools']).decode('UTF-8')
 
703
    except CalledProcessError:
 
704
        return False
 
705
 
 
706
    return name in out.split()
 
707
 
 
708
 
 
709
def get_osds(service):
 
710
    """Return a list of all Ceph Object Storage Daemons currently in the
 
711
    cluster.
 
712
    """
 
713
    version = ceph_version()
 
714
    if version and version >= '0.56':
 
715
        return json.loads(check_output(['ceph', '--id', service,
 
716
                                        'osd', 'ls',
 
717
                                        '--format=json']).decode('UTF-8'))
 
718
 
 
719
    return None
 
720
 
71
721
 
72
722
def install():
73
723
    """Basic Ceph client installation."""
96
746
    check_call(cmd)
97
747
 
98
748
 
99
 
def pool_exists(service, name):
100
 
    """Check to see if a RADOS pool already exists."""
101
 
    try:
102
 
        out = check_output(['rados', '--id', service,
103
 
                            'lspools']).decode('UTF-8')
104
 
    except CalledProcessError:
105
 
        return False
106
 
 
107
 
    return name in out
108
 
 
109
 
 
110
 
def get_osds(service):
111
 
    """Return a list of all Ceph Object Storage Daemons currently in the
112
 
    cluster.
113
 
    """
114
 
    version = ceph_version()
115
 
    if version and version >= '0.56':
116
 
        return json.loads(check_output(['ceph', '--id', service,
117
 
                                        'osd', 'ls',
118
 
                                        '--format=json']).decode('UTF-8'))
119
 
 
120
 
    return None
121
 
 
122
 
 
123
 
def create_pool(service, name, replicas=3):
 
749
def update_pool(client, pool, settings):
 
750
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
 
751
    for k, v in six.iteritems(settings):
 
752
        cmd.append(k)
 
753
        cmd.append(v)
 
754
 
 
755
    check_call(cmd)
 
756
 
 
757
 
 
758
def create_pool(service, name, replicas=3, pg_num=None):
124
759
    """Create a new RADOS pool."""
125
760
    if pool_exists(service, name):
126
761
        log("Ceph pool {} already exists, skipping creation".format(name),
127
762
            level=WARNING)
128
763
        return
129
764
 
130
 
    # Calculate the number of placement groups based
131
 
    # on upstream recommended best practices.
132
 
    osds = get_osds(service)
133
 
    if osds:
134
 
        pgnum = (len(osds) * 100 // replicas)
135
 
    else:
136
 
        # NOTE(james-page): Default to 200 for older ceph versions
137
 
        # which don't support OSD query from cli
138
 
        pgnum = 200
139
 
 
140
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
141
 
    check_call(cmd)
142
 
 
143
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
144
 
           str(replicas)]
145
 
    check_call(cmd)
 
765
    if not pg_num:
 
766
        # Calculate the number of placement groups based
 
767
        # on upstream recommended best practices.
 
768
        osds = get_osds(service)
 
769
        if osds:
 
770
            pg_num = (len(osds) * 100 // replicas)
 
771
        else:
 
772
            # NOTE(james-page): Default to 200 for older ceph versions
 
773
            # which don't support OSD query from cli
 
774
            pg_num = 200
 
775
 
 
776
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
 
777
    check_call(cmd)
 
778
 
 
779
    update_pool(service, name, settings={'size': str(replicas)})
146
780
 
147
781
 
148
782
def delete_pool(service, name):
197
831
    log('Created new keyfile at %s.' % keyfile, level=INFO)
198
832
 
199
833
 
200
 
def get_ceph_nodes():
201
 
    """Query named relation 'ceph' to determine current nodes."""
 
834
def get_ceph_nodes(relation='ceph'):
 
835
    """Query named relation to determine current nodes."""
202
836
    hosts = []
203
 
    for r_id in relation_ids('ceph'):
 
837
    for r_id in relation_ids(relation):
204
838
        for unit in related_units(r_id):
205
839
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
206
840
 
288
922
    os.chown(data_src_dst, uid, gid)
289
923
 
290
924
 
291
 
# TODO: re-use
292
 
def modprobe(module):
293
 
    """Load a kernel module and configure for auto-load on reboot."""
294
 
    log('Loading kernel module', level=INFO)
295
 
    cmd = ['modprobe', module]
296
 
    check_call(cmd)
297
 
    with open('/etc/modules', 'r+') as modules:
298
 
        if module not in modules.read():
299
 
            modules.write(module)
300
 
 
301
 
 
302
925
def copy_files(src, dst, symlinks=False, ignore=None):
303
926
    """Copy files from src to dst."""
304
927
    for item in os.listdir(src):
363
986
            service_start(svc)
364
987
 
365
988
 
366
 
def ensure_ceph_keyring(service, user=None, group=None):
 
989
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
367
990
    """Ensures a ceph keyring is created for a named service and optionally
368
991
    ensures user and group ownership.
369
992
 
370
993
    Returns False if no ceph key is available in relation state.
371
994
    """
372
995
    key = None
373
 
    for rid in relation_ids('ceph'):
 
996
    for rid in relation_ids(relation):
374
997
        for unit in related_units(rid):
375
998
            key = relation_get('key', rid=rid, unit=unit)
376
999
            if key:
411
1034
 
412
1035
    The API is versioned and defaults to version 1.
413
1036
    """
414
 
    def __init__(self, api_version=1):
 
1037
 
 
1038
    def __init__(self, api_version=1, request_id=None):
415
1039
        self.api_version = api_version
 
1040
        if request_id:
 
1041
            self.request_id = request_id
 
1042
        else:
 
1043
            self.request_id = str(uuid.uuid1())
416
1044
        self.ops = []
417
1045
 
418
 
    def add_op_create_pool(self, name, replica_count=3):
 
1046
    def add_op_create_pool(self, name, replica_count=3, pg_num=None,
 
1047
                           weight=None):
 
1048
        """Adds an operation to create a pool.
 
1049
 
 
1050
        @param pg_num setting:  optional setting. If not provided, this value
 
1051
        will be calculated by the broker based on how many OSDs are in the
 
1052
        cluster at the time of creation. Note that, if provided, this value
 
1053
        will be capped at the current available maximum.
 
1054
        @param weight: the percentage of data the pool makes up
 
1055
        """
 
1056
        if pg_num and weight:
 
1057
            raise ValueError('pg_num and weight are mutually exclusive')
 
1058
 
419
1059
        self.ops.append({'op': 'create-pool', 'name': name,
420
 
                         'replicas': replica_count})
 
1060
                         'replicas': replica_count, 'pg_num': pg_num,
 
1061
                         'weight': weight})
 
1062
 
 
1063
    def set_ops(self, ops):
 
1064
        """Set request ops to provided value.
 
1065
 
 
1066
        Useful for injecting ops that come from a previous request
 
1067
        to allow comparisons to ensure validity.
 
1068
        """
 
1069
        self.ops = ops
421
1070
 
422
1071
    @property
423
1072
    def request(self):
424
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
1073
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
1074
                           'request-id': self.request_id})
 
1075
 
 
1076
    def _ops_equal(self, other):
 
1077
        if len(self.ops) == len(other.ops):
 
1078
            for req_no in range(0, len(self.ops)):
 
1079
                for key in ['replicas', 'name', 'op', 'pg_num', 'weight']:
 
1080
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
 
1081
                        return False
 
1082
        else:
 
1083
            return False
 
1084
        return True
 
1085
 
 
1086
    def __eq__(self, other):
 
1087
        if not isinstance(other, self.__class__):
 
1088
            return False
 
1089
        if self.api_version == other.api_version and \
 
1090
                self._ops_equal(other):
 
1091
            return True
 
1092
        else:
 
1093
            return False
 
1094
 
 
1095
    def __ne__(self, other):
 
1096
        return not self.__eq__(other)
425
1097
 
426
1098
 
427
1099
class CephBrokerRsp(object):
431
1103
 
432
1104
    The API is versioned and defaults to version 1.
433
1105
    """
 
1106
 
434
1107
    def __init__(self, encoded_rsp):
435
1108
        self.api_version = None
436
1109
        self.rsp = json.loads(encoded_rsp)
437
1110
 
438
1111
    @property
 
1112
    def request_id(self):
 
1113
        return self.rsp.get('request-id')
 
1114
 
 
1115
    @property
439
1116
    def exit_code(self):
440
1117
        return self.rsp.get('exit-code')
441
1118
 
442
1119
    @property
443
1120
    def exit_msg(self):
444
1121
        return self.rsp.get('stderr')
 
1122
 
 
1123
 
 
1124
# Ceph Broker Conversation:
 
1125
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
1126
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
1127
# unique id so that the client can identity which CephBrokerRsp is associated
 
1128
# with the request. Ceph will also respond to each client unit individually
 
1129
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
1130
# via key broker-rsp-glance-0
 
1131
#
 
1132
# To use this the charm can just do something like:
 
1133
#
 
1134
# from charmhelpers.contrib.storage.linux.ceph import (
 
1135
#     send_request_if_needed,
 
1136
#     is_request_complete,
 
1137
#     CephBrokerRq,
 
1138
# )
 
1139
#
 
1140
# @hooks.hook('ceph-relation-changed')
 
1141
# def ceph_changed():
 
1142
#     rq = CephBrokerRq()
 
1143
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
1144
#
 
1145
#     if is_request_complete(rq):
 
1146
#         <Request complete actions>
 
1147
#     else:
 
1148
#         send_request_if_needed(get_ceph_request())
 
1149
#
 
1150
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
1151
# of glance having sent a request to ceph which ceph has successfully processed
 
1152
#  'ceph:8': {
 
1153
#      'ceph/0': {
 
1154
#          'auth': 'cephx',
 
1155
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
1156
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
1157
#          'ceph-public-address': '10.5.44.103',
 
1158
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
1159
#          'private-address': '10.5.44.103',
 
1160
#      },
 
1161
#      'glance/0': {
 
1162
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
1163
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
1164
#                         '"op": "create-pool"}]}'),
 
1165
#          'private-address': '10.5.44.109',
 
1166
#      },
 
1167
#  }
 
1168
 
 
1169
def get_previous_request(rid):
 
1170
    """Return the last ceph broker request sent on a given relation
 
1171
 
 
1172
    @param rid: Relation id to query for request
 
1173
    """
 
1174
    request = None
 
1175
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
1176
                              unit=local_unit())
 
1177
    if broker_req:
 
1178
        request_data = json.loads(broker_req)
 
1179
        request = CephBrokerRq(api_version=request_data['api-version'],
 
1180
                               request_id=request_data['request-id'])
 
1181
        request.set_ops(request_data['ops'])
 
1182
 
 
1183
    return request
 
1184
 
 
1185
 
 
1186
def get_request_states(request, relation='ceph'):
 
1187
    """Return a dict of requests per relation id with their corresponding
 
1188
       completion state.
 
1189
 
 
1190
    This allows a charm, which has a request for ceph, to see whether there is
 
1191
    an equivalent request already being processed and if so what state that
 
1192
    request is in.
 
1193
 
 
1194
    @param request: A CephBrokerRq object
 
1195
    """
 
1196
    complete = []
 
1197
    requests = {}
 
1198
    for rid in relation_ids(relation):
 
1199
        complete = False
 
1200
        previous_request = get_previous_request(rid)
 
1201
        if request == previous_request:
 
1202
            sent = True
 
1203
            complete = is_request_complete_for_rid(previous_request, rid)
 
1204
        else:
 
1205
            sent = False
 
1206
            complete = False
 
1207
 
 
1208
        requests[rid] = {
 
1209
            'sent': sent,
 
1210
            'complete': complete,
 
1211
        }
 
1212
 
 
1213
    return requests
 
1214
 
 
1215
 
 
1216
def is_request_sent(request, relation='ceph'):
 
1217
    """Check to see if a functionally equivalent request has already been sent
 
1218
 
 
1219
    Returns True if a similair request has been sent
 
1220
 
 
1221
    @param request: A CephBrokerRq object
 
1222
    """
 
1223
    states = get_request_states(request, relation=relation)
 
1224
    for rid in states.keys():
 
1225
        if not states[rid]['sent']:
 
1226
            return False
 
1227
 
 
1228
    return True
 
1229
 
 
1230
 
 
1231
def is_request_complete(request, relation='ceph'):
 
1232
    """Check to see if a functionally equivalent request has already been
 
1233
    completed
 
1234
 
 
1235
    Returns True if a similair request has been completed
 
1236
 
 
1237
    @param request: A CephBrokerRq object
 
1238
    """
 
1239
    states = get_request_states(request, relation=relation)
 
1240
    for rid in states.keys():
 
1241
        if not states[rid]['complete']:
 
1242
            return False
 
1243
 
 
1244
    return True
 
1245
 
 
1246
 
 
1247
def is_request_complete_for_rid(request, rid):
 
1248
    """Check if a given request has been completed on the given relation
 
1249
 
 
1250
    @param request: A CephBrokerRq object
 
1251
    @param rid: Relation ID
 
1252
    """
 
1253
    broker_key = get_broker_rsp_key()
 
1254
    for unit in related_units(rid):
 
1255
        rdata = relation_get(rid=rid, unit=unit)
 
1256
        if rdata.get(broker_key):
 
1257
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
1258
            if rsp.request_id == request.request_id:
 
1259
                if not rsp.exit_code:
 
1260
                    return True
 
1261
        else:
 
1262
            # The remote unit sent no reply targeted at this unit so either the
 
1263
            # remote ceph cluster does not support unit targeted replies or it
 
1264
            # has not processed our request yet.
 
1265
            if rdata.get('broker_rsp'):
 
1266
                request_data = json.loads(rdata['broker_rsp'])
 
1267
                if request_data.get('request-id'):
 
1268
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
1269
                        'service supports unit specific replies', level=DEBUG)
 
1270
                else:
 
1271
                    log('Using legacy broker_rsp as remote service does not '
 
1272
                        'supports unit specific replies', level=DEBUG)
 
1273
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
1274
                    if not rsp.exit_code:
 
1275
                        return True
 
1276
 
 
1277
    return False
 
1278
 
 
1279
 
 
1280
def get_broker_rsp_key():
 
1281
    """Return broker response key for this unit
 
1282
 
 
1283
    This is the key that ceph is going to use to pass request status
 
1284
    information back to this unit
 
1285
    """
 
1286
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
1287
 
 
1288
 
 
1289
def send_request_if_needed(request, relation='ceph'):
 
1290
    """Send broker request if an equivalent request has not already been sent
 
1291
 
 
1292
    @param request: A CephBrokerRq object
 
1293
    """
 
1294
    if is_request_sent(request, relation=relation):
 
1295
        log('Request already sent but not complete, not sending new request',
 
1296
            level=DEBUG)
 
1297
    else:
 
1298
        for rid in relation_ids(relation):
 
1299
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
1300
            relation_set(relation_id=rid, broker_req=request.request)
 
1301
 
 
1302
 
 
1303
class CephConfContext(object):
 
1304
    """Ceph config (ceph.conf) context.
 
1305
 
 
1306
    Supports user-provided Ceph configuration settings. Use can provide a
 
1307
    dictionary as the value for the config-flags charm option containing
 
1308
    Ceph configuration settings keyede by their section in ceph.conf.
 
1309
    """
 
1310
    def __init__(self, permitted_sections=None):
 
1311
        self.permitted_sections = permitted_sections or []
 
1312
 
 
1313
    def __call__(self):
 
1314
        conf = config('config-flags')
 
1315
        if not conf:
 
1316
            return {}
 
1317
 
 
1318
        conf = config_flags_parser(conf)
 
1319
        if type(conf) != dict:
 
1320
            log("Provided config-flags is not a dictionary - ignoring",
 
1321
                level=WARNING)
 
1322
            return {}
 
1323
 
 
1324
        permitted = self.permitted_sections
 
1325
        if permitted:
 
1326
            diff = set(conf.keys()).difference(set(permitted))
 
1327
            if diff:
 
1328
                log("Config-flags contains invalid keys '%s' - they will be "
 
1329
                    "ignored" % (', '.join(diff)), level=WARNING)
 
1330
 
 
1331
        ceph_conf = {}
 
1332
        for key in conf:
 
1333
            if permitted and key not in permitted:
 
1334
                log("Ignoring key '%s'" % key, level=WARNING)
 
1335
                continue
 
1336
 
 
1337
            ceph_conf[key] = conf[key]
 
1338
 
 
1339
        return ceph_conf