1
# Copyright 2014-2015 Canonical Limited.
3
# This file is part of charm-helpers.
5
# charm-helpers is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Lesser General Public License version 3 as
7
# published by the Free Software Foundation.
9
# charm-helpers is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public License
15
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
18
# Copyright 2012 Canonical Ltd.
20
# This file is sourced from lp:openstack-charm-helpers
23
# James Page <james.page@ubuntu.com>
24
# Adam Gandelman <adamg@ubuntu.com>
32
from subprocess import (
37
from charmhelpers.core.hookenv import (
47
from charmhelpers.core.host import (
55
from charmhelpers.fetch import (
59
# Path templates for the credentials of a Ceph client; the '{}' placeholder
# is filled in with the service name through str.format().
KEYFILE = '/etc/ceph/ceph.client.{}.key'
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
62
CEPH_CONF = """[global]
63
auth supported = {auth}
65
mon host = {mon_hosts}
66
log to syslog = {use_syslog}
67
err to syslog = {use_syslog}
68
clog to syslog = {use_syslog}
73
"""Basic Ceph client installation."""
74
ceph_dir = "/etc/ceph"
75
if not os.path.exists(ceph_dir):
78
apt_install('ceph-common', fatal=True)
81
def rbd_exists(service, pool, rbd_img):
82
"""Check to see if a RADOS block device exists."""
84
out = check_output(['rbd', 'list', '--id',
85
service, '--pool', pool]).decode('UTF-8')
86
except CalledProcessError:
92
def create_rbd_image(service, pool, image, sizemb):
93
"""Create a new RADOS block device."""
94
cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
99
def pool_exists(service, name):
100
"""Check to see if a RADOS pool already exists."""
102
out = check_output(['rados', '--id', service,
103
'lspools']).decode('UTF-8')
104
except CalledProcessError:
110
def get_osds(service):
111
"""Return a list of all Ceph Object Storage Daemons currently in the
114
version = ceph_version()
115
if version and version >= '0.56':
116
return json.loads(check_output(['ceph', '--id', service,
118
'--format=json']).decode('UTF-8'))
123
def create_pool(service, name, replicas=3):
124
"""Create a new RADOS pool."""
125
if pool_exists(service, name):
126
log("Ceph pool {} already exists, skipping creation".format(name),
130
# Calculate the number of placement groups based
131
# on upstream recommended best practices.
132
osds = get_osds(service)
134
pgnum = (len(osds) * 100 // replicas)
136
# NOTE(james-page): Default to 200 for older ceph versions
137
# which don't support OSD query from cli
140
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
143
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
148
def delete_pool(service, name):
149
"""Delete a RADOS pool from ceph."""
150
cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
151
'--yes-i-really-really-mean-it']
155
def _keyfile_path(service):
    """Return the key file path for ``service``, built from KEYFILE."""
    path = KEYFILE.format(service)
    return path
159
def _keyring_path(service):
    """Return the keyring path for ``service``, built from KEYRING."""
    path = KEYRING.format(service)
    return path
163
def create_keyring(service, key):
164
"""Create a new Ceph keyring containing key."""
165
keyring = _keyring_path(service)
166
if os.path.exists(keyring):
167
log('Ceph keyring exists at %s.' % keyring, level=WARNING)
170
cmd = ['ceph-authtool', keyring, '--create-keyring',
171
'--name=client.{}'.format(service), '--add-key={}'.format(key)]
173
log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
176
def delete_keyring(service):
177
"""Delete an existing Ceph keyring."""
178
keyring = _keyring_path(service)
179
if not os.path.exists(keyring):
180
log('Keyring does not exist at %s' % keyring, level=WARNING)
184
log('Deleted ring at %s.' % keyring, level=INFO)
187
def create_key_file(service, key):
188
"""Create a file containing key."""
189
keyfile = _keyfile_path(service)
190
if os.path.exists(keyfile):
191
log('Keyfile exists at %s.' % keyfile, level=WARNING)
194
with open(keyfile, 'w') as fd:
197
log('Created new keyfile at %s.' % keyfile, level=INFO)
200
def get_ceph_nodes():
201
"""Query named relation 'ceph' to determine current nodes."""
203
for r_id in relation_ids('ceph'):
204
for unit in related_units(r_id):
205
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
210
def configure(service, key, auth, use_syslog):
211
"""Perform basic configuration of Ceph."""
212
create_keyring(service, key)
213
create_key_file(service, key)
214
hosts = get_ceph_nodes()
215
with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
216
ceph_conf.write(CEPH_CONF.format(auth=auth,
217
keyring=_keyring_path(service),
218
mon_hosts=",".join(map(str, hosts)),
219
use_syslog=use_syslog))
223
def image_mapped(name):
224
"""Determine whether a RADOS block device is mapped locally."""
226
out = check_output(['rbd', 'showmapped']).decode('UTF-8')
227
except CalledProcessError:
233
def map_block_storage(service, pool, image):
234
"""Map a RADOS block device for local use."""
238
'{}/{}'.format(pool, image),
242
_keyfile_path(service),
247
def filesystem_mounted(fs):
    """Report whether ``fs`` is among the currently mounted filesystems.

    ``fs`` is matched against the first item of every pair yielded by
    mounts(); True is returned on an exact match, False otherwise.
    """
    first_items = []
    for first, _second in mounts():
        first_items.append(first)
    return fs in first_items
252
def make_filesystem(blk_device, fstype='ext4', timeout=10):
253
"""Make a new filesystem on the specified block device."""
255
e_noent = os.errno.ENOENT
256
while not os.path.exists(blk_device):
258
log('Gave up waiting on block device %s' % blk_device,
260
raise IOError(e_noent, os.strerror(e_noent), blk_device)
262
log('Waiting for block device %s to appear' % blk_device,
267
log('Formatting block device %s as filesystem %s.' %
268
(blk_device, fstype), level=INFO)
269
check_call(['mkfs', '-t', fstype, blk_device])
272
def place_data_on_block_device(blk_device, data_src_dst):
273
"""Migrate data in data_src_dst to blk_device and then remount."""
274
# mount block device into /mnt
275
mount(blk_device, '/mnt')
277
copy_files(data_src_dst, '/mnt')
278
# umount block device
280
# Grab user/group IDs from original source
281
_dir = os.stat(data_src_dst)
284
# re-mount where the data should originally be
285
# TODO: persist is currently a NO-OP in core.host
286
mount(blk_device, data_src_dst, persist=True)
287
# ensure original ownership of new mount.
288
os.chown(data_src_dst, uid, gid)
292
def modprobe(module):
293
"""Load a kernel module and configure for auto-load on reboot."""
294
log('Loading kernel module', level=INFO)
295
cmd = ['modprobe', module]
297
with open('/etc/modules', 'r+') as modules:
298
if module not in modules.read():
299
modules.write(module)
302
def copy_files(src, dst, symlinks=False, ignore=None):
303
"""Copy files from src to dst."""
304
for item in os.listdir(src):
305
s = os.path.join(src, item)
306
d = os.path.join(dst, item)
308
shutil.copytree(s, d, symlinks, ignore)
313
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
314
blk_device, fstype, system_services=[],
316
"""NOTE: This function must only be called from a single service unit for
317
the same rbd_img otherwise data loss will occur.
319
Ensures given pool and RBD image exists, is mapped to a block device,
320
and the device is formatted and mounted at the given mount_point.
322
If formatting a device for the first time, data existing at mount_point
323
will be migrated to the RBD device before being re-mounted.
325
All services listed in system_services will be stopped prior to data
326
migration and restarted when complete.
328
# Ensure pool, RBD image, RBD mappings are in place.
329
if not pool_exists(service, pool):
330
log('Creating new pool {}.'.format(pool), level=INFO)
331
create_pool(service, pool, replicas=replicas)
333
if not rbd_exists(service, pool, rbd_img):
334
log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
335
create_rbd_image(service, pool, rbd_img, sizemb)
337
if not image_mapped(rbd_img):
338
log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
340
map_block_storage(service, pool, rbd_img)
343
# TODO: What happens if for whatever reason this is run again and
344
# the data is already in the rbd device and/or is mounted??
345
# When it is mounted already, it will fail to make the fs
346
# XXX: This is really sketchy! Need to at least add an fstab entry
347
# otherwise this hook will blow away existing data if it's executed
349
if not filesystem_mounted(mount_point):
350
make_filesystem(blk_device, fstype)
352
for svc in system_services:
353
if service_running(svc):
354
log('Stopping services {} prior to migrating data.'
355
.format(svc), level=DEBUG)
358
place_data_on_block_device(blk_device, mount_point)
360
for svc in system_services:
361
log('Starting service {} after migrating data.'
362
.format(svc), level=DEBUG)
366
def ensure_ceph_keyring(service, user=None, group=None):
367
"""Ensures a ceph keyring is created for a named service and optionally
368
ensures user and group ownership.
370
Returns False if no ceph key is available in relation state.
373
for rid in relation_ids('ceph'):
374
for unit in related_units(rid):
375
key = relation_get('key', rid=rid, unit=unit)
382
create_keyring(service=service, key=key)
383
keyring = _keyring_path(service)
385
check_call(['chown', '%s.%s' % (user, group), keyring])
391
"""Retrieve the local version of ceph."""
392
if os.path.exists('/usr/bin/ceph'):
394
output = check_output(cmd).decode('US-ASCII')
395
output = output.split()
404
class CephBrokerRq(object):
405
"""Ceph broker request.
407
Multiple operations can be added to a request and sent to the Ceph broker
410
Request is json-encoded for sending over the wire.
412
The API is versioned and defaults to version 1.
414
def __init__(self, api_version=1):
415
self.api_version = api_version
418
def add_op_create_pool(self, name, replica_count=3):
419
self.ops.append({'op': 'create-pool', 'name': name,
420
'replicas': replica_count})
424
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
427
class CephBrokerRsp(object):
428
"""Ceph broker response.
430
Response is json-decoded and contents provided as methods/properties.
432
The API is versioned and defaults to version 1.
434
def __init__(self, encoded_rsp):
435
self.api_version = None
436
self.rsp = json.loads(encoded_rsp)
440
return self.rsp.get('exit-code')
444
return self.rsp.get('stderr')