~joetalbott/charms/trusty/snappy-proposed-image-builder/create_home

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: Joe Talbott
  • Date: 2015-05-20 16:55:29 UTC
  • Revision ID: joe.talbott@canonical.com-20150520165529-o7lnhb1k13xa1fdj
Initial copy of core-image-publisher with name changes

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014-2015 Canonical Limited.
 
2
#
 
3
# This file is part of charm-helpers.
 
4
#
 
5
# charm-helpers is free software: you can redistribute it and/or modify
 
6
# it under the terms of the GNU Lesser General Public License version 3 as
 
7
# published by the Free Software Foundation.
 
8
#
 
9
# charm-helpers is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU Lesser General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU Lesser General Public License
 
15
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
 
16
 
 
17
#
 
18
# Copyright 2012 Canonical Ltd.
 
19
#
 
20
# This file is sourced from lp:openstack-charm-helpers
 
21
#
 
22
# Authors:
 
23
#  James Page <james.page@ubuntu.com>
 
24
#  Adam Gandelman <adamg@ubuntu.com>
 
25
#
 
26
 
 
27
import os
 
28
import shutil
 
29
import json
 
30
import time
 
31
 
 
32
from subprocess import (
 
33
    check_call,
 
34
    check_output,
 
35
    CalledProcessError,
 
36
)
 
37
from charmhelpers.core.hookenv import (
 
38
    relation_get,
 
39
    relation_ids,
 
40
    related_units,
 
41
    log,
 
42
    DEBUG,
 
43
    INFO,
 
44
    WARNING,
 
45
    ERROR,
 
46
)
 
47
from charmhelpers.core.host import (
 
48
    mount,
 
49
    mounts,
 
50
    service_start,
 
51
    service_stop,
 
52
    service_running,
 
53
    umount,
 
54
)
 
55
from charmhelpers.fetch import (
 
56
    apt_install,
 
57
)
 
58
 
 
59
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
 
60
KEYFILE = '/etc/ceph/ceph.client.{}.key'
 
61
 
 
62
CEPH_CONF = """[global]
 
63
 auth supported = {auth}
 
64
 keyring = {keyring}
 
65
 mon host = {mon_hosts}
 
66
 log to syslog = {use_syslog}
 
67
 err to syslog = {use_syslog}
 
68
 clog to syslog = {use_syslog}
 
69
"""
 
70
 
 
71
 
 
72
def install():
 
73
    """Basic Ceph client installation."""
 
74
    ceph_dir = "/etc/ceph"
 
75
    if not os.path.exists(ceph_dir):
 
76
        os.mkdir(ceph_dir)
 
77
 
 
78
    apt_install('ceph-common', fatal=True)
 
79
 
 
80
 
 
81
def rbd_exists(service, pool, rbd_img):
 
82
    """Check to see if a RADOS block device exists."""
 
83
    try:
 
84
        out = check_output(['rbd', 'list', '--id',
 
85
                            service, '--pool', pool]).decode('UTF-8')
 
86
    except CalledProcessError:
 
87
        return False
 
88
 
 
89
    return rbd_img in out
 
90
 
 
91
 
 
92
def create_rbd_image(service, pool, image, sizemb):
 
93
    """Create a new RADOS block device."""
 
94
    cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
 
95
           '--pool', pool]
 
96
    check_call(cmd)
 
97
 
 
98
 
 
99
def pool_exists(service, name):
 
100
    """Check to see if a RADOS pool already exists."""
 
101
    try:
 
102
        out = check_output(['rados', '--id', service,
 
103
                            'lspools']).decode('UTF-8')
 
104
    except CalledProcessError:
 
105
        return False
 
106
 
 
107
    return name in out
 
108
 
 
109
 
 
110
def get_osds(service):
 
111
    """Return a list of all Ceph Object Storage Daemons currently in the
 
112
    cluster.
 
113
    """
 
114
    version = ceph_version()
 
115
    if version and version >= '0.56':
 
116
        return json.loads(check_output(['ceph', '--id', service,
 
117
                                        'osd', 'ls',
 
118
                                        '--format=json']).decode('UTF-8'))
 
119
 
 
120
    return None
 
121
 
 
122
 
 
123
def create_pool(service, name, replicas=3):
 
124
    """Create a new RADOS pool."""
 
125
    if pool_exists(service, name):
 
126
        log("Ceph pool {} already exists, skipping creation".format(name),
 
127
            level=WARNING)
 
128
        return
 
129
 
 
130
    # Calculate the number of placement groups based
 
131
    # on upstream recommended best practices.
 
132
    osds = get_osds(service)
 
133
    if osds:
 
134
        pgnum = (len(osds) * 100 // replicas)
 
135
    else:
 
136
        # NOTE(james-page): Default to 200 for older ceph versions
 
137
        # which don't support OSD query from cli
 
138
        pgnum = 200
 
139
 
 
140
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
 
141
    check_call(cmd)
 
142
 
 
143
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
 
144
           str(replicas)]
 
145
    check_call(cmd)
 
146
 
 
147
 
 
148
def delete_pool(service, name):
 
149
    """Delete a RADOS pool from ceph."""
 
150
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
 
151
           '--yes-i-really-really-mean-it']
 
152
    check_call(cmd)
 
153
 
 
154
 
 
155
def _keyfile_path(service):
 
156
    return KEYFILE.format(service)
 
157
 
 
158
 
 
159
def _keyring_path(service):
 
160
    return KEYRING.format(service)
 
161
 
 
162
 
 
163
def create_keyring(service, key):
 
164
    """Create a new Ceph keyring containing key."""
 
165
    keyring = _keyring_path(service)
 
166
    if os.path.exists(keyring):
 
167
        log('Ceph keyring exists at %s.' % keyring, level=WARNING)
 
168
        return
 
169
 
 
170
    cmd = ['ceph-authtool', keyring, '--create-keyring',
 
171
           '--name=client.{}'.format(service), '--add-key={}'.format(key)]
 
172
    check_call(cmd)
 
173
    log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
 
174
 
 
175
 
 
176
def delete_keyring(service):
 
177
    """Delete an existing Ceph keyring."""
 
178
    keyring = _keyring_path(service)
 
179
    if not os.path.exists(keyring):
 
180
        log('Keyring does not exist at %s' % keyring, level=WARNING)
 
181
        return
 
182
 
 
183
    os.remove(keyring)
 
184
    log('Deleted ring at %s.' % keyring, level=INFO)
 
185
 
 
186
 
 
187
def create_key_file(service, key):
 
188
    """Create a file containing key."""
 
189
    keyfile = _keyfile_path(service)
 
190
    if os.path.exists(keyfile):
 
191
        log('Keyfile exists at %s.' % keyfile, level=WARNING)
 
192
        return
 
193
 
 
194
    with open(keyfile, 'w') as fd:
 
195
        fd.write(key)
 
196
 
 
197
    log('Created new keyfile at %s.' % keyfile, level=INFO)
 
198
 
 
199
 
 
200
def get_ceph_nodes():
 
201
    """Query named relation 'ceph' to determine current nodes."""
 
202
    hosts = []
 
203
    for r_id in relation_ids('ceph'):
 
204
        for unit in related_units(r_id):
 
205
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
 
206
 
 
207
    return hosts
 
208
 
 
209
 
 
210
def configure(service, key, auth, use_syslog):
 
211
    """Perform basic configuration of Ceph."""
 
212
    create_keyring(service, key)
 
213
    create_key_file(service, key)
 
214
    hosts = get_ceph_nodes()
 
215
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
 
216
        ceph_conf.write(CEPH_CONF.format(auth=auth,
 
217
                                         keyring=_keyring_path(service),
 
218
                                         mon_hosts=",".join(map(str, hosts)),
 
219
                                         use_syslog=use_syslog))
 
220
    modprobe('rbd')
 
221
 
 
222
 
 
223
def image_mapped(name):
 
224
    """Determine whether a RADOS block device is mapped locally."""
 
225
    try:
 
226
        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
 
227
    except CalledProcessError:
 
228
        return False
 
229
 
 
230
    return name in out
 
231
 
 
232
 
 
233
def map_block_storage(service, pool, image):
 
234
    """Map a RADOS block device for local use."""
 
235
    cmd = [
 
236
        'rbd',
 
237
        'map',
 
238
        '{}/{}'.format(pool, image),
 
239
        '--user',
 
240
        service,
 
241
        '--secret',
 
242
        _keyfile_path(service),
 
243
    ]
 
244
    check_call(cmd)
 
245
 
 
246
 
 
247
def filesystem_mounted(fs):
 
248
    """Determine whether a filesytems is already mounted."""
 
249
    return fs in [f for f, m in mounts()]
 
250
 
 
251
 
 
252
def make_filesystem(blk_device, fstype='ext4', timeout=10):
 
253
    """Make a new filesystem on the specified block device."""
 
254
    count = 0
 
255
    e_noent = os.errno.ENOENT
 
256
    while not os.path.exists(blk_device):
 
257
        if count >= timeout:
 
258
            log('Gave up waiting on block device %s' % blk_device,
 
259
                level=ERROR)
 
260
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
 
261
 
 
262
        log('Waiting for block device %s to appear' % blk_device,
 
263
            level=DEBUG)
 
264
        count += 1
 
265
        time.sleep(1)
 
266
    else:
 
267
        log('Formatting block device %s as filesystem %s.' %
 
268
            (blk_device, fstype), level=INFO)
 
269
        check_call(['mkfs', '-t', fstype, blk_device])
 
270
 
 
271
 
 
272
def place_data_on_block_device(blk_device, data_src_dst):
 
273
    """Migrate data in data_src_dst to blk_device and then remount."""
 
274
    # mount block device into /mnt
 
275
    mount(blk_device, '/mnt')
 
276
    # copy data to /mnt
 
277
    copy_files(data_src_dst, '/mnt')
 
278
    # umount block device
 
279
    umount('/mnt')
 
280
    # Grab user/group ID's from original source
 
281
    _dir = os.stat(data_src_dst)
 
282
    uid = _dir.st_uid
 
283
    gid = _dir.st_gid
 
284
    # re-mount where the data should originally be
 
285
    # TODO: persist is currently a NO-OP in core.host
 
286
    mount(blk_device, data_src_dst, persist=True)
 
287
    # ensure original ownership of new mount.
 
288
    os.chown(data_src_dst, uid, gid)
 
289
 
 
290
 
 
291
# TODO: re-use
 
292
def modprobe(module):
 
293
    """Load a kernel module and configure for auto-load on reboot."""
 
294
    log('Loading kernel module', level=INFO)
 
295
    cmd = ['modprobe', module]
 
296
    check_call(cmd)
 
297
    with open('/etc/modules', 'r+') as modules:
 
298
        if module not in modules.read():
 
299
            modules.write(module)
 
300
 
 
301
 
 
302
def copy_files(src, dst, symlinks=False, ignore=None):
 
303
    """Copy files from src to dst."""
 
304
    for item in os.listdir(src):
 
305
        s = os.path.join(src, item)
 
306
        d = os.path.join(dst, item)
 
307
        if os.path.isdir(s):
 
308
            shutil.copytree(s, d, symlinks, ignore)
 
309
        else:
 
310
            shutil.copy2(s, d)
 
311
 
 
312
 
 
313
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
 
314
                        blk_device, fstype, system_services=[],
 
315
                        replicas=3):
 
316
    """NOTE: This function must only be called from a single service unit for
 
317
    the same rbd_img otherwise data loss will occur.
 
318
 
 
319
    Ensures given pool and RBD image exists, is mapped to a block device,
 
320
    and the device is formatted and mounted at the given mount_point.
 
321
 
 
322
    If formatting a device for the first time, data existing at mount_point
 
323
    will be migrated to the RBD device before being re-mounted.
 
324
 
 
325
    All services listed in system_services will be stopped prior to data
 
326
    migration and restarted when complete.
 
327
    """
 
328
    # Ensure pool, RBD image, RBD mappings are in place.
 
329
    if not pool_exists(service, pool):
 
330
        log('Creating new pool {}.'.format(pool), level=INFO)
 
331
        create_pool(service, pool, replicas=replicas)
 
332
 
 
333
    if not rbd_exists(service, pool, rbd_img):
 
334
        log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
 
335
        create_rbd_image(service, pool, rbd_img, sizemb)
 
336
 
 
337
    if not image_mapped(rbd_img):
 
338
        log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
 
339
            level=INFO)
 
340
        map_block_storage(service, pool, rbd_img)
 
341
 
 
342
    # make file system
 
343
    # TODO: What happens if for whatever reason this is run again and
 
344
    # the data is already in the rbd device and/or is mounted??
 
345
    # When it is mounted already, it will fail to make the fs
 
346
    # XXX: This is really sketchy!  Need to at least add an fstab entry
 
347
    #      otherwise this hook will blow away existing data if its executed
 
348
    #      after a reboot.
 
349
    if not filesystem_mounted(mount_point):
 
350
        make_filesystem(blk_device, fstype)
 
351
 
 
352
        for svc in system_services:
 
353
            if service_running(svc):
 
354
                log('Stopping services {} prior to migrating data.'
 
355
                    .format(svc), level=DEBUG)
 
356
                service_stop(svc)
 
357
 
 
358
        place_data_on_block_device(blk_device, mount_point)
 
359
 
 
360
        for svc in system_services:
 
361
            log('Starting service {} after migrating data.'
 
362
                .format(svc), level=DEBUG)
 
363
            service_start(svc)
 
364
 
 
365
 
 
366
def ensure_ceph_keyring(service, user=None, group=None):
 
367
    """Ensures a ceph keyring is created for a named service and optionally
 
368
    ensures user and group ownership.
 
369
 
 
370
    Returns False if no ceph key is available in relation state.
 
371
    """
 
372
    key = None
 
373
    for rid in relation_ids('ceph'):
 
374
        for unit in related_units(rid):
 
375
            key = relation_get('key', rid=rid, unit=unit)
 
376
            if key:
 
377
                break
 
378
 
 
379
    if not key:
 
380
        return False
 
381
 
 
382
    create_keyring(service=service, key=key)
 
383
    keyring = _keyring_path(service)
 
384
    if user and group:
 
385
        check_call(['chown', '%s.%s' % (user, group), keyring])
 
386
 
 
387
    return True
 
388
 
 
389
 
 
390
def ceph_version():
 
391
    """Retrieve the local version of ceph."""
 
392
    if os.path.exists('/usr/bin/ceph'):
 
393
        cmd = ['ceph', '-v']
 
394
        output = check_output(cmd).decode('US-ASCII')
 
395
        output = output.split()
 
396
        if len(output) > 3:
 
397
            return output[2]
 
398
        else:
 
399
            return None
 
400
    else:
 
401
        return None
 
402
 
 
403
 
 
404
class CephBrokerRq(object):
 
405
    """Ceph broker request.
 
406
 
 
407
    Multiple operations can be added to a request and sent to the Ceph broker
 
408
    to be executed.
 
409
 
 
410
    Request is json-encoded for sending over the wire.
 
411
 
 
412
    The API is versioned and defaults to version 1.
 
413
    """
 
414
    def __init__(self, api_version=1):
 
415
        self.api_version = api_version
 
416
        self.ops = []
 
417
 
 
418
    def add_op_create_pool(self, name, replica_count=3):
 
419
        self.ops.append({'op': 'create-pool', 'name': name,
 
420
                         'replicas': replica_count})
 
421
 
 
422
    @property
 
423
    def request(self):
 
424
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
425
 
 
426
 
 
427
class CephBrokerRsp(object):
 
428
    """Ceph broker response.
 
429
 
 
430
    Response is json-decoded and contents provided as methods/properties.
 
431
 
 
432
    The API is versioned and defaults to version 1.
 
433
    """
 
434
    def __init__(self, encoded_rsp):
 
435
        self.api_version = None
 
436
        self.rsp = json.loads(encoded_rsp)
 
437
 
 
438
    @property
 
439
    def exit_code(self):
 
440
        return self.rsp.get('exit-code')
 
441
 
 
442
    @property
 
443
    def exit_msg(self):
 
444
        return self.rsp.get('stderr')