59
''' Basic Ceph client installation '''
57
"""Basic Ceph client installation."""
60
58
ceph_dir = "/etc/ceph"
61
59
if not os.path.exists(ceph_dir):
63
62
apt_install('ceph-common', fatal=True)
66
65
def rbd_exists(service, pool, rbd_img):
67
''' Check to see if a RADOS block device exists '''
66
"""Check to see if a RADOS block device exists."""
69
out = check_output(['rbd', 'list', '--id', service,
68
out = check_output(['rbd', 'list', '--id',
69
service, '--pool', pool]).decode('UTF-8')
71
70
except CalledProcessError:
77
76
def create_rbd_image(service, pool, image, sizemb):
78
''' Create a new RADOS block device '''
77
"""Create a new RADOS block device."""
78
cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
93
83
def pool_exists(service, name):
94
''' Check to see if a RADOS pool already exists '''
84
"""Check to see if a RADOS pool already exists."""
96
out = check_output(['rados', '--id', service, 'lspools'])
86
out = check_output(['rados', '--id', service,
87
'lspools']).decode('UTF-8')
97
88
except CalledProcessError:
103
94
def get_osds(service):
105
Return a list of all Ceph Object Storage Daemons
106
currently in the cluster
95
"""Return a list of all Ceph Object Storage Daemons currently in the
108
98
version = ceph_version()
109
99
if version and version >= '0.56':
110
100
return json.loads(check_output(['ceph', '--id', service,
111
'osd', 'ls', '--format=json']))
116
def create_pool(service, name, replicas=2):
117
''' Create a new RADOS pool '''
102
'--format=json']).decode('UTF-8'))
107
def create_pool(service, name, replicas=3):
108
"""Create a new RADOS pool."""
118
109
if pool_exists(service, name):
119
110
log("Ceph pool {} already exists, skipping creation".format(name),
122
114
# Calculate the number of placement groups based
123
115
# on upstream recommended best practices.
124
116
osds = get_osds(service)
126
pgnum = (len(osds) * 100 / replicas)
118
pgnum = (len(osds) * 100 // replicas)
128
120
# NOTE(james-page): Default to 200 for older ceph versions
129
121
# which don't support OSD query from cli
132
'ceph', '--id', service,
133
'osd', 'pool', 'create',
124
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
138
'ceph', '--id', service,
139
'osd', 'pool', 'set', name,
140
'size', str(replicas)
127
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
145
132
def delete_pool(service, name):
146
''' Delete a RADOS pool from ceph '''
148
'ceph', '--id', service,
149
'osd', 'pool', 'delete',
150
name, '--yes-i-really-really-mean-it'
133
"""Delete a RADOS pool from ceph."""
134
cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
135
'--yes-i-really-really-mean-it']
163
147
def create_keyring(service, key):
164
''' Create a new Ceph keyring containing key'''
148
"""Create a new Ceph keyring containing key."""
165
149
keyring = _keyring_path(service)
166
150
if os.path.exists(keyring):
167
log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
151
log('Ceph keyring exists at %s.' % keyring, level=WARNING)
173
'--name=client.{}'.format(service),
174
'--add-key={}'.format(key)
154
cmd = ['ceph-authtool', keyring, '--create-keyring',
155
'--name=client.{}'.format(service), '--add-key={}'.format(key)]
177
log('ceph: Created new ring at %s.' % keyring, level=INFO)
157
log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
180
160
def create_key_file(service, key):
181
''' Create a file containing key '''
161
"""Create a file containing key."""
182
162
keyfile = _keyfile_path(service)
183
163
if os.path.exists(keyfile):
184
log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
164
log('Keyfile exists at %s.' % keyfile, level=WARNING)
186
167
with open(keyfile, 'w') as fd:
188
log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
170
log('Created new keyfile at %s.' % keyfile, level=INFO)
191
173
def get_ceph_nodes():
192
''' Query named relation 'ceph' to detemine current nodes '''
174
"""Query named relation 'ceph' to determine current nodes."""
194
176
for r_id in relation_ids('ceph'):
195
177
for unit in related_units(r_id):
196
178
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
200
183
def configure(service, key, auth, use_syslog):
201
''' Perform basic configuration of Ceph '''
184
"""Perform basic configuration of Ceph."""
202
185
create_keyring(service, key)
203
186
create_key_file(service, key)
204
187
hosts = get_ceph_nodes()
237
220
def filesystem_mounted(fs):
238
''' Determine whether a filesytems is already mounted '''
221
"""Determine whether a filesystem is already mounted."""
239
222
return fs in [f for f, m in mounts()]
242
225
def make_filesystem(blk_device, fstype='ext4', timeout=10):
243
''' Make a new filesystem on the specified block device '''
226
"""Make a new filesystem on the specified block device."""
245
228
e_noent = os.errno.ENOENT
246
229
while not os.path.exists(blk_device):
247
230
if count >= timeout:
248
log('ceph: gave up waiting on block device %s' % blk_device,
231
log('Gave up waiting on block device %s' % blk_device,
250
233
raise IOError(e_noent, os.strerror(e_noent), blk_device)
251
log('ceph: waiting for block device %s to appear' % blk_device,
235
log('Waiting for block device %s to appear' % blk_device,
256
log('ceph: Formatting block device %s as filesystem %s.' %
240
log('Formatting block device %s as filesystem %s.' %
257
241
(blk_device, fstype), level=INFO)
258
242
check_call(['mkfs', '-t', fstype, blk_device])
261
245
def place_data_on_block_device(blk_device, data_src_dst):
262
''' Migrate data in data_src_dst to blk_device and then remount '''
246
"""Migrate data in data_src_dst to blk_device and then remount."""
263
247
# mount block device into /mnt
264
248
mount(blk_device, '/mnt')
265
249
# copy data to /mnt
302
286
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
303
blk_device, fstype, system_services=[]):
305
NOTE: This function must only be called from a single service unit for
287
blk_device, fstype, system_services=[],
289
"""NOTE: This function must only be called from a single service unit for
306
290
the same rbd_img otherwise data loss will occur.
308
292
Ensures given pool and RBD image exists, is mapped to a block device,
317
301
# Ensure pool, RBD image, RBD mappings are in place.
318
302
if not pool_exists(service, pool):
319
log('ceph: Creating new pool {}.'.format(pool))
320
create_pool(service, pool)
303
log('Creating new pool {}.'.format(pool), level=INFO)
304
create_pool(service, pool, replicas=replicas)
322
306
if not rbd_exists(service, pool, rbd_img):
323
log('ceph: Creating RBD image ({}).'.format(rbd_img))
307
log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
324
308
create_rbd_image(service, pool, rbd_img, sizemb)
326
310
if not image_mapped(rbd_img):
327
log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
311
log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
328
313
map_block_storage(service, pool, rbd_img)
330
315
# make file system
340
325
for svc in system_services:
341
326
if service_running(svc):
342
log('ceph: Stopping services {} prior to migrating data.'
327
log('Stopping services {} prior to migrating data.'
328
.format(svc), level=DEBUG)
344
329
service_stop(svc)
346
331
place_data_on_block_device(blk_device, mount_point)
348
333
for svc in system_services:
349
log('ceph: Starting service {} after migrating data.'
334
log('Starting service {} after migrating data.'
335
.format(svc), level=DEBUG)
351
336
service_start(svc)
354
339
def ensure_ceph_keyring(service, user=None, group=None):
356
Ensures a ceph keyring is created for a named service
357
and optionally ensures user and group ownership.
340
"""Ensures a ceph keyring is created for a named service and optionally
341
ensures user and group ownership.
359
343
Returns False if no ceph key is available in relation state.
362
346
for rid in relation_ids('ceph'):
363
347
for unit in related_units(rid):
364
348
key = relation_get('key', rid=rid, unit=unit)
369
355
create_keyring(service=service, key=key)
370
356
keyring = _keyring_path(service)
371
357
if user and group:
372
358
check_call(['chown', '%s.%s' % (user, group), keyring])
376
363
def ceph_version():
377
''' Retrieve the local version of ceph '''
364
"""Retrieve the local version of ceph."""
378
365
if os.path.exists('/usr/bin/ceph'):
379
366
cmd = ['ceph', '-v']
380
output = check_output(cmd)
367
output = check_output(cmd).decode('US-ASCII')
381
368
output = output.split()
382
369
if len(output) > 3: