~openstack-charmers-next/charms/wily/ceph-osd/trunk

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Gerrit Code Review
  • Author(s): Jenkins
  • Date: 2016-03-25 17:16:26 UTC
  • mfrom: (74.1.2 trunk)
  • Revision ID: review@openstack.org-20160325171626-q3u1teku7zqziyz4
Merge "Revert "Rolling upgrades of ceph osd cluster""

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2014-2015 Canonical Limited.
2
 
#
3
 
# This file is part of charm-helpers.
4
 
#
5
 
# charm-helpers is free software: you can redistribute it and/or modify
6
 
# it under the terms of the GNU Lesser General Public License version 3 as
7
 
# published by the Free Software Foundation.
8
 
#
9
 
# charm-helpers is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU Lesser General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU Lesser General Public License
15
 
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
16
 
 
17
 
#
18
 
# Copyright 2012 Canonical Ltd.
19
 
#
20
 
# This file is sourced from lp:openstack-charm-helpers
21
 
#
22
 
# Authors:
23
 
#  James Page <james.page@ubuntu.com>
24
 
#  Adam Gandelman <adamg@ubuntu.com>
25
 
#
26
 
import bisect
27
 
import errno
28
 
import hashlib
29
 
import six
30
 
 
31
 
import os
32
 
import shutil
33
 
import json
34
 
import time
35
 
import uuid
36
 
 
37
 
from subprocess import (
38
 
    check_call,
39
 
    check_output,
40
 
    CalledProcessError,
41
 
)
42
 
from charmhelpers.core.hookenv import (
43
 
    local_unit,
44
 
    relation_get,
45
 
    relation_ids,
46
 
    relation_set,
47
 
    related_units,
48
 
    log,
49
 
    DEBUG,
50
 
    INFO,
51
 
    WARNING,
52
 
    ERROR,
53
 
)
54
 
from charmhelpers.core.host import (
55
 
    mount,
56
 
    mounts,
57
 
    service_start,
58
 
    service_stop,
59
 
    service_running,
60
 
    umount,
61
 
)
62
 
from charmhelpers.fetch import (
63
 
    apt_install,
64
 
)
65
 
 
66
 
from charmhelpers.core.kernel import modprobe
67
 
 
68
 
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
69
 
KEYFILE = '/etc/ceph/ceph.client.{}.key'
70
 
 
71
 
CEPH_CONF = """[global]
72
 
auth supported = {auth}
73
 
keyring = {keyring}
74
 
mon host = {mon_hosts}
75
 
log to syslog = {use_syslog}
76
 
err to syslog = {use_syslog}
77
 
clog to syslog = {use_syslog}
78
 
"""
79
 
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
80
 
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
81
 
 
82
 
 
83
 
def validator(value, valid_type, valid_range=None):
84
 
    """
85
 
    Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
86
 
    Example input:
87
 
        validator(value=1,
88
 
                  valid_type=int,
89
 
                  valid_range=[0, 2])
90
 
    This says I'm testing value=1.  It must be an int inclusive in [0,2]
91
 
 
92
 
    :param value: The value to validate
93
 
    :param valid_type: The type that value should be.
94
 
    :param valid_range: A range of values that value can assume.
95
 
    :return:
96
 
    """
97
 
    assert isinstance(value, valid_type), "{} is not a {}".format(
98
 
        value,
99
 
        valid_type)
100
 
    if valid_range is not None:
101
 
        assert isinstance(valid_range, list), \
102
 
            "valid_range must be a list, was given {}".format(valid_range)
103
 
        # If we're dealing with strings
104
 
        if valid_type is six.string_types:
105
 
            assert value in valid_range, \
106
 
                "{} is not in the list {}".format(value, valid_range)
107
 
        # Integer, float should have a min and max
108
 
        else:
109
 
            if len(valid_range) != 2:
110
 
                raise ValueError(
111
 
                    "Invalid valid_range list of {} for {}.  "
112
 
                    "List must be [min,max]".format(valid_range, value))
113
 
            assert value >= valid_range[0], \
114
 
                "{} is less than minimum allowed value of {}".format(
115
 
                    value, valid_range[0])
116
 
            assert value <= valid_range[1], \
117
 
                "{} is greater than maximum allowed value of {}".format(
118
 
                    value, valid_range[1])
119
 
 
120
 
 
121
 
class PoolCreationError(Exception):
122
 
    """
123
 
    A custom error to inform the caller that a pool creation failed.  Provides an error message
124
 
    """
125
 
 
126
 
    def __init__(self, message):
127
 
        super(PoolCreationError, self).__init__(message)
128
 
 
129
 
 
130
 
class Pool(object):
131
 
    """
132
 
    An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
133
 
    Do not call create() on this base class as it will not do anything.  Instantiate a child class and call create().
134
 
    """
135
 
 
136
 
    def __init__(self, service, name):
137
 
        self.service = service
138
 
        self.name = name
139
 
 
140
 
    # Create the pool if it doesn't exist already
141
 
    # To be implemented by subclasses
142
 
    def create(self):
143
 
        pass
144
 
 
145
 
    def add_cache_tier(self, cache_pool, mode):
146
 
        """
147
 
        Adds a new cache tier to an existing pool.
148
 
        :param cache_pool: six.string_types.  The cache tier pool name to add.
149
 
        :param mode: six.string_types. The caching mode to use for this pool.  valid range = ["readonly", "writeback"]
150
 
        :return: None
151
 
        """
152
 
        # Check the input types and values
153
 
        validator(value=cache_pool, valid_type=six.string_types)
154
 
        validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
155
 
 
156
 
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
157
 
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
158
 
        check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
159
 
        check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
160
 
 
161
 
    def remove_cache_tier(self, cache_pool):
162
 
        """
163
 
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
164
 
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
165
 
        :return: None
166
 
        """
167
 
        # read-only is easy, writeback is much harder
168
 
        mode = get_cache_mode(self.service, cache_pool)
169
 
        if mode == 'readonly':
170
 
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
171
 
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
172
 
 
173
 
        elif mode == 'writeback':
174
 
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
175
 
            # Flush the cache and wait for it to return
176
 
            check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
177
 
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
178
 
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
179
 
 
180
 
    def get_pgs(self, pool_size):
181
 
        """
182
 
        :param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
183
 
            erasure coded pools
184
 
        :return: int.  The number of pgs to use.
185
 
        """
186
 
        validator(value=pool_size, valid_type=int)
187
 
        osd_list = get_osds(self.service)
188
 
        if not osd_list:
189
 
            # NOTE(james-page): Default to 200 for older ceph versions
190
 
            # which don't support OSD query from cli
191
 
            return 200
192
 
 
193
 
        osd_list_length = len(osd_list)
194
 
        # Calculate based on Ceph best practices
195
 
        if osd_list_length < 5:
196
 
            return 128
197
 
        elif 5 < osd_list_length < 10:
198
 
            return 512
199
 
        elif 10 < osd_list_length < 50:
200
 
            return 4096
201
 
        else:
202
 
            estimate = (osd_list_length * 100) / pool_size
203
 
            # Return the next nearest power of 2
204
 
            index = bisect.bisect_right(powers_of_two, estimate)
205
 
            return powers_of_two[index]
206
 
 
207
 
 
208
 
class ReplicatedPool(Pool):
209
 
    def __init__(self, service, name, pg_num=None, replicas=2):
210
 
        super(ReplicatedPool, self).__init__(service=service, name=name)
211
 
        self.replicas = replicas
212
 
        if pg_num is None:
213
 
            self.pg_num = self.get_pgs(self.replicas)
214
 
        else:
215
 
            self.pg_num = pg_num
216
 
 
217
 
    def create(self):
218
 
        if not pool_exists(self.service, self.name):
219
 
            # Create it
220
 
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
221
 
                   self.name, str(self.pg_num)]
222
 
            try:
223
 
                check_call(cmd)
224
 
            except CalledProcessError:
225
 
                raise
226
 
 
227
 
 
228
 
# Default jerasure erasure coded pool
229
 
class ErasurePool(Pool):
230
 
    def __init__(self, service, name, erasure_code_profile="default"):
231
 
        super(ErasurePool, self).__init__(service=service, name=name)
232
 
        self.erasure_code_profile = erasure_code_profile
233
 
 
234
 
    def create(self):
235
 
        if not pool_exists(self.service, self.name):
236
 
            # Try to find the erasure profile information so we can properly size the pgs
237
 
            erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
238
 
 
239
 
            # Check for errors
240
 
            if erasure_profile is None:
241
 
                log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
242
 
                    level=ERROR)
243
 
                raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
244
 
            if 'k' not in erasure_profile or 'm' not in erasure_profile:
245
 
                # Error
246
 
                log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
247
 
                    level=ERROR)
248
 
                raise PoolCreationError(
249
 
                    message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
250
 
 
251
 
            pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
252
 
            # Create it
253
 
            cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
254
 
                   'erasure', self.erasure_code_profile]
255
 
            try:
256
 
                check_call(cmd)
257
 
            except CalledProcessError:
258
 
                raise
259
 
 
260
 
    """Get an existing erasure code profile if it already exists.
261
 
       Returns json formatted output"""
262
 
 
263
 
 
264
 
def get_mon_map(service):
265
 
    """
266
 
    Returns the current monitor map.
267
 
    :param service: six.string_types. The Ceph user name to run the command under
268
 
    :return: json string. :raise: ValueError if the monmap fails to parse.
269
 
      Also raises CalledProcessError if our ceph command fails
270
 
    """
271
 
    try:
272
 
        mon_status = check_output(
273
 
            ['ceph', '--id', service,
274
 
             'mon_status', '--format=json'])
275
 
        try:
276
 
            return json.loads(mon_status)
277
 
        except ValueError as v:
278
 
            log("Unable to parse mon_status json: {}. Error: {}".format(
279
 
                mon_status, v.message))
280
 
            raise
281
 
    except CalledProcessError as e:
282
 
        log("mon_status command failed with message: {}".format(
283
 
            e.message))
284
 
        raise
285
 
 
286
 
 
287
 
def hash_monitor_names(service):
288
 
    """
289
 
    Uses the get_mon_map() function to get information about the monitor
290
 
    cluster.
291
 
    Hash the name of each monitor.  Return a sorted list of monitor hashes
292
 
    in an ascending order.
293
 
    :param service: six.string_types. The Ceph user name to run the command under
294
 
    :rtype : dict.   json dict of monitor name, ip address and rank
295
 
    example: {
296
 
        'name': 'ip-172-31-13-165',
297
 
        'rank': 0,
298
 
        'addr': '172.31.13.165:6789/0'}
299
 
    """
300
 
    try:
301
 
        hash_list = []
302
 
        monitor_list = get_mon_map(service=service)
303
 
        if monitor_list['monmap']['mons']:
304
 
            for mon in monitor_list['monmap']['mons']:
305
 
                hash_list.append(
306
 
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
307
 
            return sorted(hash_list)
308
 
        else:
309
 
            return None
310
 
    except (ValueError, CalledProcessError):
311
 
        raise
312
 
 
313
 
 
314
 
def monitor_key_delete(service, key):
315
 
    """
316
 
    Delete a key and value pair from the monitor cluster
317
 
    :param service: six.string_types. The Ceph user name to run the command under
318
 
    Deletes a key value pair on the monitor cluster.
319
 
    :param key: six.string_types.  The key to delete.
320
 
    """
321
 
    try:
322
 
        check_output(
323
 
            ['ceph', '--id', service,
324
 
             'config-key', 'del', str(key)])
325
 
    except CalledProcessError as e:
326
 
        log("Monitor config-key put failed with message: {}".format(
327
 
            e.output))
328
 
        raise
329
 
 
330
 
 
331
 
def monitor_key_set(service, key, value):
332
 
    """
333
 
    Sets a key value pair on the monitor cluster.
334
 
    :param service: six.string_types. The Ceph user name to run the command under
335
 
    :param key: six.string_types.  The key to set.
336
 
    :param value: The value to set.  This will be converted to a string
337
 
        before setting
338
 
    """
339
 
    try:
340
 
        check_output(
341
 
            ['ceph', '--id', service,
342
 
             'config-key', 'put', str(key), str(value)])
343
 
    except CalledProcessError as e:
344
 
        log("Monitor config-key put failed with message: {}".format(
345
 
            e.output))
346
 
        raise
347
 
 
348
 
 
349
 
def monitor_key_get(service, key):
350
 
    """
351
 
    Gets the value of an existing key in the monitor cluster.
352
 
    :param service: six.string_types. The Ceph user name to run the command under
353
 
    :param key: six.string_types.  The key to search for.
354
 
    :return: Returns the value of that key or None if not found.
355
 
    """
356
 
    try:
357
 
        output = check_output(
358
 
            ['ceph', '--id', service,
359
 
             'config-key', 'get', str(key)])
360
 
        return output
361
 
    except CalledProcessError as e:
362
 
        log("Monitor config-key get failed with message: {}".format(
363
 
            e.output))
364
 
        return None
365
 
 
366
 
 
367
 
def monitor_key_exists(service, key):
368
 
    """
369
 
    Searches for the existence of a key in the monitor cluster.
370
 
    :param service: six.string_types. The Ceph user name to run the command under
371
 
    :param key: six.string_types.  The key to search for
372
 
    :return: Returns True if the key exists, False if not and raises an
373
 
     exception if an unknown error occurs. :raise: CalledProcessError if
374
 
     an unknown error occurs
375
 
    """
376
 
    try:
377
 
        check_call(
378
 
            ['ceph', '--id', service,
379
 
             'config-key', 'exists', str(key)])
380
 
        # I can return true here regardless because Ceph returns
381
 
        # ENOENT if the key wasn't found
382
 
        return True
383
 
    except CalledProcessError as e:
384
 
        if e.returncode == errno.ENOENT:
385
 
            return False
386
 
        else:
387
 
            log("Unknown error from ceph config-get exists: {} {}".format(
388
 
                e.returncode, e.output))
389
 
            raise
390
 
 
391
 
 
392
 
def get_erasure_profile(service, name):
393
 
    """
394
 
    :param service: six.string_types. The Ceph user name to run the command under
395
 
    :param name:
396
 
    :return:
397
 
    """
398
 
    try:
399
 
        out = check_output(['ceph', '--id', service,
400
 
                            'osd', 'erasure-code-profile', 'get',
401
 
                            name, '--format=json'])
402
 
        return json.loads(out)
403
 
    except (CalledProcessError, OSError, ValueError):
404
 
        return None
405
 
 
406
 
 
407
 
def pool_set(service, pool_name, key, value):
408
 
    """
409
 
    Sets a value for a RADOS pool in ceph.
410
 
    :param service: six.string_types. The Ceph user name to run the command under
411
 
    :param pool_name: six.string_types
412
 
    :param key: six.string_types
413
 
    :param value:
414
 
    :return: None.  Can raise CalledProcessError
415
 
    """
416
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
417
 
    try:
418
 
        check_call(cmd)
419
 
    except CalledProcessError:
420
 
        raise
421
 
 
422
 
 
423
 
def snapshot_pool(service, pool_name, snapshot_name):
424
 
    """
425
 
    Snapshots a RADOS pool in ceph.
426
 
    :param service: six.string_types. The Ceph user name to run the command under
427
 
    :param pool_name: six.string_types
428
 
    :param snapshot_name: six.string_types
429
 
    :return: None.  Can raise CalledProcessError
430
 
    """
431
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
432
 
    try:
433
 
        check_call(cmd)
434
 
    except CalledProcessError:
435
 
        raise
436
 
 
437
 
 
438
 
def remove_pool_snapshot(service, pool_name, snapshot_name):
439
 
    """
440
 
    Remove a snapshot from a RADOS pool in ceph.
441
 
    :param service: six.string_types. The Ceph user name to run the command under
442
 
    :param pool_name: six.string_types
443
 
    :param snapshot_name: six.string_types
444
 
    :return: None.  Can raise CalledProcessError
445
 
    """
446
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
447
 
    try:
448
 
        check_call(cmd)
449
 
    except CalledProcessError:
450
 
        raise
451
 
 
452
 
 
453
 
# max_bytes should be an int or long
454
 
def set_pool_quota(service, pool_name, max_bytes):
455
 
    """
456
 
    :param service: six.string_types. The Ceph user name to run the command under
457
 
    :param pool_name: six.string_types
458
 
    :param max_bytes: int or long
459
 
    :return: None.  Can raise CalledProcessError
460
 
    """
461
 
    # Set a byte quota on a RADOS pool in ceph.
462
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
463
 
           'max_bytes', str(max_bytes)]
464
 
    try:
465
 
        check_call(cmd)
466
 
    except CalledProcessError:
467
 
        raise
468
 
 
469
 
 
470
 
def remove_pool_quota(service, pool_name):
471
 
    """
472
 
    Set a byte quota on a RADOS pool in ceph.
473
 
    :param service: six.string_types. The Ceph user name to run the command under
474
 
    :param pool_name: six.string_types
475
 
    :return: None.  Can raise CalledProcessError
476
 
    """
477
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
478
 
    try:
479
 
        check_call(cmd)
480
 
    except CalledProcessError:
481
 
        raise
482
 
 
483
 
 
484
 
def remove_erasure_profile(service, profile_name):
485
 
    """
486
 
    Create a new erasure code profile if one does not already exist for it.  Updates
487
 
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
488
 
    for more details
489
 
    :param service: six.string_types. The Ceph user name to run the command under
490
 
    :param profile_name: six.string_types
491
 
    :return: None.  Can raise CalledProcessError
492
 
    """
493
 
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
494
 
           profile_name]
495
 
    try:
496
 
        check_call(cmd)
497
 
    except CalledProcessError:
498
 
        raise
499
 
 
500
 
 
501
 
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
502
 
                           failure_domain='host',
503
 
                           data_chunks=2, coding_chunks=1,
504
 
                           locality=None, durability_estimator=None):
505
 
    """
506
 
    Create a new erasure code profile if one does not already exist for it.  Updates
507
 
    the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
508
 
    for more details
509
 
    :param service: six.string_types. The Ceph user name to run the command under
510
 
    :param profile_name: six.string_types
511
 
    :param erasure_plugin_name: six.string_types
512
 
    :param failure_domain: six.string_types.  One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
513
 
        'room', 'root', 'row'])
514
 
    :param data_chunks: int
515
 
    :param coding_chunks: int
516
 
    :param locality: int
517
 
    :param durability_estimator: int
518
 
    :return: None.  Can raise CalledProcessError
519
 
    """
520
 
    # Ensure this failure_domain is allowed by Ceph
521
 
    validator(failure_domain, six.string_types,
522
 
              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
523
 
 
524
 
    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
525
 
           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
526
 
           'ruleset_failure_domain=' + failure_domain]
527
 
    if locality is not None and durability_estimator is not None:
528
 
        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
529
 
 
530
 
    # Add plugin specific information
531
 
    if locality is not None:
532
 
        # For local erasure codes
533
 
        cmd.append('l=' + str(locality))
534
 
    if durability_estimator is not None:
535
 
        # For Shec erasure codes
536
 
        cmd.append('c=' + str(durability_estimator))
537
 
 
538
 
    if erasure_profile_exists(service, profile_name):
539
 
        cmd.append('--force')
540
 
 
541
 
    try:
542
 
        check_call(cmd)
543
 
    except CalledProcessError:
544
 
        raise
545
 
 
546
 
 
547
 
def rename_pool(service, old_name, new_name):
548
 
    """
549
 
    Rename a Ceph pool from old_name to new_name
550
 
    :param service: six.string_types. The Ceph user name to run the command under
551
 
    :param old_name: six.string_types
552
 
    :param new_name: six.string_types
553
 
    :return: None
554
 
    """
555
 
    validator(value=old_name, valid_type=six.string_types)
556
 
    validator(value=new_name, valid_type=six.string_types)
557
 
 
558
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
559
 
    check_call(cmd)
560
 
 
561
 
 
562
 
def erasure_profile_exists(service, name):
563
 
    """
564
 
    Check to see if an Erasure code profile already exists.
565
 
    :param service: six.string_types. The Ceph user name to run the command under
566
 
    :param name: six.string_types
567
 
    :return: int or None
568
 
    """
569
 
    validator(value=name, valid_type=six.string_types)
570
 
    try:
571
 
        check_call(['ceph', '--id', service,
572
 
                    'osd', 'erasure-code-profile', 'get',
573
 
                    name])
574
 
        return True
575
 
    except CalledProcessError:
576
 
        return False
577
 
 
578
 
 
579
 
def get_cache_mode(service, pool_name):
580
 
    """
581
 
    Find the current caching mode of the pool_name given.
582
 
    :param service: six.string_types. The Ceph user name to run the command under
583
 
    :param pool_name: six.string_types
584
 
    :return: int or None
585
 
    """
586
 
    validator(value=service, valid_type=six.string_types)
587
 
    validator(value=pool_name, valid_type=six.string_types)
588
 
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
589
 
    try:
590
 
        osd_json = json.loads(out)
591
 
        for pool in osd_json['pools']:
592
 
            if pool['pool_name'] == pool_name:
593
 
                return pool['cache_mode']
594
 
        return None
595
 
    except ValueError:
596
 
        raise
597
 
 
598
 
 
599
 
def pool_exists(service, name):
600
 
    """Check to see if a RADOS pool already exists."""
601
 
    try:
602
 
        out = check_output(['rados', '--id', service,
603
 
                            'lspools']).decode('UTF-8')
604
 
    except CalledProcessError:
605
 
        return False
606
 
 
607
 
    return name in out
608
 
 
609
 
 
610
 
def get_osds(service):
611
 
    """Return a list of all Ceph Object Storage Daemons currently in the
612
 
    cluster.
613
 
    """
614
 
    version = ceph_version()
615
 
    if version and version >= '0.56':
616
 
        return json.loads(check_output(['ceph', '--id', service,
617
 
                                        'osd', 'ls',
618
 
                                        '--format=json']).decode('UTF-8'))
619
 
 
620
 
    return None
621
 
 
622
 
 
623
 
def install():
624
 
    """Basic Ceph client installation."""
625
 
    ceph_dir = "/etc/ceph"
626
 
    if not os.path.exists(ceph_dir):
627
 
        os.mkdir(ceph_dir)
628
 
 
629
 
    apt_install('ceph-common', fatal=True)
630
 
 
631
 
 
632
 
def rbd_exists(service, pool, rbd_img):
633
 
    """Check to see if a RADOS block device exists."""
634
 
    try:
635
 
        out = check_output(['rbd', 'list', '--id',
636
 
                            service, '--pool', pool]).decode('UTF-8')
637
 
    except CalledProcessError:
638
 
        return False
639
 
 
640
 
    return rbd_img in out
641
 
 
642
 
 
643
 
def create_rbd_image(service, pool, image, sizemb):
644
 
    """Create a new RADOS block device."""
645
 
    cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
646
 
           '--pool', pool]
647
 
    check_call(cmd)
648
 
 
649
 
 
650
 
def update_pool(client, pool, settings):
651
 
    cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool]
652
 
    for k, v in six.iteritems(settings):
653
 
        cmd.append(k)
654
 
        cmd.append(v)
655
 
 
656
 
    check_call(cmd)
657
 
 
658
 
 
659
 
def create_pool(service, name, replicas=3, pg_num=None):
660
 
    """Create a new RADOS pool."""
661
 
    if pool_exists(service, name):
662
 
        log("Ceph pool {} already exists, skipping creation".format(name),
663
 
            level=WARNING)
664
 
        return
665
 
 
666
 
    if not pg_num:
667
 
        # Calculate the number of placement groups based
668
 
        # on upstream recommended best practices.
669
 
        osds = get_osds(service)
670
 
        if osds:
671
 
            pg_num = (len(osds) * 100 // replicas)
672
 
        else:
673
 
            # NOTE(james-page): Default to 200 for older ceph versions
674
 
            # which don't support OSD query from cli
675
 
            pg_num = 200
676
 
 
677
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)]
678
 
    check_call(cmd)
679
 
 
680
 
    update_pool(service, name, settings={'size': str(replicas)})
681
 
 
682
 
 
683
 
def delete_pool(service, name):
684
 
    """Delete a RADOS pool from ceph."""
685
 
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
686
 
           '--yes-i-really-really-mean-it']
687
 
    check_call(cmd)
688
 
 
689
 
 
690
 
def _keyfile_path(service):
691
 
    return KEYFILE.format(service)
692
 
 
693
 
 
694
 
def _keyring_path(service):
695
 
    return KEYRING.format(service)
696
 
 
697
 
 
698
 
def create_keyring(service, key):
699
 
    """Create a new Ceph keyring containing key."""
700
 
    keyring = _keyring_path(service)
701
 
    if os.path.exists(keyring):
702
 
        log('Ceph keyring exists at %s.' % keyring, level=WARNING)
703
 
        return
704
 
 
705
 
    cmd = ['ceph-authtool', keyring, '--create-keyring',
706
 
           '--name=client.{}'.format(service), '--add-key={}'.format(key)]
707
 
    check_call(cmd)
708
 
    log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
709
 
 
710
 
 
711
 
def delete_keyring(service):
712
 
    """Delete an existing Ceph keyring."""
713
 
    keyring = _keyring_path(service)
714
 
    if not os.path.exists(keyring):
715
 
        log('Keyring does not exist at %s' % keyring, level=WARNING)
716
 
        return
717
 
 
718
 
    os.remove(keyring)
719
 
    log('Deleted ring at %s.' % keyring, level=INFO)
720
 
 
721
 
 
722
 
def create_key_file(service, key):
723
 
    """Create a file containing key."""
724
 
    keyfile = _keyfile_path(service)
725
 
    if os.path.exists(keyfile):
726
 
        log('Keyfile exists at %s.' % keyfile, level=WARNING)
727
 
        return
728
 
 
729
 
    with open(keyfile, 'w') as fd:
730
 
        fd.write(key)
731
 
 
732
 
    log('Created new keyfile at %s.' % keyfile, level=INFO)
733
 
 
734
 
 
735
 
def get_ceph_nodes(relation='ceph'):
736
 
    """Query named relation to determine current nodes."""
737
 
    hosts = []
738
 
    for r_id in relation_ids(relation):
739
 
        for unit in related_units(r_id):
740
 
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
741
 
 
742
 
    return hosts
743
 
 
744
 
 
745
 
def configure(service, key, auth, use_syslog):
746
 
    """Perform basic configuration of Ceph."""
747
 
    create_keyring(service, key)
748
 
    create_key_file(service, key)
749
 
    hosts = get_ceph_nodes()
750
 
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
751
 
        ceph_conf.write(CEPH_CONF.format(auth=auth,
752
 
                                         keyring=_keyring_path(service),
753
 
                                         mon_hosts=",".join(map(str, hosts)),
754
 
                                         use_syslog=use_syslog))
755
 
    modprobe('rbd')
756
 
 
757
 
 
758
 
def image_mapped(name):
759
 
    """Determine whether a RADOS block device is mapped locally."""
760
 
    try:
761
 
        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
762
 
    except CalledProcessError:
763
 
        return False
764
 
 
765
 
    return name in out
766
 
 
767
 
 
768
 
def map_block_storage(service, pool, image):
769
 
    """Map a RADOS block device for local use."""
770
 
    cmd = [
771
 
        'rbd',
772
 
        'map',
773
 
        '{}/{}'.format(pool, image),
774
 
        '--user',
775
 
        service,
776
 
        '--secret',
777
 
        _keyfile_path(service),
778
 
    ]
779
 
    check_call(cmd)
780
 
 
781
 
 
782
 
def filesystem_mounted(fs):
783
 
    """Determine whether a filesytems is already mounted."""
784
 
    return fs in [f for f, m in mounts()]
785
 
 
786
 
 
787
 
def make_filesystem(blk_device, fstype='ext4', timeout=10):
788
 
    """Make a new filesystem on the specified block device."""
789
 
    count = 0
790
 
    e_noent = os.errno.ENOENT
791
 
    while not os.path.exists(blk_device):
792
 
        if count >= timeout:
793
 
            log('Gave up waiting on block device %s' % blk_device,
794
 
                level=ERROR)
795
 
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
796
 
 
797
 
        log('Waiting for block device %s to appear' % blk_device,
798
 
            level=DEBUG)
799
 
        count += 1
800
 
        time.sleep(1)
801
 
    else:
802
 
        log('Formatting block device %s as filesystem %s.' %
803
 
            (blk_device, fstype), level=INFO)
804
 
        check_call(['mkfs', '-t', fstype, blk_device])
805
 
 
806
 
 
807
 
def place_data_on_block_device(blk_device, data_src_dst):
808
 
    """Migrate data in data_src_dst to blk_device and then remount."""
809
 
    # mount block device into /mnt
810
 
    mount(blk_device, '/mnt')
811
 
    # copy data to /mnt
812
 
    copy_files(data_src_dst, '/mnt')
813
 
    # umount block device
814
 
    umount('/mnt')
815
 
    # Grab user/group ID's from original source
816
 
    _dir = os.stat(data_src_dst)
817
 
    uid = _dir.st_uid
818
 
    gid = _dir.st_gid
819
 
    # re-mount where the data should originally be
820
 
    # TODO: persist is currently a NO-OP in core.host
821
 
    mount(blk_device, data_src_dst, persist=True)
822
 
    # ensure original ownership of new mount.
823
 
    os.chown(data_src_dst, uid, gid)
824
 
 
825
 
 
826
 
def copy_files(src, dst, symlinks=False, ignore=None):
827
 
    """Copy files from src to dst."""
828
 
    for item in os.listdir(src):
829
 
        s = os.path.join(src, item)
830
 
        d = os.path.join(dst, item)
831
 
        if os.path.isdir(s):
832
 
            shutil.copytree(s, d, symlinks, ignore)
833
 
        else:
834
 
            shutil.copy2(s, d)
835
 
 
836
 
 
837
 
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
838
 
                        blk_device, fstype, system_services=[],
839
 
                        replicas=3):
840
 
    """NOTE: This function must only be called from a single service unit for
841
 
    the same rbd_img otherwise data loss will occur.
842
 
 
843
 
    Ensures given pool and RBD image exists, is mapped to a block device,
844
 
    and the device is formatted and mounted at the given mount_point.
845
 
 
846
 
    If formatting a device for the first time, data existing at mount_point
847
 
    will be migrated to the RBD device before being re-mounted.
848
 
 
849
 
    All services listed in system_services will be stopped prior to data
850
 
    migration and restarted when complete.
851
 
    """
852
 
    # Ensure pool, RBD image, RBD mappings are in place.
853
 
    if not pool_exists(service, pool):
854
 
        log('Creating new pool {}.'.format(pool), level=INFO)
855
 
        create_pool(service, pool, replicas=replicas)
856
 
 
857
 
    if not rbd_exists(service, pool, rbd_img):
858
 
        log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
859
 
        create_rbd_image(service, pool, rbd_img, sizemb)
860
 
 
861
 
    if not image_mapped(rbd_img):
862
 
        log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
863
 
            level=INFO)
864
 
        map_block_storage(service, pool, rbd_img)
865
 
 
866
 
    # make file system
867
 
    # TODO: What happens if for whatever reason this is run again and
868
 
    # the data is already in the rbd device and/or is mounted??
869
 
    # When it is mounted already, it will fail to make the fs
870
 
    # XXX: This is really sketchy!  Need to at least add an fstab entry
871
 
    #      otherwise this hook will blow away existing data if its executed
872
 
    #      after a reboot.
873
 
    if not filesystem_mounted(mount_point):
874
 
        make_filesystem(blk_device, fstype)
875
 
 
876
 
        for svc in system_services:
877
 
            if service_running(svc):
878
 
                log('Stopping services {} prior to migrating data.'
879
 
                    .format(svc), level=DEBUG)
880
 
                service_stop(svc)
881
 
 
882
 
        place_data_on_block_device(blk_device, mount_point)
883
 
 
884
 
        for svc in system_services:
885
 
            log('Starting service {} after migrating data.'
886
 
                .format(svc), level=DEBUG)
887
 
            service_start(svc)
888
 
 
889
 
 
890
 
def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
891
 
    """Ensures a ceph keyring is created for a named service and optionally
892
 
    ensures user and group ownership.
893
 
 
894
 
    Returns False if no ceph key is available in relation state.
895
 
    """
896
 
    key = None
897
 
    for rid in relation_ids(relation):
898
 
        for unit in related_units(rid):
899
 
            key = relation_get('key', rid=rid, unit=unit)
900
 
            if key:
901
 
                break
902
 
 
903
 
    if not key:
904
 
        return False
905
 
 
906
 
    create_keyring(service=service, key=key)
907
 
    keyring = _keyring_path(service)
908
 
    if user and group:
909
 
        check_call(['chown', '%s.%s' % (user, group), keyring])
910
 
 
911
 
    return True
912
 
 
913
 
 
914
 
def ceph_version():
915
 
    """Retrieve the local version of ceph."""
916
 
    if os.path.exists('/usr/bin/ceph'):
917
 
        cmd = ['ceph', '-v']
918
 
        output = check_output(cmd).decode('US-ASCII')
919
 
        output = output.split()
920
 
        if len(output) > 3:
921
 
            return output[2]
922
 
        else:
923
 
            return None
924
 
    else:
925
 
        return None
926
 
 
927
 
 
928
 
class CephBrokerRq(object):
929
 
    """Ceph broker request.
930
 
 
931
 
    Multiple operations can be added to a request and sent to the Ceph broker
932
 
    to be executed.
933
 
 
934
 
    Request is json-encoded for sending over the wire.
935
 
 
936
 
    The API is versioned and defaults to version 1.
937
 
    """
938
 
 
939
 
    def __init__(self, api_version=1, request_id=None):
940
 
        self.api_version = api_version
941
 
        if request_id:
942
 
            self.request_id = request_id
943
 
        else:
944
 
            self.request_id = str(uuid.uuid1())
945
 
        self.ops = []
946
 
 
947
 
    def add_op_create_pool(self, name, replica_count=3, pg_num=None):
948
 
        """Adds an operation to create a pool.
949
 
 
950
 
        @param pg_num setting:  optional setting. If not provided, this value
951
 
        will be calculated by the broker based on how many OSDs are in the
952
 
        cluster at the time of creation. Note that, if provided, this value
953
 
        will be capped at the current available maximum.
954
 
        """
955
 
        self.ops.append({'op': 'create-pool', 'name': name,
956
 
                         'replicas': replica_count, 'pg_num': pg_num})
957
 
 
958
 
    def set_ops(self, ops):
959
 
        """Set request ops to provided value.
960
 
 
961
 
        Useful for injecting ops that come from a previous request
962
 
        to allow comparisons to ensure validity.
963
 
        """
964
 
        self.ops = ops
965
 
 
966
 
    @property
967
 
    def request(self):
968
 
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
969
 
                           'request-id': self.request_id})
970
 
 
971
 
    def _ops_equal(self, other):
972
 
        if len(self.ops) == len(other.ops):
973
 
            for req_no in range(0, len(self.ops)):
974
 
                for key in ['replicas', 'name', 'op', 'pg_num']:
975
 
                    if self.ops[req_no].get(key) != other.ops[req_no].get(key):
976
 
                        return False
977
 
        else:
978
 
            return False
979
 
        return True
980
 
 
981
 
    def __eq__(self, other):
982
 
        if not isinstance(other, self.__class__):
983
 
            return False
984
 
        if self.api_version == other.api_version and \
985
 
                self._ops_equal(other):
986
 
            return True
987
 
        else:
988
 
            return False
989
 
 
990
 
    def __ne__(self, other):
991
 
        return not self.__eq__(other)
992
 
 
993
 
 
994
 
class CephBrokerRsp(object):
995
 
    """Ceph broker response.
996
 
 
997
 
    Response is json-decoded and contents provided as methods/properties.
998
 
 
999
 
    The API is versioned and defaults to version 1.
1000
 
    """
1001
 
 
1002
 
    def __init__(self, encoded_rsp):
1003
 
        self.api_version = None
1004
 
        self.rsp = json.loads(encoded_rsp)
1005
 
 
1006
 
    @property
1007
 
    def request_id(self):
1008
 
        return self.rsp.get('request-id')
1009
 
 
1010
 
    @property
1011
 
    def exit_code(self):
1012
 
        return self.rsp.get('exit-code')
1013
 
 
1014
 
    @property
1015
 
    def exit_msg(self):
1016
 
        return self.rsp.get('stderr')
1017
 
 
1018
 
 
1019
 
# Ceph Broker Conversation:
1020
 
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1021
 
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1022
 
# unique id so that the client can identity which CephBrokerRsp is associated
1023
 
# with the request. Ceph will also respond to each client unit individually
1024
 
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1025
 
# via key broker-rsp-glance-0
1026
 
#
1027
 
# To use this the charm can just do something like:
1028
 
#
1029
 
# from charmhelpers.contrib.storage.linux.ceph import (
1030
 
#     send_request_if_needed,
1031
 
#     is_request_complete,
1032
 
#     CephBrokerRq,
1033
 
# )
1034
 
#
1035
 
# @hooks.hook('ceph-relation-changed')
1036
 
# def ceph_changed():
1037
 
#     rq = CephBrokerRq()
1038
 
#     rq.add_op_create_pool(name='poolname', replica_count=3)
1039
 
#
1040
 
#     if is_request_complete(rq):
1041
 
#         <Request complete actions>
1042
 
#     else:
1043
 
#         send_request_if_needed(get_ceph_request())
1044
 
#
1045
 
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1046
 
# of glance having sent a request to ceph which ceph has successfully processed
1047
 
#  'ceph:8': {
1048
 
#      'ceph/0': {
1049
 
#          'auth': 'cephx',
1050
 
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1051
 
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1052
 
#          'ceph-public-address': '10.5.44.103',
1053
 
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1054
 
#          'private-address': '10.5.44.103',
1055
 
#      },
1056
 
#      'glance/0': {
1057
 
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1058
 
#                         '"ops": [{"replicas": 3, "name": "glance", '
1059
 
#                         '"op": "create-pool"}]}'),
1060
 
#          'private-address': '10.5.44.109',
1061
 
#      },
1062
 
#  }
1063
 
 
1064
 
def get_previous_request(rid):
1065
 
    """Return the last ceph broker request sent on a given relation
1066
 
 
1067
 
    @param rid: Relation id to query for request
1068
 
    """
1069
 
    request = None
1070
 
    broker_req = relation_get(attribute='broker_req', rid=rid,
1071
 
                              unit=local_unit())
1072
 
    if broker_req:
1073
 
        request_data = json.loads(broker_req)
1074
 
        request = CephBrokerRq(api_version=request_data['api-version'],
1075
 
                               request_id=request_data['request-id'])
1076
 
        request.set_ops(request_data['ops'])
1077
 
 
1078
 
    return request
1079
 
 
1080
 
 
1081
 
def get_request_states(request, relation='ceph'):
1082
 
    """Return a dict of requests per relation id with their corresponding
1083
 
       completion state.
1084
 
 
1085
 
    This allows a charm, which has a request for ceph, to see whether there is
1086
 
    an equivalent request already being processed and if so what state that
1087
 
    request is in.
1088
 
 
1089
 
    @param request: A CephBrokerRq object
1090
 
    """
1091
 
    complete = []
1092
 
    requests = {}
1093
 
    for rid in relation_ids(relation):
1094
 
        complete = False
1095
 
        previous_request = get_previous_request(rid)
1096
 
        if request == previous_request:
1097
 
            sent = True
1098
 
            complete = is_request_complete_for_rid(previous_request, rid)
1099
 
        else:
1100
 
            sent = False
1101
 
            complete = False
1102
 
 
1103
 
        requests[rid] = {
1104
 
            'sent': sent,
1105
 
            'complete': complete,
1106
 
        }
1107
 
 
1108
 
    return requests
1109
 
 
1110
 
 
1111
 
def is_request_sent(request, relation='ceph'):
1112
 
    """Check to see if a functionally equivalent request has already been sent
1113
 
 
1114
 
    Returns True if a similair request has been sent
1115
 
 
1116
 
    @param request: A CephBrokerRq object
1117
 
    """
1118
 
    states = get_request_states(request, relation=relation)
1119
 
    for rid in states.keys():
1120
 
        if not states[rid]['sent']:
1121
 
            return False
1122
 
 
1123
 
    return True
1124
 
 
1125
 
 
1126
 
def is_request_complete(request, relation='ceph'):
1127
 
    """Check to see if a functionally equivalent request has already been
1128
 
    completed
1129
 
 
1130
 
    Returns True if a similair request has been completed
1131
 
 
1132
 
    @param request: A CephBrokerRq object
1133
 
    """
1134
 
    states = get_request_states(request, relation=relation)
1135
 
    for rid in states.keys():
1136
 
        if not states[rid]['complete']:
1137
 
            return False
1138
 
 
1139
 
    return True
1140
 
 
1141
 
 
1142
 
def is_request_complete_for_rid(request, rid):
1143
 
    """Check if a given request has been completed on the given relation
1144
 
 
1145
 
    @param request: A CephBrokerRq object
1146
 
    @param rid: Relation ID
1147
 
    """
1148
 
    broker_key = get_broker_rsp_key()
1149
 
    for unit in related_units(rid):
1150
 
        rdata = relation_get(rid=rid, unit=unit)
1151
 
        if rdata.get(broker_key):
1152
 
            rsp = CephBrokerRsp(rdata.get(broker_key))
1153
 
            if rsp.request_id == request.request_id:
1154
 
                if not rsp.exit_code:
1155
 
                    return True
1156
 
        else:
1157
 
            # The remote unit sent no reply targeted at this unit so either the
1158
 
            # remote ceph cluster does not support unit targeted replies or it
1159
 
            # has not processed our request yet.
1160
 
            if rdata.get('broker_rsp'):
1161
 
                request_data = json.loads(rdata['broker_rsp'])
1162
 
                if request_data.get('request-id'):
1163
 
                    log('Ignoring legacy broker_rsp without unit key as remote '
1164
 
                        'service supports unit specific replies', level=DEBUG)
1165
 
                else:
1166
 
                    log('Using legacy broker_rsp as remote service does not '
1167
 
                        'supports unit specific replies', level=DEBUG)
1168
 
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
1169
 
                    if not rsp.exit_code:
1170
 
                        return True
1171
 
 
1172
 
    return False
1173
 
 
1174
 
 
1175
 
def get_broker_rsp_key():
1176
 
    """Return broker response key for this unit
1177
 
 
1178
 
    This is the key that ceph is going to use to pass request status
1179
 
    information back to this unit
1180
 
    """
1181
 
    return 'broker-rsp-' + local_unit().replace('/', '-')
1182
 
 
1183
 
 
1184
 
def send_request_if_needed(request, relation='ceph'):
1185
 
    """Send broker request if an equivalent request has not already been sent
1186
 
 
1187
 
    @param request: A CephBrokerRq object
1188
 
    """
1189
 
    if is_request_sent(request, relation=relation):
1190
 
        log('Request already sent but not complete, not sending new request',
1191
 
            level=DEBUG)
1192
 
    else:
1193
 
        for rid in relation_ids(relation):
1194
 
            log('Sending request {}'.format(request.request_id), level=DEBUG)
1195
 
            relation_set(relation_id=rid, broker_req=request.request)