~openstack-charmers-next/charms/vivid/hacluster/trunk

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Liam Young
  • Date: 2016-03-30 09:07:46 UTC
  • mfrom: (63.1.4 trunk)
  • Revision ID: liam.young@canonical.com-20160330090746-rwwe91yjqi9j9ry3
[gnuoy, r=james-page] Add pause/resume actions

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
#  James Page <james.page@ubuntu.com>
24
24
#  Adam Gandelman <adamg@ubuntu.com>
25
25
#
 
26
import bisect
 
27
import errno
 
28
import hashlib
 
29
import six
26
30
 
27
31
import os
28
32
import shutil
29
33
import json
30
34
import time
 
35
import uuid
31
36
 
32
37
from subprocess import (
33
38
    check_call,
35
40
    CalledProcessError,
36
41
)
37
42
from charmhelpers.core.hookenv import (
 
43
    local_unit,
38
44
    relation_get,
39
45
    relation_ids,
 
46
    relation_set,
40
47
    related_units,
41
48
    log,
42
49
    DEBUG,
56
63
    apt_install,
57
64
)
58
65
 
 
66
from charmhelpers.core.kernel import modprobe
 
67
 
59
68
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
60
69
KEYFILE = '/etc/ceph/ceph.client.{}.key'
61
70
 
67
76
err to syslog = {use_syslog}
68
77
clog to syslog = {use_syslog}
69
78
"""
 
79
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
 
80
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
 
81
 
 
82
 
 
83
def validator(value, valid_type, valid_range=None):
 
84
    """
 
85
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
 
86
    Example input:
 
87
        validator(value=1,
 
88
                  valid_type=int,
 
89
                  valid_range=[0, 2])
 
90
    This says I'm testing value=1.  It must be an int inclusive in [0,2]
 
91
 
 
92
    :param value: The value to validate
 
93
    :param valid_type: The type that value should be.
 
94
    :param valid_range: A range of values that value can assume.
 
95
    :return:
 
96
    """
 
97
    assert isinstance(value, valid_type), "{} is not a {}".format(
 
98
        value,
 
99
        valid_type)
 
100
    if valid_range is not None:
 
101
        assert isinstance(valid_range, list), \
 
102
            "valid_range must be a list, was given {}".format(valid_range)
 
103
        # If we're dealing with strings
 
104
        if valid_type is six.string_types:
 
105
            assert value in valid_range, \
 
106
                "{} is not in the list {}".format(value, valid_range)
 
107
        # Integer, float should have a min and max
 
108
        else:
 
109
            if len(valid_range) != 2:
 
110
                raise ValueError(
 
111
                    "Invalid valid_range list of {} for {}.  "
 
112
                    "List must be [min,max]".format(valid_range, value))
 
113
            assert value >= valid_range[0], \
 
114
                "{} is less than minimum allowed value of {}".format(
 
115
                    value, valid_range[0])
 
116
            assert value <= valid_range[1], \
 
117
                "{} is greater than maximum allowed value of {}".format(
 
118
                    value, valid_range[1])
 
119
 
 
120
 
 
121
class PoolCreationError(Exception):
 
122
    """
 
123
    A custom error to inform the caller that a pool creation failed.  Provides an error message
 
124
    """
 
125
 
 
126
    def __init__(self, message):
 
127
        super(PoolCreationError, self).__init__(message)
 
128
 
 
129
 
 
130
class Pool(object):
 
131
    """
 
132
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
 
133
    Do not call create() on this base class as it will not do anything.  Instantiate a child class and call create().
 
134
    """
 
135
 
 
136
    def __init__(self, service, name):
 
137
        self.service = service
 
138
        self.name = name
 
139
 
 
140
    # Create the pool if it doesn't exist already
 
141
    # To be implemented by subclasses
 
142
    def create(self):
 
143
        pass
 
144
 
 
145
    def add_cache_tier(self, cache_pool, mode):
 
146
        """
 
147
        Adds a new cache tier to an existing pool.
 
148
        :param cache_pool: six.string_types.  The cache tier pool name to add.
 
149
        :param mode: six.string_types. The caching mode to use for this pool.  valid range = ["readonly", "writeback"]
 
150
        :return: None
 
151
        """
 
152
        # Check the input types and values
 
153
        validator(value=cache_pool, valid_type=six.string_types)
 
154
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
 
155
 
 
156
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
 
157
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
 
158
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
 
159
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
 
160
 
 
161
    def remove_cache_tier(self, cache_pool):
 
162
        """
 
163
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
 
164
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
 
165
        :return: None
 
166
        """
 
167
        # read-only is easy, writeback is much harder
 
168
        mode = get_cache_mode(self.service, cache_pool)
 
169
        if mode == 'readonly':
 
170
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
 
171
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
172
 
 
173
        elif mode == 'writeback':
 
174
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
 
175
            # Flush the cache and wait for it to return
 
176
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
 
177
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
 
178
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
179
 
 
180
    def get_pgs(self, pool_size):
 
181
        """
 
182
        :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
 
183
            erasure coded pools
 
184
        :return: int.  The number of pgs to use.
 
185
        """
 
186
        validator(value=pool_size, valid_type=int)
 
187
        osd_list = get_osds(self.service)
 
188
        if not osd_list:
 
189
            # NOTE(james-page): Default to 200 for older ceph versions
 
190
            # which don't support OSD query from cli
 
191
            return 200
 
192
 
 
193
        osd_list_length = len(osd_list)
 
194
        # Calculate based on Ceph best practices
 
195
        if osd_list_length < 5:
 
196
            return 128
 
197
        elif 5 < osd_list_length < 10:
 
198
            return 512
 
199
        elif 10 < osd_list_length < 50:
 
200
            return 4096
 
201
        else:
 
202
            estimate = (osd_list_length * 100) / pool_size
 
203
            # Return the next nearest power of 2
 
204
            index = bisect.bisect_right(powers_of_two, estimate)
 
205
            return powers_of_two[index]
 
206
 
 
207
 
 
208
class ReplicatedPool(Pool):
 
209
    def __init__(self, service, name, pg_num=None, replicas=2):
 
210
        super(ReplicatedPool, self).__init__(service=service, name=name)
 
211
        self.replicas = replicas
 
212
        if pg_num is None:
 
213
            self.pg_num = self.get_pgs(self.replicas)
 
214
        else:
 
215
            self.pg_num = pg_num
 
216
 
 
217
    def create(self):
 
218
        if not pool_exists(self.service, self.name):
 
219
            # Create it
 
220
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
 
221
                   self.name, str(self.pg_num)]
 
222
            try:
 
223
                check_call(cmd)
 
224
            except CalledProcessError:
 
225
                raise
 
226
 
 
227
 
 
228
# Default jerasure erasure coded pool
 
229
class ErasurePool(Pool):
 
230
    def __init__(self, service, name, erasure_code_profile="default"):
 
231
        super(ErasurePool, self).__init__(service=service, name=name)
 
232
        self.erasure_code_profile = erasure_code_profile
 
233
 
 
234
    def create(self):
 
235
        if not pool_exists(self.service, self.name):
 
236
            # Try to find the erasure profile information so we can properly size the pgs
 
237
            erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
 
238
 
 
239
            # Check for errors
 
240
            if erasure_profile is None:
 
241
                log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
 
242
                    level=ERROR)
 
243
                raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
 
244
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
 
245
                # Error
 
246
                log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
 
247
                    level=ERROR)
 
248
                raise PoolCreationError(
 
249
                    message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
 
250
 
 
251
            pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
 
252
            # Create it
 
253
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
 
254
                   'erasure', self.erasure_code_profile]
 
255
            try:
 
256
                check_call(cmd)
 
257
            except CalledProcessError:
 
258
                raise
 
259
 
 
260
    """Get an existing erasure code profile if it already exists.
 
261
       Returns json formatted output"""
 
262
 
 
263
 
 
264
def get_mon_map(service):
 
265
    """
 
266
    Returns the current monitor map.
 
267
    :param service: six.string_types. The Ceph user name to run the command under
 
268
    :return: json string. :raise: ValueError if the monmap fails to parse.
 
269
      Also raises CalledProcessError if our ceph command fails
 
270
    """
 
271
    try:
 
272
        mon_status = check_output(
 
273
            ['ceph', '--id', service,
 
274
             'mon_status', '--format=json'])
 
275
        try:
 
276
            return json.loads(mon_status)
 
277
        except ValueError as v:
 
278
            log("Unable to parse mon_status json: {}. Error: {}".format(
 
279
                mon_status, v.message))
 
280
            raise
 
281
    except CalledProcessError as e:
 
282
        log("mon_status command failed with message: {}".format(
 
283
            e.message))
 
284
        raise
 
285
 
 
286
 
 
287
def hash_monitor_names(service):
 
288
    """
 
289
    Uses the get_mon_map() function to get information about the monitor
 
290
    cluster.
 
291
    Hash the name of each monitor.  Return a sorted list of monitor hashes
 
292
    in an ascending order.
 
293
    :param service: six.string_types. The Ceph user name to run the command under
 
294
    :rtype : dict.   json dict of monitor name, ip address and rank
 
295
    example: {
 
296
        'name': 'ip-172-31-13-165',
 
297
        'rank': 0,
 
298
        'addr': '172.31.13.165:6789/0'}
 
299
    """
 
300
    try:
 
301
        hash_list = []
 
302
        monitor_list = get_mon_map(service=service)
 
303
        if monitor_list['monmap']['mons']:
 
304
            for mon in monitor_list['monmap']['mons']:
 
305
                hash_list.append(
 
306
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
 
307
            return sorted(hash_list)
 
308
        else:
 
309
            return None
 
310
    except (ValueError, CalledProcessError):
 
311
        raise
 
312
 
 
313
 
 
314
def monitor_key_delete(service, key):
 
315
    """
 
316
    Delete a key and value pair from the monitor cluster
 
317
    :param service: six.string_types. The Ceph user name to run the command under
 
318
    Deletes a key value pair on the monitor cluster.
 
319
    :param key: six.string_types.  The key to delete.
 
320
    """
 
321
    try:
 
322
        check_output(
 
323
            ['ceph', '--id', service,
 
324
             'config-key', 'del', str(key)])
 
325
    except CalledProcessError as e:
 
326
        log("Monitor config-key put failed with message: {}".format(
 
327
            e.output))
 
328
        raise
 
329
 
 
330
 
 
331
def monitor_key_set(service, key, value):
 
332
    """
 
333
    Sets a key value pair on the monitor cluster.
 
334
    :param service: six.string_types. The Ceph user name to run the command under
 
335
    :param key: six.string_types.  The key to set.
 
336
    :param value: The value to set.  This will be converted to a string
 
337
        before setting
 
338
    """
 
339
    try:
 
340
        check_output(
 
341
            ['ceph', '--id', service,
 
342
             'config-key', 'put', str(key), str(value)])
 
343
    except CalledProcessError as e:
 
344
        log("Monitor config-key put failed with message: {}".format(
 
345
            e.output))
 
346
        raise
 
347
 
 
348
 
 
349
def monitor_key_get(service, key):
 
350
    """
 
351
    Gets the value of an existing key in the monitor cluster.
 
352
    :param service: six.string_types. The Ceph user name to run the command under
 
353
    :param key: six.string_types.  The key to search for.
 
354
    :return: Returns the value of that key or None if not found.
 
355
    """
 
356
    try:
 
357
        output = check_output(
 
358
            ['ceph', '--id', service,
 
359
             'config-key', 'get', str(key)])
 
360
        return output
 
361
    except CalledProcessError as e:
 
362
        log("Monitor config-key get failed with message: {}".format(
 
363
            e.output))
 
364
        return None
 
365
 
 
366
 
 
367
def monitor_key_exists(service, key):
 
368
    """
 
369
    Searches for the existence of a key in the monitor cluster.
 
370
    :param service: six.string_types. The Ceph user name to run the command under
 
371
    :param key: six.string_types.  The key to search for
 
372
    :return: Returns True if the key exists, False if not and raises an
 
373
     exception if an unknown error occurs. :raise: CalledProcessError if
 
374
     an unknown error occurs
 
375
    """
 
376
    try:
 
377
        check_call(
 
378
            ['ceph', '--id', service,
 
379
             'config-key', 'exists', str(key)])
 
380
        # I can return true here regardless because Ceph returns
 
381
        # ENOENT if the key wasn't found
 
382
        return True
 
383
    except CalledProcessError as e:
 
384
        if e.returncode == errno.ENOENT:
 
385
            return False
 
386
        else:
 
387
            log("Unknown error from ceph config-get exists: {} {}".format(
 
388
                e.returncode, e.output))
 
389
            raise
 
390
 
 
391
 
 
392
def get_erasure_profile(service, name):
 
393
    """
 
394
    :param service: six.string_types. The Ceph user name to run the command under
 
395
    :param name:
 
396
    :return:
 
397
    """
 
398
    try:
 
399
        out = check_output(['ceph', '--id', service,
 
400
                            'osd', 'erasure-code-profile', 'get',
 
401
                            name, '--format=json'])
 
402
        return json.loads(out)
 
403
    except (CalledProcessError, OSError, ValueError):
 
404
        return None
 
405
 
 
406
 
 
407
def pool_set(service, pool_name, key, value):
 
408
    """
 
409
    Sets a value for a RADOS pool in ceph.
 
410
    :param service: six.string_types. The Ceph user name to run the command under
 
411
    :param pool_name: six.string_types
 
412
    :param key: six.string_types
 
413
    :param value:
 
414
    :return: None.  Can raise CalledProcessError
 
415
    """
 
416
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
 
417
    try:
 
418
        check_call(cmd)
 
419
    except CalledProcessError:
 
420
        raise
 
421
 
 
422
 
 
423
def snapshot_pool(service, pool_name, snapshot_name):
 
424
    """
 
425
    Snapshots a RADOS pool in ceph.
 
426
    :param service: six.string_types. The Ceph user name to run the command under
 
427
    :param pool_name: six.string_types
 
428
    :param snapshot_name: six.string_types
 
429
    :return: None.  Can raise CalledProcessError
 
430
    """
 
431
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
 
432
    try:
 
433
        check_call(cmd)
 
434
    except CalledProcessError:
 
435
        raise
 
436
 
 
437
 
 
438
def remove_pool_snapshot(service, pool_name, snapshot_name):
 
439
    """
 
440
    Remove a snapshot from a RADOS pool in ceph.
 
441
    :param service: six.string_types. The Ceph user name to run the command under
 
442
    :param pool_name: six.string_types
 
443
    :param snapshot_name: six.string_types
 
444
    :return: None.  Can raise CalledProcessError
 
445
    """
 
446
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
 
447
    try:
 
448
        check_call(cmd)
 
449
    except CalledProcessError:
 
450
        raise
 
451
 
 
452
 
 
453
# max_bytes should be an int or long
 
454
def set_pool_quota(service, pool_name, max_bytes):
 
455
    """
 
456
    :param service: six.string_types. The Ceph user name to run the command under
 
457
    :param pool_name: six.string_types
 
458
    :param max_bytes: int or long
 
459
    :return: None.  Can raise CalledProcessError
 
460
    """
 
461
    # Set a byte quota on a RADOS pool in ceph.
 
462
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
 
463
           'max_bytes', str(max_bytes)]
 
464
    try:
 
465
        check_call(cmd)
 
466
    except CalledProcessError:
 
467
        raise
 
468
 
 
469
 
 
470
def remove_pool_quota(service, pool_name):
 
471
    """
 
472
    Set a byte quota on a RADOS pool in ceph.
 
473
    :param service: six.string_types. The Ceph user name to run the command under
 
474
    :param pool_name: six.string_types
 
475
    :return: None.  Can raise CalledProcessError
 
476
    """
 
477
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
 
478
    try:
 
479
        check_call(cmd)
 
480
    except CalledProcessError:
 
481
        raise
 
482
 
 
483
 
 
484
def remove_erasure_profile(service, profile_name):
 
485
    """
 
486
    Create a new erasure code profile if one does not already exist for it.  Updates
 
487
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
488
    for more details
 
489
    :param service: six.string_types. The Ceph user name to run the command under
 
490
    :param profile_name: six.string_types
 
491
    :return: None.  Can raise CalledProcessError
 
492
    """
 
493
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
 
494
           profile_name]
 
495
    try:
 
496
        check_call(cmd)
 
497
    except CalledProcessError:
 
498
        raise
 
499
 
 
500
 
 
501
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
 
502
                           failure_domain='host',
 
503
                           data_chunks=2, coding_chunks=1,
 
504
                           locality=None, durability_estimator=None):
 
505
    """
 
506
    Create a new erasure code profile if one does not already exist for it.  Updates
 
507
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
508
    for more details
 
509
    :param service: six.string_types. The Ceph user name to run the command under
 
510
    :param profile_name: six.string_types
 
511
    :param erasure_plugin_name: six.string_types
 
512
    :param failure_domain: six.string_types.  One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
 
513
        'room', 'root', 'row'])
 
514
    :param data_chunks: int
 
515
    :param coding_chunks: int
 
516
    :param locality: int
 
517
    :param durability_estimator: int
 
518
    :return: None.  Can raise CalledProcessError
 
519
    """
 
520
    # Ensure this failure_domain is allowed by Ceph
 
521
    validator(failure_domain, six.string_types,
 
522
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
 
523
 
 
524
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
 
525
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
 
526
           'ruleset_failure_domain=' + failure_domain]
 
527
    if locality is not None and durability_estimator is not None:
 
528
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
 
529
 
 
530
    # Add plugin specific information
 
531
    if locality is not None:
 
532
        # For local erasure codes
 
533
        cmd.append('l=' + str(locality))
 
534
    if durability_estimator is not None:
 
535
        # For Shec erasure codes
 
536
        cmd.append('c=' + str(durability_estimator))
 
537
 
 
538
    if erasure_profile_exists(service, profile_name):
 
539
        cmd.append('--force')
 
540
 
 
541
    try:
 
542
        check_call(cmd)
 
543
    except CalledProcessError:
 
544
        raise
 
545
 
 
546
 
 
547
def rename_pool(service, old_name, new_name):
 
548
    """
 
549
    Rename a Ceph pool from old_name to new_name
 
550
    :param service: six.string_types. The Ceph user name to run the command under
 
551
    :param old_name: six.string_types
 
552
    :param new_name: six.string_types
 
553
    :return: None
 
554
    """
 
555
    validator(value=old_name, valid_type=six.string_types)
 
556
    validator(value=new_name, valid_type=six.string_types)
 
557
 
 
558
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
 
559
    check_call(cmd)
 
560
 
 
561
 
 
562
def erasure_profile_exists(service, name):
 
563
    """
 
564
    Check to see if an Erasure code profile already exists.
 
565
    :param service: six.string_types. The Ceph user name to run the command under
 
566
    :param name: six.string_types
 
567
    :return: int or None
 
568
    """
 
569
    validator(value=name, valid_type=six.string_types)
 
570
    try:
 
571
        check_call(['ceph', '--id', service,
 
572
                    'osd', 'erasure-code-profile', 'get',
 
573
                    name])
 
574
        return True
 
575
    except CalledProcessError:
 
576
        return False
 
577
 
 
578
 
 
579
def get_cache_mode(service, pool_name):
 
580
    """
 
581
    Find the current caching mode of the pool_name given.
 
582
    :param service: six.string_types. The Ceph user name to run the command under
 
583
    :param pool_name: six.string_types
 
584
    :return: int or None
 
585
    """
 
586
    validator(value=service, valid_type=six.string_types)
 
587
    validator(value=pool_name, valid_type=six.string_types)
 
588
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
 
589
    try:
 
590
        osd_json = json.loads(out)
 
591
        for pool in osd_json['pools']:
 
592
            if pool['pool_name'] == pool_name:
 
593
                return pool['cache_mode']
 
594
        return None
 
595
    except ValueError:
 
596
        raise
 
597
 
 
598
 
 
599
def pool_exists(service, name):
 
600
    """Check to see if a RADOS pool already exists."""
 
601
    try:
 
602
        out = check_output(['rados', '--id', service,
 
603
                            'lspools']).decode('UTF-8')
 
604
    except CalledProcessError:
 
605
        return False
 
606
 
 
607
    return name in out
 
608
 
 
609
 
 
610
def get_osds(service):
 
611
    """Return a list of all Ceph Object Storage Daemons currently in the
 
612
    cluster.
 
613
    """
 
614
    version = ceph_version()
 
615
    if version and version >= '0.56':
 
616
        return json.loads(check_output(['ceph', '--id', service,
 
617
                                        'osd', 'ls',
 
618
                                        '--format=json']).decode('UTF-8'))
 
619
 
 
620
    return None
70
621
 
71
622
 
72
623
def install():
96
647
    check_call(cmd)
97
648
 
98
649
 
99
 
def pool_exists(service, name):
100
 
    """Check to see if a RADOS pool already exists."""
101
 
    try:
102
 
        out = check_output(['rados', '--id', service,
103
 
                            'lspools']).decode('UTF-8')
104
 
    except CalledProcessError:
105
 
        return False
106
 
 
107
 
    return name in out
108
 
 
109
 
 
110
 
def get_osds(service):
111
 
    """Return a list of all Ceph Object Storage Daemons currently in the
112
 
    cluster.
113
 
    """
114
 
    version = ceph_version()
115
 
    if version and version >= '0.56':
116
 
        return json.loads(check_output(['ceph', '--id', service,
117
 
                                        'osd', 'ls',
118
 
                                        '--format=json']).decode('UTF-8'))
119
 
 
120
 
    return None
121
 
 
122
 
 
123
 
def create_pool(service, name, replicas=3):
 
650
def update_pool(client, pool, settings):
 
651
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
 
652
    for k, v in six.iteritems(settings):
 
653
        cmd.append(k)
 
654
        cmd.append(v)
 
655
 
 
656
    check_call(cmd)
 
657
 
 
658
 
 
659
def create_pool(service, name, replicas=3, pg_num=None):
124
660
    """Create a new RADOS pool."""
125
661
    if pool_exists(service, name):
126
662
        log("Ceph pool {} already exists, skipping creation".format(name),
127
663
            level=WARNING)
128
664
        return
129
665
 
130
 
    # Calculate the number of placement groups based
131
 
    # on upstream recommended best practices.
132
 
    osds = get_osds(service)
133
 
    if osds:
134
 
        pgnum = (len(osds) * 100 // replicas)
135
 
    else:
136
 
        # NOTE(james-page): Default to 200 for older ceph versions
137
 
        # which don't support OSD query from cli
138
 
        pgnum = 200
139
 
 
140
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
141
 
    check_call(cmd)
142
 
 
143
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
144
 
           str(replicas)]
145
 
    check_call(cmd)
 
666
    if not pg_num:
 
667
        # Calculate the number of placement groups based
 
668
        # on upstream recommended best practices.
 
669
        osds = get_osds(service)
 
670
        if osds:
 
671
            pg_num = (len(osds) * 100 // replicas)
 
672
        else:
 
673
            # NOTE(james-page): Default to 200 for older ceph versions
 
674
            # which don't support OSD query from cli
 
675
            pg_num = 200
 
676
 
 
677
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
 
678
    check_call(cmd)
 
679
 
 
680
    update_pool(service, name, settings={'size': str(replicas)})
146
681
 
147
682
 
148
683
def delete_pool(service, name):
197
732
    log('Created new keyfile at %s.' % keyfile, level=INFO)
198
733
 
199
734
 
200
 
def get_ceph_nodes():
201
 
    """Query named relation 'ceph' to determine current nodes."""
 
735
def get_ceph_nodes(relation='ceph'):
 
736
    """Query named relation to determine current nodes."""
202
737
    hosts = []
203
 
    for r_id in relation_ids('ceph'):
 
738
    for r_id in relation_ids(relation):
204
739
        for unit in related_units(r_id):
205
740
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
206
741
 
288
823
    os.chown(data_src_dst, uid, gid)
289
824
 
290
825
 
291
 
# TODO: re-use
292
 
def modprobe(module):
293
 
    """Load a kernel module and configure for auto-load on reboot."""
294
 
    log('Loading kernel module', level=INFO)
295
 
    cmd = ['modprobe', module]
296
 
    check_call(cmd)
297
 
    with open('/etc/modules', 'r+') as modules:
298
 
        if module not in modules.read():
299
 
            modules.write(module)
300
 
 
301
 
 
302
826
def copy_files(src, dst, symlinks=False, ignore=None):
303
827
    """Copy files from src to dst."""
304
828
    for item in os.listdir(src):
363
887
            service_start(svc)
364
888
 
365
889
 
366
 
def ensure_ceph_keyring(service, user=None, group=None):
 
890
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
367
891
    """Ensures a ceph keyring is created for a named service and optionally
368
892
    ensures user and group ownership.
369
893
 
370
894
    Returns False if no ceph key is available in relation state.
371
895
    """
372
896
    key = None
373
 
    for rid in relation_ids('ceph'):
 
897
    for rid in relation_ids(relation):
374
898
        for unit in related_units(rid):
375
899
            key = relation_get('key', rid=rid, unit=unit)
376
900
            if key:
411
935
 
412
936
    The API is versioned and defaults to version 1.
413
937
    """
414
 
    def __init__(self, api_version=1):
 
938
 
 
939
    def __init__(self, api_version=1, request_id=None):
415
940
        self.api_version = api_version
 
941
        if request_id:
 
942
            self.request_id = request_id
 
943
        else:
 
944
            self.request_id = str(uuid.uuid1())
416
945
        self.ops = []
417
946
 
418
 
    def add_op_create_pool(self, name, replica_count=3):
 
947
    def add_op_create_pool(self, name, replica_count=3, pg_num=None):
 
948
        """Adds an operation to create a pool.
 
949
 
 
950
        @param pg_num setting:  optional setting. If not provided, this value
 
951
        will be calculated by the broker based on how many OSDs are in the
 
952
        cluster at the time of creation. Note that, if provided, this value
 
953
        will be capped at the current available maximum.
 
954
        """
419
955
        self.ops.append({'op': 'create-pool', 'name': name,
420
 
                         'replicas': replica_count})
 
956
                         'replicas': replica_count, 'pg_num': pg_num})
 
957
 
 
958
    def set_ops(self, ops):
 
959
        """Set request ops to provided value.
 
960
 
 
961
        Useful for injecting ops that come from a previous request
 
962
        to allow comparisons to ensure validity.
 
963
        """
 
964
        self.ops = ops
421
965
 
422
966
    @property
423
967
    def request(self):
424
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
968
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
969
                           'request-id': self.request_id})
 
970
 
 
971
    def _ops_equal(self, other):
 
972
        if len(self.ops) == len(other.ops):
 
973
            for req_no in range(0, len(self.ops)):
 
974
                for key in ['replicas', 'name', 'op', 'pg_num']:
 
975
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
 
976
                        return False
 
977
        else:
 
978
            return False
 
979
        return True
 
980
 
 
981
    def __eq__(self, other):
 
982
        if not isinstance(other, self.__class__):
 
983
            return False
 
984
        if self.api_version == other.api_version and \
 
985
                self._ops_equal(other):
 
986
            return True
 
987
        else:
 
988
            return False
 
989
 
 
990
    def __ne__(self, other):
 
991
        return not self.__eq__(other)
425
992
 
426
993
 
427
994
class CephBrokerRsp(object):
431
998
 
432
999
    The API is versioned and defaults to version 1.
433
1000
    """
 
1001
 
434
1002
    def __init__(self, encoded_rsp):
435
1003
        self.api_version = None
436
1004
        self.rsp = json.loads(encoded_rsp)
437
1005
 
438
1006
    @property
 
1007
    def request_id(self):
 
1008
        return self.rsp.get('request-id')
 
1009
 
 
1010
    @property
439
1011
    def exit_code(self):
440
1012
        return self.rsp.get('exit-code')
441
1013
 
442
1014
    @property
443
1015
    def exit_msg(self):
444
1016
        return self.rsp.get('stderr')
 
1017
 
 
1018
 
 
1019
# Ceph Broker Conversation:
 
1020
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
1021
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
1022
# unique id so that the client can identity which CephBrokerRsp is associated
 
1023
# with the request. Ceph will also respond to each client unit individually
 
1024
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
1025
# via key broker-rsp-glance-0
 
1026
#
 
1027
# To use this the charm can just do something like:
 
1028
#
 
1029
# from charmhelpers.contrib.storage.linux.ceph import (
 
1030
#     send_request_if_needed,
 
1031
#     is_request_complete,
 
1032
#     CephBrokerRq,
 
1033
# )
 
1034
#
 
1035
# @hooks.hook('ceph-relation-changed')
 
1036
# def ceph_changed():
 
1037
#     rq = CephBrokerRq()
 
1038
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
1039
#
 
1040
#     if is_request_complete(rq):
 
1041
#         <Request complete actions>
 
1042
#     else:
 
1043
#         send_request_if_needed(get_ceph_request())
 
1044
#
 
1045
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
1046
# of glance having sent a request to ceph which ceph has successfully processed
 
1047
#  'ceph:8': {
 
1048
#      'ceph/0': {
 
1049
#          'auth': 'cephx',
 
1050
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
1051
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
1052
#          'ceph-public-address': '10.5.44.103',
 
1053
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
1054
#          'private-address': '10.5.44.103',
 
1055
#      },
 
1056
#      'glance/0': {
 
1057
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
1058
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
1059
#                         '"op": "create-pool"}]}'),
 
1060
#          'private-address': '10.5.44.109',
 
1061
#      },
 
1062
#  }
 
1063
 
 
1064
def get_previous_request(rid):
 
1065
    """Return the last ceph broker request sent on a given relation
 
1066
 
 
1067
    @param rid: Relation id to query for request
 
1068
    """
 
1069
    request = None
 
1070
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
1071
                              unit=local_unit())
 
1072
    if broker_req:
 
1073
        request_data = json.loads(broker_req)
 
1074
        request = CephBrokerRq(api_version=request_data['api-version'],
 
1075
                               request_id=request_data['request-id'])
 
1076
        request.set_ops(request_data['ops'])
 
1077
 
 
1078
    return request
 
1079
 
 
1080
 
 
1081
def get_request_states(request, relation='ceph'):
 
1082
    """Return a dict of requests per relation id with their corresponding
 
1083
       completion state.
 
1084
 
 
1085
    This allows a charm, which has a request for ceph, to see whether there is
 
1086
    an equivalent request already being processed and if so what state that
 
1087
    request is in.
 
1088
 
 
1089
    @param request: A CephBrokerRq object
 
1090
    """
 
1091
    complete = []
 
1092
    requests = {}
 
1093
    for rid in relation_ids(relation):
 
1094
        complete = False
 
1095
        previous_request = get_previous_request(rid)
 
1096
        if request == previous_request:
 
1097
            sent = True
 
1098
            complete = is_request_complete_for_rid(previous_request, rid)
 
1099
        else:
 
1100
            sent = False
 
1101
            complete = False
 
1102
 
 
1103
        requests[rid] = {
 
1104
            'sent': sent,
 
1105
            'complete': complete,
 
1106
        }
 
1107
 
 
1108
    return requests
 
1109
 
 
1110
 
 
1111
def is_request_sent(request, relation='ceph'):
 
1112
    """Check to see if a functionally equivalent request has already been sent
 
1113
 
 
1114
    Returns True if a similair request has been sent
 
1115
 
 
1116
    @param request: A CephBrokerRq object
 
1117
    """
 
1118
    states = get_request_states(request, relation=relation)
 
1119
    for rid in states.keys():
 
1120
        if not states[rid]['sent']:
 
1121
            return False
 
1122
 
 
1123
    return True
 
1124
 
 
1125
 
 
1126
def is_request_complete(request, relation='ceph'):
 
1127
    """Check to see if a functionally equivalent request has already been
 
1128
    completed
 
1129
 
 
1130
    Returns True if a similair request has been completed
 
1131
 
 
1132
    @param request: A CephBrokerRq object
 
1133
    """
 
1134
    states = get_request_states(request, relation=relation)
 
1135
    for rid in states.keys():
 
1136
        if not states[rid]['complete']:
 
1137
            return False
 
1138
 
 
1139
    return True
 
1140
 
 
1141
 
 
1142
def is_request_complete_for_rid(request, rid):
 
1143
    """Check if a given request has been completed on the given relation
 
1144
 
 
1145
    @param request: A CephBrokerRq object
 
1146
    @param rid: Relation ID
 
1147
    """
 
1148
    broker_key = get_broker_rsp_key()
 
1149
    for unit in related_units(rid):
 
1150
        rdata = relation_get(rid=rid, unit=unit)
 
1151
        if rdata.get(broker_key):
 
1152
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
1153
            if rsp.request_id == request.request_id:
 
1154
                if not rsp.exit_code:
 
1155
                    return True
 
1156
        else:
 
1157
            # The remote unit sent no reply targeted at this unit so either the
 
1158
            # remote ceph cluster does not support unit targeted replies or it
 
1159
            # has not processed our request yet.
 
1160
            if rdata.get('broker_rsp'):
 
1161
                request_data = json.loads(rdata['broker_rsp'])
 
1162
                if request_data.get('request-id'):
 
1163
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
1164
                        'service supports unit specific replies', level=DEBUG)
 
1165
                else:
 
1166
                    log('Using legacy broker_rsp as remote service does not '
 
1167
                        'supports unit specific replies', level=DEBUG)
 
1168
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
1169
                    if not rsp.exit_code:
 
1170
                        return True
 
1171
 
 
1172
    return False
 
1173
 
 
1174
 
 
1175
def get_broker_rsp_key():
 
1176
    """Return broker response key for this unit
 
1177
 
 
1178
    This is the key that ceph is going to use to pass request status
 
1179
    information back to this unit
 
1180
    """
 
1181
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
1182
 
 
1183
 
 
1184
def send_request_if_needed(request, relation='ceph'):
 
1185
    """Send broker request if an equivalent request has not already been sent
 
1186
 
 
1187
    @param request: A CephBrokerRq object
 
1188
    """
 
1189
    if is_request_sent(request, relation=relation):
 
1190
        log('Request already sent but not complete, not sending new request',
 
1191
            level=DEBUG)
 
1192
    else:
 
1193
        for rid in relation_ids(relation):
 
1194
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
1195
            relation_set(relation_id=rid, broker_req=request.request)