~junaidali/charms/trusty/plumgrid-gateway/analyst_opsvm

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: bbaqar at plumgrid
  • Date: 2016-04-25 09:21:09 UTC
  • mfrom: (26.1.2 plumgrid-gateway)
  • Revision ID: bbaqar@plumgrid.com-20160425092109-kweey25bx97pmj80
Merge: Liberty/Mitaka support

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
#  James Page <james.page@ubuntu.com>
24
24
#  Adam Gandelman <adamg@ubuntu.com>
25
25
#
 
26
import bisect
 
27
import errno
 
28
import hashlib
 
29
import six
26
30
 
27
31
import os
28
32
import shutil
29
33
import json
30
34
import time
 
35
import uuid
31
36
 
32
37
from subprocess import (
33
38
    check_call,
35
40
    CalledProcessError,
36
41
)
37
42
from charmhelpers.core.hookenv import (
 
43
    local_unit,
38
44
    relation_get,
39
45
    relation_ids,
 
46
    relation_set,
40
47
    related_units,
41
48
    log,
42
49
    DEBUG,
56
63
    apt_install,
57
64
)
58
65
 
 
66
from charmhelpers.core.kernel import modprobe
 
67
 
59
68
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
60
69
KEYFILE = '/etc/ceph/ceph.client.{}.key'
61
70
 
67
76
err to syslog = {use_syslog}
68
77
clog to syslog = {use_syslog}
69
78
"""
 
79
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
 
80
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
 
81
 
 
82
 
 
83
def validator(value, valid_type, valid_range=None):
 
84
    """
 
85
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
 
86
    Example input:
 
87
        validator(value=1,
 
88
                  valid_type=int,
 
89
                  valid_range=[0, 2])
 
90
    This says I'm testing value=1.  It must be an int inclusive in [0,2]
 
91
 
 
92
    :param value: The value to validate
 
93
    :param valid_type: The type that value should be.
 
94
    :param valid_range: A range of values that value can assume.
 
95
    :return:
 
96
    """
 
97
    assert isinstance(value, valid_type), "{} is not a {}".format(
 
98
        value,
 
99
        valid_type)
 
100
    if valid_range is not None:
 
101
        assert isinstance(valid_range, list), \
 
102
            "valid_range must be a list, was given {}".format(valid_range)
 
103
        # If we're dealing with strings
 
104
        if valid_type is six.string_types:
 
105
            assert value in valid_range, \
 
106
                "{} is not in the list {}".format(value, valid_range)
 
107
        # Integer, float should have a min and max
 
108
        else:
 
109
            if len(valid_range) != 2:
 
110
                raise ValueError(
 
111
                    "Invalid valid_range list of {} for {}.  "
 
112
                    "List must be [min,max]".format(valid_range, value))
 
113
            assert value >= valid_range[0], \
 
114
                "{} is less than minimum allowed value of {}".format(
 
115
                    value, valid_range[0])
 
116
            assert value <= valid_range[1], \
 
117
                "{} is greater than maximum allowed value of {}".format(
 
118
                    value, valid_range[1])
 
119
 
 
120
 
 
121
class PoolCreationError(Exception):
 
122
    """
 
123
    A custom error to inform the caller that a pool creation failed.  Provides an error message
 
124
    """
 
125
 
 
126
    def __init__(self, message):
 
127
        super(PoolCreationError, self).__init__(message)
 
128
 
 
129
 
 
130
class Pool(object):
 
131
    """
 
132
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
 
133
    Do not call create() on this base class as it will not do anything.  Instantiate a child class and call create().
 
134
    """
 
135
 
 
136
    def __init__(self, service, name):
 
137
        self.service = service
 
138
        self.name = name
 
139
 
 
140
    # Create the pool if it doesn't exist already
 
141
    # To be implemented by subclasses
 
142
    def create(self):
 
143
        pass
 
144
 
 
145
    def add_cache_tier(self, cache_pool, mode):
 
146
        """
 
147
        Adds a new cache tier to an existing pool.
 
148
        :param cache_pool: six.string_types.  The cache tier pool name to add.
 
149
        :param mode: six.string_types. The caching mode to use for this pool.  valid range = ["readonly", "writeback"]
 
150
        :return: None
 
151
        """
 
152
        # Check the input types and values
 
153
        validator(value=cache_pool, valid_type=six.string_types)
 
154
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
 
155
 
 
156
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
 
157
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
 
158
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
 
159
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
 
160
 
 
161
    def remove_cache_tier(self, cache_pool):
 
162
        """
 
163
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
 
164
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
 
165
        :return: None
 
166
        """
 
167
        # read-only is easy, writeback is much harder
 
168
        mode = get_cache_mode(self.service, cache_pool)
 
169
        version = ceph_version()
 
170
        if mode == 'readonly':
 
171
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
 
172
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
173
 
 
174
        elif mode == 'writeback':
 
175
            pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
 
176
                                'cache-mode', cache_pool, 'forward']
 
177
            if version >= '10.1':
 
178
                # Jewel added a mandatory flag
 
179
                pool_forward_cmd.append('--yes-i-really-mean-it')
 
180
 
 
181
            check_call(pool_forward_cmd)
 
182
            # Flush the cache and wait for it to return
 
183
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
 
184
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
 
185
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
 
186
 
 
187
    def get_pgs(self, pool_size):
 
188
        """
 
189
        :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
 
190
            erasure coded pools
 
191
        :return: int.  The number of pgs to use.
 
192
        """
 
193
        validator(value=pool_size, valid_type=int)
 
194
        osd_list = get_osds(self.service)
 
195
        if not osd_list:
 
196
            # NOTE(james-page): Default to 200 for older ceph versions
 
197
            # which don't support OSD query from cli
 
198
            return 200
 
199
 
 
200
        osd_list_length = len(osd_list)
 
201
        # Calculate based on Ceph best practices
 
202
        if osd_list_length < 5:
 
203
            return 128
 
204
        elif 5 < osd_list_length < 10:
 
205
            return 512
 
206
        elif 10 < osd_list_length < 50:
 
207
            return 4096
 
208
        else:
 
209
            estimate = (osd_list_length * 100) / pool_size
 
210
            # Return the next nearest power of 2
 
211
            index = bisect.bisect_right(powers_of_two, estimate)
 
212
            return powers_of_two[index]
 
213
 
 
214
 
 
215
class ReplicatedPool(Pool):
 
216
    def __init__(self, service, name, pg_num=None, replicas=2):
 
217
        super(ReplicatedPool, self).__init__(service=service, name=name)
 
218
        self.replicas = replicas
 
219
        if pg_num is None:
 
220
            self.pg_num = self.get_pgs(self.replicas)
 
221
        else:
 
222
            self.pg_num = pg_num
 
223
 
 
224
    def create(self):
 
225
        if not pool_exists(self.service, self.name):
 
226
            # Create it
 
227
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
 
228
                   self.name, str(self.pg_num)]
 
229
            try:
 
230
                check_call(cmd)
 
231
                # Set the pool replica size
 
232
                update_pool(client=self.service,
 
233
                            pool=self.name,
 
234
                            settings={'size': str(self.replicas)})
 
235
            except CalledProcessError:
 
236
                raise
 
237
 
 
238
 
 
239
# Default jerasure erasure coded pool
 
240
class ErasurePool(Pool):
 
241
    def __init__(self, service, name, erasure_code_profile="default"):
 
242
        super(ErasurePool, self).__init__(service=service, name=name)
 
243
        self.erasure_code_profile = erasure_code_profile
 
244
 
 
245
    def create(self):
 
246
        if not pool_exists(self.service, self.name):
 
247
            # Try to find the erasure profile information so we can properly size the pgs
 
248
            erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
 
249
 
 
250
            # Check for errors
 
251
            if erasure_profile is None:
 
252
                log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
 
253
                    level=ERROR)
 
254
                raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
 
255
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
 
256
                # Error
 
257
                log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
 
258
                    level=ERROR)
 
259
                raise PoolCreationError(
 
260
                    message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
 
261
 
 
262
            pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
 
263
            # Create it
 
264
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
 
265
                   'erasure', self.erasure_code_profile]
 
266
            try:
 
267
                check_call(cmd)
 
268
            except CalledProcessError:
 
269
                raise
 
270
 
 
271
    """Get an existing erasure code profile if it already exists.
 
272
       Returns json formatted output"""
 
273
 
 
274
 
 
275
def get_mon_map(service):
 
276
    """
 
277
    Returns the current monitor map.
 
278
    :param service: six.string_types. The Ceph user name to run the command under
 
279
    :return: json string. :raise: ValueError if the monmap fails to parse.
 
280
      Also raises CalledProcessError if our ceph command fails
 
281
    """
 
282
    try:
 
283
        mon_status = check_output(
 
284
            ['ceph', '--id', service,
 
285
             'mon_status', '--format=json'])
 
286
        try:
 
287
            return json.loads(mon_status)
 
288
        except ValueError as v:
 
289
            log("Unable to parse mon_status json: {}. Error: {}".format(
 
290
                mon_status, v.message))
 
291
            raise
 
292
    except CalledProcessError as e:
 
293
        log("mon_status command failed with message: {}".format(
 
294
            e.message))
 
295
        raise
 
296
 
 
297
 
 
298
def hash_monitor_names(service):
 
299
    """
 
300
    Uses the get_mon_map() function to get information about the monitor
 
301
    cluster.
 
302
    Hash the name of each monitor.  Return a sorted list of monitor hashes
 
303
    in an ascending order.
 
304
    :param service: six.string_types. The Ceph user name to run the command under
 
305
    :rtype : dict.   json dict of monitor name, ip address and rank
 
306
    example: {
 
307
        'name': 'ip-172-31-13-165',
 
308
        'rank': 0,
 
309
        'addr': '172.31.13.165:6789/0'}
 
310
    """
 
311
    try:
 
312
        hash_list = []
 
313
        monitor_list = get_mon_map(service=service)
 
314
        if monitor_list['monmap']['mons']:
 
315
            for mon in monitor_list['monmap']['mons']:
 
316
                hash_list.append(
 
317
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
 
318
            return sorted(hash_list)
 
319
        else:
 
320
            return None
 
321
    except (ValueError, CalledProcessError):
 
322
        raise
 
323
 
 
324
 
 
325
def monitor_key_delete(service, key):
 
326
    """
 
327
    Delete a key and value pair from the monitor cluster
 
328
    :param service: six.string_types. The Ceph user name to run the command under
 
329
    Deletes a key value pair on the monitor cluster.
 
330
    :param key: six.string_types.  The key to delete.
 
331
    """
 
332
    try:
 
333
        check_output(
 
334
            ['ceph', '--id', service,
 
335
             'config-key', 'del', str(key)])
 
336
    except CalledProcessError as e:
 
337
        log("Monitor config-key put failed with message: {}".format(
 
338
            e.output))
 
339
        raise
 
340
 
 
341
 
 
342
def monitor_key_set(service, key, value):
 
343
    """
 
344
    Sets a key value pair on the monitor cluster.
 
345
    :param service: six.string_types. The Ceph user name to run the command under
 
346
    :param key: six.string_types.  The key to set.
 
347
    :param value: The value to set.  This will be converted to a string
 
348
        before setting
 
349
    """
 
350
    try:
 
351
        check_output(
 
352
            ['ceph', '--id', service,
 
353
             'config-key', 'put', str(key), str(value)])
 
354
    except CalledProcessError as e:
 
355
        log("Monitor config-key put failed with message: {}".format(
 
356
            e.output))
 
357
        raise
 
358
 
 
359
 
 
360
def monitor_key_get(service, key):
 
361
    """
 
362
    Gets the value of an existing key in the monitor cluster.
 
363
    :param service: six.string_types. The Ceph user name to run the command under
 
364
    :param key: six.string_types.  The key to search for.
 
365
    :return: Returns the value of that key or None if not found.
 
366
    """
 
367
    try:
 
368
        output = check_output(
 
369
            ['ceph', '--id', service,
 
370
             'config-key', 'get', str(key)])
 
371
        return output
 
372
    except CalledProcessError as e:
 
373
        log("Monitor config-key get failed with message: {}".format(
 
374
            e.output))
 
375
        return None
 
376
 
 
377
 
 
378
def monitor_key_exists(service, key):
 
379
    """
 
380
    Searches for the existence of a key in the monitor cluster.
 
381
    :param service: six.string_types. The Ceph user name to run the command under
 
382
    :param key: six.string_types.  The key to search for
 
383
    :return: Returns True if the key exists, False if not and raises an
 
384
     exception if an unknown error occurs. :raise: CalledProcessError if
 
385
     an unknown error occurs
 
386
    """
 
387
    try:
 
388
        check_call(
 
389
            ['ceph', '--id', service,
 
390
             'config-key', 'exists', str(key)])
 
391
        # I can return true here regardless because Ceph returns
 
392
        # ENOENT if the key wasn't found
 
393
        return True
 
394
    except CalledProcessError as e:
 
395
        if e.returncode == errno.ENOENT:
 
396
            return False
 
397
        else:
 
398
            log("Unknown error from ceph config-get exists: {} {}".format(
 
399
                e.returncode, e.output))
 
400
            raise
 
401
 
 
402
 
 
403
def get_erasure_profile(service, name):
 
404
    """
 
405
    :param service: six.string_types. The Ceph user name to run the command under
 
406
    :param name:
 
407
    :return:
 
408
    """
 
409
    try:
 
410
        out = check_output(['ceph', '--id', service,
 
411
                            'osd', 'erasure-code-profile', 'get',
 
412
                            name, '--format=json'])
 
413
        return json.loads(out)
 
414
    except (CalledProcessError, OSError, ValueError):
 
415
        return None
 
416
 
 
417
 
 
418
def pool_set(service, pool_name, key, value):
 
419
    """
 
420
    Sets a value for a RADOS pool in ceph.
 
421
    :param service: six.string_types. The Ceph user name to run the command under
 
422
    :param pool_name: six.string_types
 
423
    :param key: six.string_types
 
424
    :param value:
 
425
    :return: None.  Can raise CalledProcessError
 
426
    """
 
427
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
 
428
    try:
 
429
        check_call(cmd)
 
430
    except CalledProcessError:
 
431
        raise
 
432
 
 
433
 
 
434
def snapshot_pool(service, pool_name, snapshot_name):
 
435
    """
 
436
    Snapshots a RADOS pool in ceph.
 
437
    :param service: six.string_types. The Ceph user name to run the command under
 
438
    :param pool_name: six.string_types
 
439
    :param snapshot_name: six.string_types
 
440
    :return: None.  Can raise CalledProcessError
 
441
    """
 
442
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
 
443
    try:
 
444
        check_call(cmd)
 
445
    except CalledProcessError:
 
446
        raise
 
447
 
 
448
 
 
449
def remove_pool_snapshot(service, pool_name, snapshot_name):
 
450
    """
 
451
    Remove a snapshot from a RADOS pool in ceph.
 
452
    :param service: six.string_types. The Ceph user name to run the command under
 
453
    :param pool_name: six.string_types
 
454
    :param snapshot_name: six.string_types
 
455
    :return: None.  Can raise CalledProcessError
 
456
    """
 
457
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
 
458
    try:
 
459
        check_call(cmd)
 
460
    except CalledProcessError:
 
461
        raise
 
462
 
 
463
 
 
464
# max_bytes should be an int or long
 
465
def set_pool_quota(service, pool_name, max_bytes):
 
466
    """
 
467
    :param service: six.string_types. The Ceph user name to run the command under
 
468
    :param pool_name: six.string_types
 
469
    :param max_bytes: int or long
 
470
    :return: None.  Can raise CalledProcessError
 
471
    """
 
472
    # Set a byte quota on a RADOS pool in ceph.
 
473
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
 
474
           'max_bytes', str(max_bytes)]
 
475
    try:
 
476
        check_call(cmd)
 
477
    except CalledProcessError:
 
478
        raise
 
479
 
 
480
 
 
481
def remove_pool_quota(service, pool_name):
 
482
    """
 
483
    Set a byte quota on a RADOS pool in ceph.
 
484
    :param service: six.string_types. The Ceph user name to run the command under
 
485
    :param pool_name: six.string_types
 
486
    :return: None.  Can raise CalledProcessError
 
487
    """
 
488
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
 
489
    try:
 
490
        check_call(cmd)
 
491
    except CalledProcessError:
 
492
        raise
 
493
 
 
494
 
 
495
def remove_erasure_profile(service, profile_name):
 
496
    """
 
497
    Create a new erasure code profile if one does not already exist for it.  Updates
 
498
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
499
    for more details
 
500
    :param service: six.string_types. The Ceph user name to run the command under
 
501
    :param profile_name: six.string_types
 
502
    :return: None.  Can raise CalledProcessError
 
503
    """
 
504
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
 
505
           profile_name]
 
506
    try:
 
507
        check_call(cmd)
 
508
    except CalledProcessError:
 
509
        raise
 
510
 
 
511
 
 
512
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
 
513
                           failure_domain='host',
 
514
                           data_chunks=2, coding_chunks=1,
 
515
                           locality=None, durability_estimator=None):
 
516
    """
 
517
    Create a new erasure code profile if one does not already exist for it.  Updates
 
518
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
 
519
    for more details
 
520
    :param service: six.string_types. The Ceph user name to run the command under
 
521
    :param profile_name: six.string_types
 
522
    :param erasure_plugin_name: six.string_types
 
523
    :param failure_domain: six.string_types.  One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
 
524
        'room', 'root', 'row'])
 
525
    :param data_chunks: int
 
526
    :param coding_chunks: int
 
527
    :param locality: int
 
528
    :param durability_estimator: int
 
529
    :return: None.  Can raise CalledProcessError
 
530
    """
 
531
    # Ensure this failure_domain is allowed by Ceph
 
532
    validator(failure_domain, six.string_types,
 
533
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
 
534
 
 
535
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
 
536
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
 
537
           'ruleset_failure_domain=' + failure_domain]
 
538
    if locality is not None and durability_estimator is not None:
 
539
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
 
540
 
 
541
    # Add plugin specific information
 
542
    if locality is not None:
 
543
        # For local erasure codes
 
544
        cmd.append('l=' + str(locality))
 
545
    if durability_estimator is not None:
 
546
        # For Shec erasure codes
 
547
        cmd.append('c=' + str(durability_estimator))
 
548
 
 
549
    if erasure_profile_exists(service, profile_name):
 
550
        cmd.append('--force')
 
551
 
 
552
    try:
 
553
        check_call(cmd)
 
554
    except CalledProcessError:
 
555
        raise
 
556
 
 
557
 
 
558
def rename_pool(service, old_name, new_name):
 
559
    """
 
560
    Rename a Ceph pool from old_name to new_name
 
561
    :param service: six.string_types. The Ceph user name to run the command under
 
562
    :param old_name: six.string_types
 
563
    :param new_name: six.string_types
 
564
    :return: None
 
565
    """
 
566
    validator(value=old_name, valid_type=six.string_types)
 
567
    validator(value=new_name, valid_type=six.string_types)
 
568
 
 
569
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
 
570
    check_call(cmd)
 
571
 
 
572
 
 
573
def erasure_profile_exists(service, name):
 
574
    """
 
575
    Check to see if an Erasure code profile already exists.
 
576
    :param service: six.string_types. The Ceph user name to run the command under
 
577
    :param name: six.string_types
 
578
    :return: int or None
 
579
    """
 
580
    validator(value=name, valid_type=six.string_types)
 
581
    try:
 
582
        check_call(['ceph', '--id', service,
 
583
                    'osd', 'erasure-code-profile', 'get',
 
584
                    name])
 
585
        return True
 
586
    except CalledProcessError:
 
587
        return False
 
588
 
 
589
 
 
590
def get_cache_mode(service, pool_name):
 
591
    """
 
592
    Find the current caching mode of the pool_name given.
 
593
    :param service: six.string_types. The Ceph user name to run the command under
 
594
    :param pool_name: six.string_types
 
595
    :return: int or None
 
596
    """
 
597
    validator(value=service, valid_type=six.string_types)
 
598
    validator(value=pool_name, valid_type=six.string_types)
 
599
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
 
600
    try:
 
601
        osd_json = json.loads(out)
 
602
        for pool in osd_json['pools']:
 
603
            if pool['pool_name'] == pool_name:
 
604
                return pool['cache_mode']
 
605
        return None
 
606
    except ValueError:
 
607
        raise
 
608
 
 
609
 
 
610
def pool_exists(service, name):
 
611
    """Check to see if a RADOS pool already exists."""
 
612
    try:
 
613
        out = check_output(['rados', '--id', service,
 
614
                            'lspools']).decode('UTF-8')
 
615
    except CalledProcessError:
 
616
        return False
 
617
 
 
618
    return name in out.split()
 
619
 
 
620
 
 
621
def get_osds(service):
 
622
    """Return a list of all Ceph Object Storage Daemons currently in the
 
623
    cluster.
 
624
    """
 
625
    version = ceph_version()
 
626
    if version and version >= '0.56':
 
627
        return json.loads(check_output(['ceph', '--id', service,
 
628
                                        'osd', 'ls',
 
629
                                        '--format=json']).decode('UTF-8'))
 
630
 
 
631
    return None
70
632
 
71
633
 
72
634
def install():
96
658
    check_call(cmd)
97
659
 
98
660
 
99
 
def pool_exists(service, name):
100
 
    """Check to see if a RADOS pool already exists."""
101
 
    try:
102
 
        out = check_output(['rados', '--id', service,
103
 
                            'lspools']).decode('UTF-8')
104
 
    except CalledProcessError:
105
 
        return False
106
 
 
107
 
    return name in out
108
 
 
109
 
 
110
 
def get_osds(service):
111
 
    """Return a list of all Ceph Object Storage Daemons currently in the
112
 
    cluster.
113
 
    """
114
 
    version = ceph_version()
115
 
    if version and version >= '0.56':
116
 
        return json.loads(check_output(['ceph', '--id', service,
117
 
                                        'osd', 'ls',
118
 
                                        '--format=json']).decode('UTF-8'))
119
 
 
120
 
    return None
121
 
 
122
 
 
123
 
def create_pool(service, name, replicas=3):
 
661
def update_pool(client, pool, settings):
 
662
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
 
663
    for k, v in six.iteritems(settings):
 
664
        cmd.append(k)
 
665
        cmd.append(v)
 
666
 
 
667
    check_call(cmd)
 
668
 
 
669
 
 
670
def create_pool(service, name, replicas=3, pg_num=None):
124
671
    """Create a new RADOS pool."""
125
672
    if pool_exists(service, name):
126
673
        log("Ceph pool {} already exists, skipping creation".format(name),
127
674
            level=WARNING)
128
675
        return
129
676
 
130
 
    # Calculate the number of placement groups based
131
 
    # on upstream recommended best practices.
132
 
    osds = get_osds(service)
133
 
    if osds:
134
 
        pgnum = (len(osds) * 100 // replicas)
135
 
    else:
136
 
        # NOTE(james-page): Default to 200 for older ceph versions
137
 
        # which don't support OSD query from cli
138
 
        pgnum = 200
139
 
 
140
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
141
 
    check_call(cmd)
142
 
 
143
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
144
 
           str(replicas)]
145
 
    check_call(cmd)
 
677
    if not pg_num:
 
678
        # Calculate the number of placement groups based
 
679
        # on upstream recommended best practices.
 
680
        osds = get_osds(service)
 
681
        if osds:
 
682
            pg_num = (len(osds) * 100 // replicas)
 
683
        else:
 
684
            # NOTE(james-page): Default to 200 for older ceph versions
 
685
            # which don't support OSD query from cli
 
686
            pg_num = 200
 
687
 
 
688
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
 
689
    check_call(cmd)
 
690
 
 
691
    update_pool(service, name, settings={'size': str(replicas)})
146
692
 
147
693
 
148
694
def delete_pool(service, name):
197
743
    log('Created new keyfile at %s.' % keyfile, level=INFO)
198
744
 
199
745
 
200
 
def get_ceph_nodes():
201
 
    """Query named relation 'ceph' to determine current nodes."""
 
746
def get_ceph_nodes(relation='ceph'):
 
747
    """Query named relation to determine current nodes."""
202
748
    hosts = []
203
 
    for r_id in relation_ids('ceph'):
 
749
    for r_id in relation_ids(relation):
204
750
        for unit in related_units(r_id):
205
751
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
206
752
 
288
834
    os.chown(data_src_dst, uid, gid)
289
835
 
290
836
 
291
 
# TODO: re-use
292
 
def modprobe(module):
293
 
    """Load a kernel module and configure for auto-load on reboot."""
294
 
    log('Loading kernel module', level=INFO)
295
 
    cmd = ['modprobe', module]
296
 
    check_call(cmd)
297
 
    with open('/etc/modules', 'r+') as modules:
298
 
        if module not in modules.read():
299
 
            modules.write(module)
300
 
 
301
 
 
302
837
def copy_files(src, dst, symlinks=False, ignore=None):
303
838
    """Copy files from src to dst."""
304
839
    for item in os.listdir(src):
363
898
            service_start(svc)
364
899
 
365
900
 
366
 
def ensure_ceph_keyring(service, user=None, group=None):
 
901
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
367
902
    """Ensures a ceph keyring is created for a named service and optionally
368
903
    ensures user and group ownership.
369
904
 
370
905
    Returns False if no ceph key is available in relation state.
371
906
    """
372
907
    key = None
373
 
    for rid in relation_ids('ceph'):
 
908
    for rid in relation_ids(relation):
374
909
        for unit in related_units(rid):
375
910
            key = relation_get('key', rid=rid, unit=unit)
376
911
            if key:
411
946
 
412
947
    The API is versioned and defaults to version 1.
413
948
    """
414
 
    def __init__(self, api_version=1):
 
949
 
 
950
    def __init__(self, api_version=1, request_id=None):
415
951
        self.api_version = api_version
 
952
        if request_id:
 
953
            self.request_id = request_id
 
954
        else:
 
955
            self.request_id = str(uuid.uuid1())
416
956
        self.ops = []
417
957
 
418
 
    def add_op_create_pool(self, name, replica_count=3):
 
958
    def add_op_create_pool(self, name, replica_count=3, pg_num=None):
 
959
        """Adds an operation to create a pool.
 
960
 
 
961
        @param pg_num setting:  optional setting. If not provided, this value
 
962
        will be calculated by the broker based on how many OSDs are in the
 
963
        cluster at the time of creation. Note that, if provided, this value
 
964
        will be capped at the current available maximum.
 
965
        """
419
966
        self.ops.append({'op': 'create-pool', 'name': name,
420
 
                         'replicas': replica_count})
 
967
                         'replicas': replica_count, 'pg_num': pg_num})
 
968
 
 
969
    def set_ops(self, ops):
 
970
        """Set request ops to provided value.
 
971
 
 
972
        Useful for injecting ops that come from a previous request
 
973
        to allow comparisons to ensure validity.
 
974
        """
 
975
        self.ops = ops
421
976
 
422
977
    @property
423
978
    def request(self):
424
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
979
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
980
                           'request-id': self.request_id})
 
981
 
 
982
    def _ops_equal(self, other):
 
983
        if len(self.ops) == len(other.ops):
 
984
            for req_no in range(0, len(self.ops)):
 
985
                for key in ['replicas', 'name', 'op', 'pg_num']:
 
986
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
 
987
                        return False
 
988
        else:
 
989
            return False
 
990
        return True
 
991
 
 
992
    def __eq__(self, other):
 
993
        if not isinstance(other, self.__class__):
 
994
            return False
 
995
        if self.api_version == other.api_version and \
 
996
                self._ops_equal(other):
 
997
            return True
 
998
        else:
 
999
            return False
 
1000
 
 
1001
    def __ne__(self, other):
 
1002
        return not self.__eq__(other)
425
1003
 
426
1004
 
427
1005
class CephBrokerRsp(object):
431
1009
 
432
1010
    The API is versioned and defaults to version 1.
433
1011
    """
 
1012
 
434
1013
    def __init__(self, encoded_rsp):
435
1014
        self.api_version = None
436
1015
        self.rsp = json.loads(encoded_rsp)
437
1016
 
438
1017
    @property
 
1018
    def request_id(self):
 
1019
        return self.rsp.get('request-id')
 
1020
 
 
1021
    @property
439
1022
    def exit_code(self):
440
1023
        return self.rsp.get('exit-code')
441
1024
 
442
1025
    @property
443
1026
    def exit_msg(self):
444
1027
        return self.rsp.get('stderr')
 
1028
 
 
1029
 
 
1030
# Ceph Broker Conversation:
 
1031
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
1032
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
1033
# unique id so that the client can identity which CephBrokerRsp is associated
 
1034
# with the request. Ceph will also respond to each client unit individually
 
1035
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
1036
# via key broker-rsp-glance-0
 
1037
#
 
1038
# To use this the charm can just do something like:
 
1039
#
 
1040
# from charmhelpers.contrib.storage.linux.ceph import (
 
1041
#     send_request_if_needed,
 
1042
#     is_request_complete,
 
1043
#     CephBrokerRq,
 
1044
# )
 
1045
#
 
1046
# @hooks.hook('ceph-relation-changed')
 
1047
# def ceph_changed():
 
1048
#     rq = CephBrokerRq()
 
1049
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
1050
#
 
1051
#     if is_request_complete(rq):
 
1052
#         <Request complete actions>
 
1053
#     else:
 
1054
#         send_request_if_needed(get_ceph_request())
 
1055
#
 
1056
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
1057
# of glance having sent a request to ceph which ceph has successfully processed
 
1058
#  'ceph:8': {
 
1059
#      'ceph/0': {
 
1060
#          'auth': 'cephx',
 
1061
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
1062
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
1063
#          'ceph-public-address': '10.5.44.103',
 
1064
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
1065
#          'private-address': '10.5.44.103',
 
1066
#      },
 
1067
#      'glance/0': {
 
1068
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
1069
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
1070
#                         '"op": "create-pool"}]}'),
 
1071
#          'private-address': '10.5.44.109',
 
1072
#      },
 
1073
#  }
 
1074
 
 
1075
def get_previous_request(rid):
 
1076
    """Return the last ceph broker request sent on a given relation
 
1077
 
 
1078
    @param rid: Relation id to query for request
 
1079
    """
 
1080
    request = None
 
1081
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
1082
                              unit=local_unit())
 
1083
    if broker_req:
 
1084
        request_data = json.loads(broker_req)
 
1085
        request = CephBrokerRq(api_version=request_data['api-version'],
 
1086
                               request_id=request_data['request-id'])
 
1087
        request.set_ops(request_data['ops'])
 
1088
 
 
1089
    return request
 
1090
 
 
1091
 
 
1092
def get_request_states(request, relation='ceph'):
 
1093
    """Return a dict of requests per relation id with their corresponding
 
1094
       completion state.
 
1095
 
 
1096
    This allows a charm, which has a request for ceph, to see whether there is
 
1097
    an equivalent request already being processed and if so what state that
 
1098
    request is in.
 
1099
 
 
1100
    @param request: A CephBrokerRq object
 
1101
    """
 
1102
    complete = []
 
1103
    requests = {}
 
1104
    for rid in relation_ids(relation):
 
1105
        complete = False
 
1106
        previous_request = get_previous_request(rid)
 
1107
        if request == previous_request:
 
1108
            sent = True
 
1109
            complete = is_request_complete_for_rid(previous_request, rid)
 
1110
        else:
 
1111
            sent = False
 
1112
            complete = False
 
1113
 
 
1114
        requests[rid] = {
 
1115
            'sent': sent,
 
1116
            'complete': complete,
 
1117
        }
 
1118
 
 
1119
    return requests
 
1120
 
 
1121
 
 
1122
def is_request_sent(request, relation='ceph'):
 
1123
    """Check to see if a functionally equivalent request has already been sent
 
1124
 
 
1125
    Returns True if a similair request has been sent
 
1126
 
 
1127
    @param request: A CephBrokerRq object
 
1128
    """
 
1129
    states = get_request_states(request, relation=relation)
 
1130
    for rid in states.keys():
 
1131
        if not states[rid]['sent']:
 
1132
            return False
 
1133
 
 
1134
    return True
 
1135
 
 
1136
 
 
1137
def is_request_complete(request, relation='ceph'):
 
1138
    """Check to see if a functionally equivalent request has already been
 
1139
    completed
 
1140
 
 
1141
    Returns True if a similair request has been completed
 
1142
 
 
1143
    @param request: A CephBrokerRq object
 
1144
    """
 
1145
    states = get_request_states(request, relation=relation)
 
1146
    for rid in states.keys():
 
1147
        if not states[rid]['complete']:
 
1148
            return False
 
1149
 
 
1150
    return True
 
1151
 
 
1152
 
 
1153
def is_request_complete_for_rid(request, rid):
 
1154
    """Check if a given request has been completed on the given relation
 
1155
 
 
1156
    @param request: A CephBrokerRq object
 
1157
    @param rid: Relation ID
 
1158
    """
 
1159
    broker_key = get_broker_rsp_key()
 
1160
    for unit in related_units(rid):
 
1161
        rdata = relation_get(rid=rid, unit=unit)
 
1162
        if rdata.get(broker_key):
 
1163
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
1164
            if rsp.request_id == request.request_id:
 
1165
                if not rsp.exit_code:
 
1166
                    return True
 
1167
        else:
 
1168
            # The remote unit sent no reply targeted at this unit so either the
 
1169
            # remote ceph cluster does not support unit targeted replies or it
 
1170
            # has not processed our request yet.
 
1171
            if rdata.get('broker_rsp'):
 
1172
                request_data = json.loads(rdata['broker_rsp'])
 
1173
                if request_data.get('request-id'):
 
1174
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
1175
                        'service supports unit specific replies', level=DEBUG)
 
1176
                else:
 
1177
                    log('Using legacy broker_rsp as remote service does not '
 
1178
                        'supports unit specific replies', level=DEBUG)
 
1179
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
1180
                    if not rsp.exit_code:
 
1181
                        return True
 
1182
 
 
1183
    return False
 
1184
 
 
1185
 
 
1186
def get_broker_rsp_key():
 
1187
    """Return broker response key for this unit
 
1188
 
 
1189
    This is the key that ceph is going to use to pass request status
 
1190
    information back to this unit
 
1191
    """
 
1192
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
1193
 
 
1194
 
 
1195
def send_request_if_needed(request, relation='ceph'):
 
1196
    """Send broker request if an equivalent request has not already been sent
 
1197
 
 
1198
    @param request: A CephBrokerRq object
 
1199
    """
 
1200
    if is_request_sent(request, relation=relation):
 
1201
        log('Request already sent but not complete, not sending new request',
 
1202
            level=DEBUG)
 
1203
    else:
 
1204
        for rid in relation_ids(relation):
 
1205
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
1206
            relation_set(relation_id=rid, broker_req=request.request)