~corey.bryant/charms/trusty/cinder/git

« back to all changes in this revision

Viewing changes to hooks/charmhelpers/contrib/storage/linux/ceph.py

  • Committer: james.page at ubuntu
  • Date: 2015-01-13 14:36:44 UTC
  • mfrom: (68 cinder)
  • mto: This revision was merged to the branch mainline in revision 69.
  • Revision ID: james.page@ubuntu.com-20150113143644-docfl0wwq5gf4svy
Rebase on next

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
from subprocess import (
17
17
    check_call,
18
18
    check_output,
19
 
    CalledProcessError
 
19
    CalledProcessError,
20
20
)
21
 
 
22
21
from charmhelpers.core.hookenv import (
23
22
    relation_get,
24
23
    relation_ids,
25
24
    related_units,
26
25
    log,
 
26
    DEBUG,
27
27
    INFO,
28
28
    WARNING,
29
 
    ERROR
 
29
    ERROR,
30
30
)
31
 
 
32
31
from charmhelpers.core.host import (
33
32
    mount,
34
33
    mounts,
37
36
    service_running,
38
37
    umount,
39
38
)
40
 
 
41
39
from charmhelpers.fetch import (
42
40
    apt_install,
43
41
)
56
54
 
57
55
 
58
56
def install():
59
 
    ''' Basic Ceph client installation '''
 
57
    """Basic Ceph client installation."""
60
58
    ceph_dir = "/etc/ceph"
61
59
    if not os.path.exists(ceph_dir):
62
60
        os.mkdir(ceph_dir)
 
61
 
63
62
    apt_install('ceph-common', fatal=True)
64
63
 
65
64
 
66
65
def rbd_exists(service, pool, rbd_img):
67
 
    ''' Check to see if a RADOS block device exists '''
 
66
    """Check to see if a RADOS block device exists."""
68
67
    try:
69
 
        out = check_output(['rbd', 'list', '--id', service,
70
 
                            '--pool', pool])
 
68
        out = check_output(['rbd', 'list', '--id',
 
69
                            service, '--pool', pool]).decode('UTF-8')
71
70
    except CalledProcessError:
72
71
        return False
73
 
    else:
74
 
        return rbd_img in out
 
72
 
 
73
    return rbd_img in out
75
74
 
76
75
 
77
76
def create_rbd_image(service, pool, image, sizemb):
78
 
    ''' Create a new RADOS block device '''
79
 
    cmd = [
80
 
        'rbd',
81
 
        'create',
82
 
        image,
83
 
        '--size',
84
 
        str(sizemb),
85
 
        '--id',
86
 
        service,
87
 
        '--pool',
88
 
        pool
89
 
    ]
 
77
    """Create a new RADOS block device."""
 
78
    cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
 
79
           '--pool', pool]
90
80
    check_call(cmd)
91
81
 
92
82
 
93
83
def pool_exists(service, name):
94
 
    ''' Check to see if a RADOS pool already exists '''
 
84
    """Check to see if a RADOS pool already exists."""
95
85
    try:
96
 
        out = check_output(['rados', '--id', service, 'lspools'])
 
86
        out = check_output(['rados', '--id', service,
 
87
                            'lspools']).decode('UTF-8')
97
88
    except CalledProcessError:
98
89
        return False
99
 
    else:
100
 
        return name in out
 
90
 
 
91
    return name in out
101
92
 
102
93
 
103
94
def get_osds(service):
104
 
    '''
105
 
    Return a list of all Ceph Object Storage Daemons
106
 
    currently in the cluster
107
 
    '''
 
95
    """Return a list of all Ceph Object Storage Daemons currently in the
 
96
    cluster.
 
97
    """
108
98
    version = ceph_version()
109
99
    if version and version >= '0.56':
110
100
        return json.loads(check_output(['ceph', '--id', service,
111
 
                                        'osd', 'ls', '--format=json']))
112
 
    else:
113
 
        return None
 
101
                                        'osd', 'ls',
 
102
                                        '--format=json']).decode('UTF-8'))
 
103
 
 
104
    return None
114
105
 
115
106
 
116
107
def create_pool(service, name, replicas=3):
117
 
    ''' Create a new RADOS pool '''
 
108
    """Create a new RADOS pool."""
118
109
    if pool_exists(service, name):
119
110
        log("Ceph pool {} already exists, skipping creation".format(name),
120
111
            level=WARNING)
121
112
        return
 
113
 
122
114
    # Calculate the number of placement groups based
123
115
    # on upstream recommended best practices.
124
116
    osds = get_osds(service)
125
117
    if osds:
126
 
        pgnum = (len(osds) * 100 / replicas)
 
118
        pgnum = (len(osds) * 100 // replicas)
127
119
    else:
128
120
        # NOTE(james-page): Default to 200 for older ceph versions
129
121
        # which don't support OSD query from cli
130
122
        pgnum = 200
131
 
    cmd = [
132
 
        'ceph', '--id', service,
133
 
        'osd', 'pool', 'create',
134
 
        name, str(pgnum)
135
 
    ]
 
123
 
 
124
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
136
125
    check_call(cmd)
137
 
    cmd = [
138
 
        'ceph', '--id', service,
139
 
        'osd', 'pool', 'set', name,
140
 
        'size', str(replicas)
141
 
    ]
 
126
 
 
127
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
 
128
           str(replicas)]
142
129
    check_call(cmd)
143
130
 
144
131
 
145
132
def delete_pool(service, name):
146
 
    ''' Delete a RADOS pool from ceph '''
147
 
    cmd = [
148
 
        'ceph', '--id', service,
149
 
        'osd', 'pool', 'delete',
150
 
        name, '--yes-i-really-really-mean-it'
151
 
    ]
 
133
    """Delete a RADOS pool from ceph."""
 
134
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
 
135
           '--yes-i-really-really-mean-it']
152
136
    check_call(cmd)
153
137
 
154
138
 
161
145
 
162
146
 
163
147
def create_keyring(service, key):
164
 
    ''' Create a new Ceph keyring containing key'''
 
148
    """Create a new Ceph keyring containing key."""
165
149
    keyring = _keyring_path(service)
166
150
    if os.path.exists(keyring):
167
 
        log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
 
151
        log('Ceph keyring exists at %s.' % keyring, level=WARNING)
168
152
        return
169
 
    cmd = [
170
 
        'ceph-authtool',
171
 
        keyring,
172
 
        '--create-keyring',
173
 
        '--name=client.{}'.format(service),
174
 
        '--add-key={}'.format(key)
175
 
    ]
 
153
 
 
154
    cmd = ['ceph-authtool', keyring, '--create-keyring',
 
155
           '--name=client.{}'.format(service), '--add-key={}'.format(key)]
176
156
    check_call(cmd)
177
 
    log('ceph: Created new ring at %s.' % keyring, level=INFO)
 
157
    log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
 
158
 
 
159
 
 
160
def delete_keyring(service):
 
161
    """Delete an existing Ceph keyring."""
 
162
    keyring = _keyring_path(service)
 
163
    if not os.path.exists(keyring):
 
164
        log('Keyring does not exist at %s' % keyring, level=WARNING)
 
165
        return
 
166
 
 
167
    os.remove(keyring)
 
168
    log('Deleted ring at %s.' % keyring, level=INFO)
178
169
 
179
170
 
180
171
def create_key_file(service, key):
181
 
    ''' Create a file containing key '''
 
172
    """Create a file containing key."""
182
173
    keyfile = _keyfile_path(service)
183
174
    if os.path.exists(keyfile):
184
 
        log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
 
175
        log('Keyfile exists at %s.' % keyfile, level=WARNING)
185
176
        return
 
177
 
186
178
    with open(keyfile, 'w') as fd:
187
179
        fd.write(key)
188
 
    log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
 
180
 
 
181
    log('Created new keyfile at %s.' % keyfile, level=INFO)
189
182
 
190
183
 
191
184
def get_ceph_nodes():
192
 
    ''' Query named relation 'ceph' to detemine current nodes '''
 
185
    """Query named relation 'ceph' to determine current nodes."""
193
186
    hosts = []
194
187
    for r_id in relation_ids('ceph'):
195
188
        for unit in related_units(r_id):
196
189
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
 
190
 
197
191
    return hosts
198
192
 
199
193
 
200
194
def configure(service, key, auth, use_syslog):
201
 
    ''' Perform basic configuration of Ceph '''
 
195
    """Perform basic configuration of Ceph."""
202
196
    create_keyring(service, key)
203
197
    create_key_file(service, key)
204
198
    hosts = get_ceph_nodes()
211
205
 
212
206
 
213
207
def image_mapped(name):
214
 
    ''' Determine whether a RADOS block device is mapped locally '''
 
208
    """Determine whether a RADOS block device is mapped locally."""
215
209
    try:
216
 
        out = check_output(['rbd', 'showmapped'])
 
210
        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
217
211
    except CalledProcessError:
218
212
        return False
219
 
    else:
220
 
        return name in out
 
213
 
 
214
    return name in out
221
215
 
222
216
 
223
217
def map_block_storage(service, pool, image):
224
 
    ''' Map a RADOS block device for local use '''
 
218
    """Map a RADOS block device for local use."""
225
219
    cmd = [
226
220
        'rbd',
227
221
        'map',
235
229
 
236
230
 
237
231
def filesystem_mounted(fs):
238
 
    ''' Determine whether a filesytems is already mounted '''
 
232
    """Determine whether a filesytems is already mounted."""
239
233
    return fs in [f for f, m in mounts()]
240
234
 
241
235
 
242
236
def make_filesystem(blk_device, fstype='ext4', timeout=10):
243
 
    ''' Make a new filesystem on the specified block device '''
 
237
    """Make a new filesystem on the specified block device."""
244
238
    count = 0
245
239
    e_noent = os.errno.ENOENT
246
240
    while not os.path.exists(blk_device):
247
241
        if count >= timeout:
248
 
            log('ceph: gave up waiting on block device %s' % blk_device,
 
242
            log('Gave up waiting on block device %s' % blk_device,
249
243
                level=ERROR)
250
244
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
251
 
        log('ceph: waiting for block device %s to appear' % blk_device,
252
 
            level=INFO)
 
245
 
 
246
        log('Waiting for block device %s to appear' % blk_device,
 
247
            level=DEBUG)
253
248
        count += 1
254
249
        time.sleep(1)
255
250
    else:
256
 
        log('ceph: Formatting block device %s as filesystem %s.' %
 
251
        log('Formatting block device %s as filesystem %s.' %
257
252
            (blk_device, fstype), level=INFO)
258
253
        check_call(['mkfs', '-t', fstype, blk_device])
259
254
 
260
255
 
261
256
def place_data_on_block_device(blk_device, data_src_dst):
262
 
    ''' Migrate data in data_src_dst to blk_device and then remount '''
 
257
    """Migrate data in data_src_dst to blk_device and then remount."""
263
258
    # mount block device into /mnt
264
259
    mount(blk_device, '/mnt')
265
260
    # copy data to /mnt
279
274
 
280
275
# TODO: re-use
281
276
def modprobe(module):
282
 
    ''' Load a kernel module and configure for auto-load on reboot '''
283
 
    log('ceph: Loading kernel module', level=INFO)
 
277
    """Load a kernel module and configure for auto-load on reboot."""
 
278
    log('Loading kernel module', level=INFO)
284
279
    cmd = ['modprobe', module]
285
280
    check_call(cmd)
286
281
    with open('/etc/modules', 'r+') as modules:
289
284
 
290
285
 
291
286
def copy_files(src, dst, symlinks=False, ignore=None):
292
 
    ''' Copy files from src to dst '''
 
287
    """Copy files from src to dst."""
293
288
    for item in os.listdir(src):
294
289
        s = os.path.join(src, item)
295
290
        d = os.path.join(dst, item)
302
297
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
303
298
                        blk_device, fstype, system_services=[],
304
299
                        replicas=3):
305
 
    """
306
 
    NOTE: This function must only be called from a single service unit for
 
300
    """NOTE: This function must only be called from a single service unit for
307
301
    the same rbd_img otherwise data loss will occur.
308
302
 
309
303
    Ensures given pool and RBD image exists, is mapped to a block device,
317
311
    """
318
312
    # Ensure pool, RBD image, RBD mappings are in place.
319
313
    if not pool_exists(service, pool):
320
 
        log('ceph: Creating new pool {}.'.format(pool))
 
314
        log('Creating new pool {}.'.format(pool), level=INFO)
321
315
        create_pool(service, pool, replicas=replicas)
322
316
 
323
317
    if not rbd_exists(service, pool, rbd_img):
324
 
        log('ceph: Creating RBD image ({}).'.format(rbd_img))
 
318
        log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
325
319
        create_rbd_image(service, pool, rbd_img, sizemb)
326
320
 
327
321
    if not image_mapped(rbd_img):
328
 
        log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
 
322
        log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
 
323
            level=INFO)
329
324
        map_block_storage(service, pool, rbd_img)
330
325
 
331
326
    # make file system
340
335
 
341
336
        for svc in system_services:
342
337
            if service_running(svc):
343
 
                log('ceph: Stopping services {} prior to migrating data.'
344
 
                    .format(svc))
 
338
                log('Stopping services {} prior to migrating data.'
 
339
                    .format(svc), level=DEBUG)
345
340
                service_stop(svc)
346
341
 
347
342
        place_data_on_block_device(blk_device, mount_point)
348
343
 
349
344
        for svc in system_services:
350
 
            log('ceph: Starting service {} after migrating data.'
351
 
                .format(svc))
 
345
            log('Starting service {} after migrating data.'
 
346
                .format(svc), level=DEBUG)
352
347
            service_start(svc)
353
348
 
354
349
 
355
350
def ensure_ceph_keyring(service, user=None, group=None):
356
 
    '''
357
 
    Ensures a ceph keyring is created for a named service
358
 
    and optionally ensures user and group ownership.
 
351
    """Ensures a ceph keyring is created for a named service and optionally
 
352
    ensures user and group ownership.
359
353
 
360
354
    Returns False if no ceph key is available in relation state.
361
 
    '''
 
355
    """
362
356
    key = None
363
357
    for rid in relation_ids('ceph'):
364
358
        for unit in related_units(rid):
365
359
            key = relation_get('key', rid=rid, unit=unit)
366
360
            if key:
367
361
                break
 
362
 
368
363
    if not key:
369
364
        return False
 
365
 
370
366
    create_keyring(service=service, key=key)
371
367
    keyring = _keyring_path(service)
372
368
    if user and group:
373
369
        check_call(['chown', '%s.%s' % (user, group), keyring])
 
370
 
374
371
    return True
375
372
 
376
373
 
377
374
def ceph_version():
378
 
    ''' Retrieve the local version of ceph '''
 
375
    """Retrieve the local version of ceph."""
379
376
    if os.path.exists('/usr/bin/ceph'):
380
377
        cmd = ['ceph', '-v']
381
 
        output = check_output(cmd)
 
378
        output = check_output(cmd).decode('US-ASCII')
382
379
        output = output.split()
383
380
        if len(output) > 3:
384
381
            return output[2]
386
383
            return None
387
384
    else:
388
385
        return None
 
386
 
 
387
 
 
388
class CephBrokerRq(object):
 
389
    """Ceph broker request.
 
390
 
 
391
    Multiple operations can be added to a request and sent to the Ceph broker
 
392
    to be executed.
 
393
 
 
394
    Request is json-encoded for sending over the wire.
 
395
 
 
396
    The API is versioned and defaults to version 1.
 
397
    """
 
398
    def __init__(self, api_version=1):
 
399
        self.api_version = api_version
 
400
        self.ops = []
 
401
 
 
402
    def add_op_create_pool(self, name, replica_count=3):
 
403
        self.ops.append({'op': 'create-pool', 'name': name,
 
404
                         'replicas': replica_count})
 
405
 
 
406
    @property
 
407
    def request(self):
 
408
        return json.dumps({'api-version': self.api_version, 'ops': self.ops})
 
409
 
 
410
 
 
411
class CephBrokerRsp(object):
 
412
    """Ceph broker response.
 
413
 
 
414
    Response is json-decoded and contents provided as methods/properties.
 
415
 
 
416
    The API is versioned and defaults to version 1.
 
417
    """
 
418
    def __init__(self, encoded_rsp):
 
419
        self.api_version = None
 
420
        self.rsp = json.loads(encoded_rsp)
 
421
 
 
422
    @property
 
423
    def exit_code(self):
 
424
        return self.rsp.get('exit-code')
 
425
 
 
426
    @property
 
427
    def exit_msg(self):
 
428
        return self.rsp.get('stderr')