~lazypower/charms/trusty/keystone/fix_proof

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Ante Karamatic
  • Date: 2014-02-25 11:34:13 UTC
  • Revision ID: ivoks@ubuntu.com-20140225113413-tlm02x1ibc6xb10d
Rewrite charm to get it more in line with other OpenStack charms.

Added support for contexts and templating. Makes use of charm-helpers
instead of relaying on its own tools (probably could use some additional work).

HA is currently non-functional. ETA for fixing: less than 2 days.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Copyright 2012 Canonical Ltd.
 
3
#
 
4
# This file is sourced from lp:openstack-charm-helpers
 
5
#
 
6
# Authors:
 
7
#  James Page <james.page@ubuntu.com>
 
8
#  Adam Gandelman <adamg@ubuntu.com>
 
9
#
 
10
 
 
11
import os
 
12
import shutil
 
13
import json
 
14
import time
 
15
 
 
16
from subprocess import (
 
17
    check_call,
 
18
    check_output,
 
19
    CalledProcessError
 
20
)
 
21
 
 
22
from charmhelpers.core.hookenv import (
 
23
    relation_get,
 
24
    relation_ids,
 
25
    related_units,
 
26
    log,
 
27
    INFO,
 
28
    WARNING,
 
29
    ERROR
 
30
)
 
31
 
 
32
from charmhelpers.core.host import (
 
33
    mount,
 
34
    mounts,
 
35
    service_start,
 
36
    service_stop,
 
37
    service_running,
 
38
    umount,
 
39
)
 
40
 
 
41
from charmhelpers.fetch import (
 
42
    apt_install,
 
43
)
 
44
 
 
45
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
 
46
KEYFILE = '/etc/ceph/ceph.client.{}.key'
 
47
 
 
48
CEPH_CONF = """[global]
 
49
 auth supported = {auth}
 
50
 keyring = {keyring}
 
51
 mon host = {mon_hosts}
 
52
 log to syslog = {use_syslog}
 
53
 err to syslog = {use_syslog}
 
54
 clog to syslog = {use_syslog}
 
55
"""
 
56
 
 
57
 
 
58
def install():
 
59
    ''' Basic Ceph client installation '''
 
60
    ceph_dir = "/etc/ceph"
 
61
    if not os.path.exists(ceph_dir):
 
62
        os.mkdir(ceph_dir)
 
63
    apt_install('ceph-common', fatal=True)
 
64
 
 
65
 
 
66
def rbd_exists(service, pool, rbd_img):
 
67
    ''' Check to see if a RADOS block device exists '''
 
68
    try:
 
69
        out = check_output(['rbd', 'list', '--id', service,
 
70
                            '--pool', pool])
 
71
    except CalledProcessError:
 
72
        return False
 
73
    else:
 
74
        return rbd_img in out
 
75
 
 
76
 
 
77
def create_rbd_image(service, pool, image, sizemb):
 
78
    ''' Create a new RADOS block device '''
 
79
    cmd = [
 
80
        'rbd',
 
81
        'create',
 
82
        image,
 
83
        '--size',
 
84
        str(sizemb),
 
85
        '--id',
 
86
        service,
 
87
        '--pool',
 
88
        pool
 
89
    ]
 
90
    check_call(cmd)
 
91
 
 
92
 
 
93
def pool_exists(service, name):
 
94
    ''' Check to see if a RADOS pool already exists '''
 
95
    try:
 
96
        out = check_output(['rados', '--id', service, 'lspools'])
 
97
    except CalledProcessError:
 
98
        return False
 
99
    else:
 
100
        return name in out
 
101
 
 
102
 
 
103
def get_osds(service):
 
104
    '''
 
105
    Return a list of all Ceph Object Storage Daemons
 
106
    currently in the cluster
 
107
    '''
 
108
    version = ceph_version()
 
109
    if version and version >= '0.56':
 
110
        return json.loads(check_output(['ceph', '--id', service,
 
111
                                        'osd', 'ls', '--format=json']))
 
112
    else:
 
113
        return None
 
114
 
 
115
 
 
116
def create_pool(service, name, replicas=2):
 
117
    ''' Create a new RADOS pool '''
 
118
    if pool_exists(service, name):
 
119
        log("Ceph pool {} already exists, skipping creation".format(name),
 
120
            level=WARNING)
 
121
        return
 
122
    # Calculate the number of placement groups based
 
123
    # on upstream recommended best practices.
 
124
    osds = get_osds(service)
 
125
    if osds:
 
126
        pgnum = (len(osds) * 100 / replicas)
 
127
    else:
 
128
        # NOTE(james-page): Default to 200 for older ceph versions
 
129
        # which don't support OSD query from cli
 
130
        pgnum = 200
 
131
    cmd = [
 
132
        'ceph', '--id', service,
 
133
        'osd', 'pool', 'create',
 
134
        name, str(pgnum)
 
135
    ]
 
136
    check_call(cmd)
 
137
    cmd = [
 
138
        'ceph', '--id', service,
 
139
        'osd', 'pool', 'set', name,
 
140
        'size', str(replicas)
 
141
    ]
 
142
    check_call(cmd)
 
143
 
 
144
 
 
145
def delete_pool(service, name):
 
146
    ''' Delete a RADOS pool from ceph '''
 
147
    cmd = [
 
148
        'ceph', '--id', service,
 
149
        'osd', 'pool', 'delete',
 
150
        name, '--yes-i-really-really-mean-it'
 
151
    ]
 
152
    check_call(cmd)
 
153
 
 
154
 
 
155
def _keyfile_path(service):
 
156
    return KEYFILE.format(service)
 
157
 
 
158
 
 
159
def _keyring_path(service):
 
160
    return KEYRING.format(service)
 
161
 
 
162
 
 
163
def create_keyring(service, key):
 
164
    ''' Create a new Ceph keyring containing key'''
 
165
    keyring = _keyring_path(service)
 
166
    if os.path.exists(keyring):
 
167
        log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
 
168
        return
 
169
    cmd = [
 
170
        'ceph-authtool',
 
171
        keyring,
 
172
        '--create-keyring',
 
173
        '--name=client.{}'.format(service),
 
174
        '--add-key={}'.format(key)
 
175
    ]
 
176
    check_call(cmd)
 
177
    log('ceph: Created new ring at %s.' % keyring, level=INFO)
 
178
 
 
179
 
 
180
def create_key_file(service, key):
 
181
    ''' Create a file containing key '''
 
182
    keyfile = _keyfile_path(service)
 
183
    if os.path.exists(keyfile):
 
184
        log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
 
185
        return
 
186
    with open(keyfile, 'w') as fd:
 
187
        fd.write(key)
 
188
    log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
 
189
 
 
190
 
 
191
def get_ceph_nodes():
 
192
    ''' Query named relation 'ceph' to detemine current nodes '''
 
193
    hosts = []
 
194
    for r_id in relation_ids('ceph'):
 
195
        for unit in related_units(r_id):
 
196
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
 
197
    return hosts
 
198
 
 
199
 
 
200
def configure(service, key, auth, use_syslog):
 
201
    ''' Perform basic configuration of Ceph '''
 
202
    create_keyring(service, key)
 
203
    create_key_file(service, key)
 
204
    hosts = get_ceph_nodes()
 
205
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
 
206
        ceph_conf.write(CEPH_CONF.format(auth=auth,
 
207
                                         keyring=_keyring_path(service),
 
208
                                         mon_hosts=",".join(map(str, hosts)),
 
209
                                         use_syslog=use_syslog))
 
210
    modprobe('rbd')
 
211
 
 
212
 
 
213
def image_mapped(name):
 
214
    ''' Determine whether a RADOS block device is mapped locally '''
 
215
    try:
 
216
        out = check_output(['rbd', 'showmapped'])
 
217
    except CalledProcessError:
 
218
        return False
 
219
    else:
 
220
        return name in out
 
221
 
 
222
 
 
223
def map_block_storage(service, pool, image):
 
224
    ''' Map a RADOS block device for local use '''
 
225
    cmd = [
 
226
        'rbd',
 
227
        'map',
 
228
        '{}/{}'.format(pool, image),
 
229
        '--user',
 
230
        service,
 
231
        '--secret',
 
232
        _keyfile_path(service),
 
233
    ]
 
234
    check_call(cmd)
 
235
 
 
236
 
 
237
def filesystem_mounted(fs):
 
238
    ''' Determine whether a filesytems is already mounted '''
 
239
    return fs in [f for f, m in mounts()]
 
240
 
 
241
 
 
242
def make_filesystem(blk_device, fstype='ext4', timeout=10):
 
243
    ''' Make a new filesystem on the specified block device '''
 
244
    count = 0
 
245
    e_noent = os.errno.ENOENT
 
246
    while not os.path.exists(blk_device):
 
247
        if count >= timeout:
 
248
            log('ceph: gave up waiting on block device %s' % blk_device,
 
249
                level=ERROR)
 
250
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
 
251
        log('ceph: waiting for block device %s to appear' % blk_device,
 
252
            level=INFO)
 
253
        count += 1
 
254
        time.sleep(1)
 
255
    else:
 
256
        log('ceph: Formatting block device %s as filesystem %s.' %
 
257
            (blk_device, fstype), level=INFO)
 
258
        check_call(['mkfs', '-t', fstype, blk_device])
 
259
 
 
260
 
 
261
def place_data_on_block_device(blk_device, data_src_dst):
 
262
    ''' Migrate data in data_src_dst to blk_device and then remount '''
 
263
    # mount block device into /mnt
 
264
    mount(blk_device, '/mnt')
 
265
    # copy data to /mnt
 
266
    copy_files(data_src_dst, '/mnt')
 
267
    # umount block device
 
268
    umount('/mnt')
 
269
    # Grab user/group ID's from original source
 
270
    _dir = os.stat(data_src_dst)
 
271
    uid = _dir.st_uid
 
272
    gid = _dir.st_gid
 
273
    # re-mount where the data should originally be
 
274
    # TODO: persist is currently a NO-OP in core.host
 
275
    mount(blk_device, data_src_dst, persist=True)
 
276
    # ensure original ownership of new mount.
 
277
    os.chown(data_src_dst, uid, gid)
 
278
 
 
279
 
 
280
# TODO: re-use
 
281
def modprobe(module):
 
282
    ''' Load a kernel module and configure for auto-load on reboot '''
 
283
    log('ceph: Loading kernel module', level=INFO)
 
284
    cmd = ['modprobe', module]
 
285
    check_call(cmd)
 
286
    with open('/etc/modules', 'r+') as modules:
 
287
        if module not in modules.read():
 
288
            modules.write(module)
 
289
 
 
290
 
 
291
def copy_files(src, dst, symlinks=False, ignore=None):
 
292
    ''' Copy files from src to dst '''
 
293
    for item in os.listdir(src):
 
294
        s = os.path.join(src, item)
 
295
        d = os.path.join(dst, item)
 
296
        if os.path.isdir(s):
 
297
            shutil.copytree(s, d, symlinks, ignore)
 
298
        else:
 
299
            shutil.copy2(s, d)
 
300
 
 
301
 
 
302
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
 
303
                        blk_device, fstype, system_services=[]):
 
304
    """
 
305
    NOTE: This function must only be called from a single service unit for
 
306
          the same rbd_img otherwise data loss will occur.
 
307
 
 
308
    Ensures given pool and RBD image exists, is mapped to a block device,
 
309
    and the device is formatted and mounted at the given mount_point.
 
310
 
 
311
    If formatting a device for the first time, data existing at mount_point
 
312
    will be migrated to the RBD device before being re-mounted.
 
313
 
 
314
    All services listed in system_services will be stopped prior to data
 
315
    migration and restarted when complete.
 
316
    """
 
317
    # Ensure pool, RBD image, RBD mappings are in place.
 
318
    if not pool_exists(service, pool):
 
319
        log('ceph: Creating new pool {}.'.format(pool))
 
320
        create_pool(service, pool)
 
321
 
 
322
    if not rbd_exists(service, pool, rbd_img):
 
323
        log('ceph: Creating RBD image ({}).'.format(rbd_img))
 
324
        create_rbd_image(service, pool, rbd_img, sizemb)
 
325
 
 
326
    if not image_mapped(rbd_img):
 
327
        log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
 
328
        map_block_storage(service, pool, rbd_img)
 
329
 
 
330
    # make file system
 
331
    # TODO: What happens if for whatever reason this is run again and
 
332
    # the data is already in the rbd device and/or is mounted??
 
333
    # When it is mounted already, it will fail to make the fs
 
334
    # XXX: This is really sketchy!  Need to at least add an fstab entry
 
335
    #      otherwise this hook will blow away existing data if its executed
 
336
    #      after a reboot.
 
337
    if not filesystem_mounted(mount_point):
 
338
        make_filesystem(blk_device, fstype)
 
339
 
 
340
        for svc in system_services:
 
341
            if service_running(svc):
 
342
                log('ceph: Stopping services {} prior to migrating data.'
 
343
                    .format(svc))
 
344
                service_stop(svc)
 
345
 
 
346
        place_data_on_block_device(blk_device, mount_point)
 
347
 
 
348
        for svc in system_services:
 
349
            log('ceph: Starting service {} after migrating data.'
 
350
                .format(svc))
 
351
            service_start(svc)
 
352
 
 
353
 
 
354
def ensure_ceph_keyring(service, user=None, group=None):
 
355
    '''
 
356
    Ensures a ceph keyring is created for a named service
 
357
    and optionally ensures user and group ownership.
 
358
 
 
359
    Returns False if no ceph key is available in relation state.
 
360
    '''
 
361
    key = None
 
362
    for rid in relation_ids('ceph'):
 
363
        for unit in related_units(rid):
 
364
            key = relation_get('key', rid=rid, unit=unit)
 
365
            if key:
 
366
                break
 
367
    if not key:
 
368
        return False
 
369
    create_keyring(service=service, key=key)
 
370
    keyring = _keyring_path(service)
 
371
    if user and group:
 
372
        check_call(['chown', '%s.%s' % (user, group), keyring])
 
373
    return True
 
374
 
 
375
 
 
376
def ceph_version():
 
377
    ''' Retrieve the local version of ceph '''
 
378
    if os.path.exists('/usr/bin/ceph'):
 
379
        cmd = ['ceph', '-v']
 
380
        output = check_output(cmd)
 
381
        output = output.split()
 
382
        if len(output) > 3:
 
383
            return output[2]
 
384
        else:
 
385
            return None
 
386
    else:
 
387
        return None