~jjo/charms/trusty/rabbitmq-server/gnuoy-lp1355848_jjo-lp1274947

« back to all changes in this revision

Viewing changes to lib/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Charles Butler
  • Date: 2014-04-15 21:35:51 UTC
  • mfrom: (50.2.28 rabbitmq-server)
  • Revision ID: chuck@dasroot.net-20140415213551-8n4ji1f3vq2pawos
[james Page] This is a fairly major redux to use charm-helpers in the same way that other charms do; this branch also includes the ssl-everywhere work (might need a resync).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
# Copyright 2012 Canonical Ltd.
3
 
#
4
 
# This file is sourced from lp:openstack-charm-helpers
5
 
#
6
 
# Authors:
7
 
#  James Page <james.page@ubuntu.com>
8
 
#  Adam Gandelman <adamg@ubuntu.com>
9
 
#
10
 
 
11
 
import os
12
 
import shutil
13
 
import json
14
 
import time
15
 
 
16
 
from subprocess import (
17
 
    check_call,
18
 
    check_output,
19
 
    CalledProcessError
20
 
)
21
 
 
22
 
from charmhelpers.core.hookenv import (
23
 
    relation_get,
24
 
    relation_ids,
25
 
    related_units,
26
 
    log,
27
 
    INFO,
28
 
    WARNING,
29
 
    ERROR
30
 
)
31
 
 
32
 
from charmhelpers.core.host import (
33
 
    mount,
34
 
    mounts,
35
 
    service_start,
36
 
    service_stop,
37
 
    service_running,
38
 
    umount,
39
 
)
40
 
 
41
 
from charmhelpers.fetch import (
42
 
    apt_install,
43
 
)
44
 
 
45
 
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
46
 
KEYFILE = '/etc/ceph/ceph.client.{}.key'
47
 
 
48
 
CEPH_CONF = """[global]
49
 
 auth supported = {auth}
50
 
 keyring = {keyring}
51
 
 mon host = {mon_hosts}
52
 
 log to syslog = {use_syslog}
53
 
 err to syslog = {use_syslog}
54
 
 clog to syslog = {use_syslog}
55
 
"""
56
 
 
57
 
 
58
 
def install():
59
 
    ''' Basic Ceph client installation '''
60
 
    ceph_dir = "/etc/ceph"
61
 
    if not os.path.exists(ceph_dir):
62
 
        os.mkdir(ceph_dir)
63
 
    apt_install('ceph-common', fatal=True)
64
 
 
65
 
 
66
 
def rbd_exists(service, pool, rbd_img):
67
 
    ''' Check to see if a RADOS block device exists '''
68
 
    try:
69
 
        out = check_output(['rbd', 'list', '--id', service,
70
 
                            '--pool', pool])
71
 
    except CalledProcessError:
72
 
        return False
73
 
    else:
74
 
        return rbd_img in out
75
 
 
76
 
 
77
 
def create_rbd_image(service, pool, image, sizemb):
78
 
    ''' Create a new RADOS block device '''
79
 
    cmd = [
80
 
        'rbd',
81
 
        'create',
82
 
        image,
83
 
        '--size',
84
 
        str(sizemb),
85
 
        '--id',
86
 
        service,
87
 
        '--pool',
88
 
        pool
89
 
    ]
90
 
    check_call(cmd)
91
 
 
92
 
 
93
 
def pool_exists(service, name):
94
 
    ''' Check to see if a RADOS pool already exists '''
95
 
    try:
96
 
        out = check_output(['rados', '--id', service, 'lspools'])
97
 
    except CalledProcessError:
98
 
        return False
99
 
    else:
100
 
        return name in out
101
 
 
102
 
 
103
 
def get_osds(service):
104
 
    '''
105
 
    Return a list of all Ceph Object Storage Daemons
106
 
    currently in the cluster
107
 
    '''
108
 
    version = ceph_version()
109
 
    if version and version >= '0.56':
110
 
        return json.loads(check_output(['ceph', '--id', service,
111
 
                                        'osd', 'ls', '--format=json']))
112
 
    else:
113
 
        return None
114
 
 
115
 
 
116
 
def create_pool(service, name, replicas=2):
117
 
    ''' Create a new RADOS pool '''
118
 
    if pool_exists(service, name):
119
 
        log("Ceph pool {} already exists, skipping creation".format(name),
120
 
            level=WARNING)
121
 
        return
122
 
    # Calculate the number of placement groups based
123
 
    # on upstream recommended best practices.
124
 
    osds = get_osds(service)
125
 
    if osds:
126
 
        pgnum = (len(osds) * 100 / replicas)
127
 
    else:
128
 
        # NOTE(james-page): Default to 200 for older ceph versions
129
 
        # which don't support OSD query from cli
130
 
        pgnum = 200
131
 
    cmd = [
132
 
        'ceph', '--id', service,
133
 
        'osd', 'pool', 'create',
134
 
        name, str(pgnum)
135
 
    ]
136
 
    check_call(cmd)
137
 
    cmd = [
138
 
        'ceph', '--id', service,
139
 
        'osd', 'pool', 'set', name,
140
 
        'size', str(replicas)
141
 
    ]
142
 
    check_call(cmd)
143
 
 
144
 
 
145
 
def delete_pool(service, name):
146
 
    ''' Delete a RADOS pool from ceph '''
147
 
    cmd = [
148
 
        'ceph', '--id', service,
149
 
        'osd', 'pool', 'delete',
150
 
        name, '--yes-i-really-really-mean-it'
151
 
    ]
152
 
    check_call(cmd)
153
 
 
154
 
 
155
 
def _keyfile_path(service):
156
 
    return KEYFILE.format(service)
157
 
 
158
 
 
159
 
def _keyring_path(service):
160
 
    return KEYRING.format(service)
161
 
 
162
 
 
163
 
def create_keyring(service, key):
164
 
    ''' Create a new Ceph keyring containing key'''
165
 
    keyring = _keyring_path(service)
166
 
    if os.path.exists(keyring):
167
 
        log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
168
 
        return
169
 
    cmd = [
170
 
        'ceph-authtool',
171
 
        keyring,
172
 
        '--create-keyring',
173
 
        '--name=client.{}'.format(service),
174
 
        '--add-key={}'.format(key)
175
 
    ]
176
 
    check_call(cmd)
177
 
    log('ceph: Created new ring at %s.' % keyring, level=INFO)
178
 
 
179
 
 
180
 
def create_key_file(service, key):
181
 
    ''' Create a file containing key '''
182
 
    keyfile = _keyfile_path(service)
183
 
    if os.path.exists(keyfile):
184
 
        log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
185
 
        return
186
 
    with open(keyfile, 'w') as fd:
187
 
        fd.write(key)
188
 
    log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
189
 
 
190
 
 
191
 
def get_ceph_nodes():
192
 
    ''' Query named relation 'ceph' to detemine current nodes '''
193
 
    hosts = []
194
 
    for r_id in relation_ids('ceph'):
195
 
        for unit in related_units(r_id):
196
 
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
197
 
    return hosts
198
 
 
199
 
 
200
 
def configure(service, key, auth, use_syslog):
201
 
    ''' Perform basic configuration of Ceph '''
202
 
    create_keyring(service, key)
203
 
    create_key_file(service, key)
204
 
    hosts = get_ceph_nodes()
205
 
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
206
 
        ceph_conf.write(CEPH_CONF.format(auth=auth,
207
 
                                         keyring=_keyring_path(service),
208
 
                                         mon_hosts=",".join(map(str, hosts)),
209
 
                                         use_syslog=use_syslog))
210
 
    modprobe('rbd')
211
 
 
212
 
 
213
 
def image_mapped(name):
214
 
    ''' Determine whether a RADOS block device is mapped locally '''
215
 
    try:
216
 
        out = check_output(['rbd', 'showmapped'])
217
 
    except CalledProcessError:
218
 
        return False
219
 
    else:
220
 
        return name in out
221
 
 
222
 
 
223
 
def map_block_storage(service, pool, image):
224
 
    ''' Map a RADOS block device for local use '''
225
 
    cmd = [
226
 
        'rbd',
227
 
        'map',
228
 
        '{}/{}'.format(pool, image),
229
 
        '--user',
230
 
        service,
231
 
        '--secret',
232
 
        _keyfile_path(service),
233
 
    ]
234
 
    check_call(cmd)
235
 
 
236
 
 
237
 
def filesystem_mounted(fs):
238
 
    ''' Determine whether a filesytems is already mounted '''
239
 
    return fs in [f for f, m in mounts()]
240
 
 
241
 
 
242
 
def make_filesystem(blk_device, fstype='ext4', timeout=10):
243
 
    ''' Make a new filesystem on the specified block device '''
244
 
    count = 0
245
 
    e_noent = os.errno.ENOENT
246
 
    while not os.path.exists(blk_device):
247
 
        if count >= timeout:
248
 
            log('ceph: gave up waiting on block device %s' % blk_device,
249
 
                level=ERROR)
250
 
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
251
 
        log('ceph: waiting for block device %s to appear' % blk_device,
252
 
            level=INFO)
253
 
        count += 1
254
 
        time.sleep(1)
255
 
    else:
256
 
        log('ceph: Formatting block device %s as filesystem %s.' %
257
 
            (blk_device, fstype), level=INFO)
258
 
        check_call(['mkfs', '-t', fstype, blk_device])
259
 
 
260
 
 
261
 
def place_data_on_block_device(blk_device, data_src_dst):
262
 
    ''' Migrate data in data_src_dst to blk_device and then remount '''
263
 
    # mount block device into /mnt
264
 
    mount(blk_device, '/mnt')
265
 
    # copy data to /mnt
266
 
    copy_files(data_src_dst, '/mnt')
267
 
    # umount block device
268
 
    umount('/mnt')
269
 
    # Grab user/group ID's from original source
270
 
    _dir = os.stat(data_src_dst)
271
 
    uid = _dir.st_uid
272
 
    gid = _dir.st_gid
273
 
    # re-mount where the data should originally be
274
 
    # TODO: persist is currently a NO-OP in core.host
275
 
    mount(blk_device, data_src_dst, persist=True)
276
 
    # ensure original ownership of new mount.
277
 
    os.chown(data_src_dst, uid, gid)
278
 
 
279
 
 
280
 
# TODO: re-use
281
 
def modprobe(module):
282
 
    ''' Load a kernel module and configure for auto-load on reboot '''
283
 
    log('ceph: Loading kernel module', level=INFO)
284
 
    cmd = ['modprobe', module]
285
 
    check_call(cmd)
286
 
    with open('/etc/modules', 'r+') as modules:
287
 
        if module not in modules.read():
288
 
            modules.write(module)
289
 
 
290
 
 
291
 
def copy_files(src, dst, symlinks=False, ignore=None):
292
 
    ''' Copy files from src to dst '''
293
 
    for item in os.listdir(src):
294
 
        s = os.path.join(src, item)
295
 
        d = os.path.join(dst, item)
296
 
        if os.path.isdir(s):
297
 
            shutil.copytree(s, d, symlinks, ignore)
298
 
        else:
299
 
            shutil.copy2(s, d)
300
 
 
301
 
 
302
 
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
303
 
                        blk_device, fstype, system_services=[]):
304
 
    """
305
 
    NOTE: This function must only be called from a single service unit for
306
 
          the same rbd_img otherwise data loss will occur.
307
 
 
308
 
    Ensures given pool and RBD image exists, is mapped to a block device,
309
 
    and the device is formatted and mounted at the given mount_point.
310
 
 
311
 
    If formatting a device for the first time, data existing at mount_point
312
 
    will be migrated to the RBD device before being re-mounted.
313
 
 
314
 
    All services listed in system_services will be stopped prior to data
315
 
    migration and restarted when complete.
316
 
    """
317
 
    # Ensure pool, RBD image, RBD mappings are in place.
318
 
    if not pool_exists(service, pool):
319
 
        log('ceph: Creating new pool {}.'.format(pool))
320
 
        create_pool(service, pool)
321
 
 
322
 
    if not rbd_exists(service, pool, rbd_img):
323
 
        log('ceph: Creating RBD image ({}).'.format(rbd_img))
324
 
        create_rbd_image(service, pool, rbd_img, sizemb)
325
 
 
326
 
    if not image_mapped(rbd_img):
327
 
        log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
328
 
        map_block_storage(service, pool, rbd_img)
329
 
 
330
 
    # make file system
331
 
    # TODO: What happens if for whatever reason this is run again and
332
 
    # the data is already in the rbd device and/or is mounted??
333
 
    # When it is mounted already, it will fail to make the fs
334
 
    # XXX: This is really sketchy!  Need to at least add an fstab entry
335
 
    #      otherwise this hook will blow away existing data if its executed
336
 
    #      after a reboot.
337
 
    if not filesystem_mounted(mount_point):
338
 
        make_filesystem(blk_device, fstype)
339
 
 
340
 
        for svc in system_services:
341
 
            if service_running(svc):
342
 
                log('ceph: Stopping services {} prior to migrating data.'
343
 
                    .format(svc))
344
 
                service_stop(svc)
345
 
 
346
 
        place_data_on_block_device(blk_device, mount_point)
347
 
 
348
 
        for svc in system_services:
349
 
            log('ceph: Starting service {} after migrating data.'
350
 
                .format(svc))
351
 
            service_start(svc)
352
 
 
353
 
 
354
 
def ensure_ceph_keyring(service, user=None, group=None):
355
 
    '''
356
 
    Ensures a ceph keyring is created for a named service
357
 
    and optionally ensures user and group ownership.
358
 
 
359
 
    Returns False if no ceph key is available in relation state.
360
 
    '''
361
 
    key = None
362
 
    for rid in relation_ids('ceph'):
363
 
        for unit in related_units(rid):
364
 
            key = relation_get('key', rid=rid, unit=unit)
365
 
            if key:
366
 
                break
367
 
    if not key:
368
 
        return False
369
 
    create_keyring(service=service, key=key)
370
 
    keyring = _keyring_path(service)
371
 
    if user and group:
372
 
        check_call(['chown', '%s.%s' % (user, group), keyring])
373
 
    return True
374
 
 
375
 
 
376
 
def ceph_version():
377
 
    ''' Retrieve the local version of ceph '''
378
 
    if os.path.exists('/usr/bin/ceph'):
379
 
        cmd = ['ceph', '-v']
380
 
        output = check_output(cmd)
381
 
        output = output.split()
382
 
        if len(output) > 3:
383
 
            return output[2]
384
 
        else:
385
 
            return None
386
 
    else:
387
 
        return None