72
76
err to syslog = {use_syslog}
73
77
clog to syslog = {use_syslog}
79
# For 50 < osds < 240,000 OSDs (Roughly 1 Exabyte at 6T OSDs)
80
powers_of_two = [8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608]
83
def validator(value, valid_type, valid_range=None):
85
Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values
90
This says I'm testing value=1. It must be an int inclusive in [0,2]
92
:param value: The value to validate
93
:param valid_type: The type that value should be.
94
:param valid_range: A range of values that value can assume.
97
assert isinstance(value, valid_type), "{} is not a {}".format(
100
if valid_range is not None:
101
assert isinstance(valid_range, list), \
102
"valid_range must be a list, was given {}".format(valid_range)
103
# If we're dealing with strings
104
if valid_type is six.string_types:
105
assert value in valid_range, \
106
"{} is not in the list {}".format(value, valid_range)
107
# Integer, float should have a min and max
109
if len(valid_range) != 2:
111
"Invalid valid_range list of {} for {}. "
112
"List must be [min,max]".format(valid_range, value))
113
assert value >= valid_range[0], \
114
"{} is less than minimum allowed value of {}".format(
115
value, valid_range[0])
116
assert value <= valid_range[1], \
117
"{} is greater than maximum allowed value of {}".format(
118
value, valid_range[1])
121
class PoolCreationError(Exception):
123
A custom error to inform the caller that a pool creation failed. Provides an error message
126
def __init__(self, message):
127
super(PoolCreationError, self).__init__(message)
132
An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool.
133
Do not call create() on this base class as it will not do anything. Instantiate a child class and call create().
136
def __init__(self, service, name):
137
self.service = service
140
# Create the pool if it doesn't exist already
141
# To be implemented by subclasses
145
def add_cache_tier(self, cache_pool, mode):
147
Adds a new cache tier to an existing pool.
148
:param cache_pool: six.string_types. The cache tier pool name to add.
149
:param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"]
152
# Check the input types and values
153
validator(value=cache_pool, valid_type=six.string_types)
154
validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"])
156
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool])
157
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode])
158
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool])
159
check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom'])
161
def remove_cache_tier(self, cache_pool):
163
Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete.
164
:param cache_pool: six.string_types. The cache tier pool name to remove.
167
# read-only is easy, writeback is much harder
168
mode = get_cache_mode(self.service, cache_pool)
169
if mode == 'readonly':
170
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
171
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
173
elif mode == 'writeback':
174
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
175
# Flush the cache and wait for it to return
176
check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
177
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
178
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
180
def get_pgs(self, pool_size):
182
:param pool_size: int. pool_size is either the number of replicas for replicated pools or the K+M sum for
184
:return: int. The number of pgs to use.
186
validator(value=pool_size, valid_type=int)
187
osd_list = get_osds(self.service)
189
# NOTE(james-page): Default to 200 for older ceph versions
190
# which don't support OSD query from cli
193
osd_list_length = len(osd_list)
194
# Calculate based on Ceph best practices
195
if osd_list_length < 5:
197
elif 5 < osd_list_length < 10:
199
elif 10 < osd_list_length < 50:
202
estimate = (osd_list_length * 100) / pool_size
203
# Return the next nearest power of 2
204
index = bisect.bisect_right(powers_of_two, estimate)
205
return powers_of_two[index]
208
class ReplicatedPool(Pool):
209
def __init__(self, service, name, pg_num=None, replicas=2):
210
super(ReplicatedPool, self).__init__(service=service, name=name)
211
self.replicas = replicas
213
self.pg_num = self.get_pgs(self.replicas)
218
if not pool_exists(self.service, self.name):
220
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create',
221
self.name, str(self.pg_num)]
224
except CalledProcessError:
228
# Default jerasure erasure coded pool
229
class ErasurePool(Pool):
230
def __init__(self, service, name, erasure_code_profile="default"):
231
super(ErasurePool, self).__init__(service=service, name=name)
232
self.erasure_code_profile = erasure_code_profile
235
if not pool_exists(self.service, self.name):
236
# Try to find the erasure profile information so we can properly size the pgs
237
erasure_profile = get_erasure_profile(service=self.service, name=self.erasure_code_profile)
240
if erasure_profile is None:
241
log(message='Failed to discover erasure_profile named={}'.format(self.erasure_code_profile),
243
raise PoolCreationError(message='unable to find erasure profile {}'.format(self.erasure_code_profile))
244
if 'k' not in erasure_profile or 'm' not in erasure_profile:
246
log(message='Unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile),
248
raise PoolCreationError(
249
message='unable to find k (data chunks) or m (coding chunks) in {}'.format(erasure_profile))
251
pgs = self.get_pgs(int(erasure_profile['k']) + int(erasure_profile['m']))
253
cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs),
254
'erasure', self.erasure_code_profile]
257
except CalledProcessError:
260
"""Get an existing erasure code profile if it already exists.
261
Returns json formatted output"""
264
def get_mon_map(service):
266
Returns the current monitor map.
267
:param service: six.string_types. The Ceph user name to run the command under
268
:return: json string. :raise: ValueError if the monmap fails to parse.
269
Also raises CalledProcessError if our ceph command fails
272
mon_status = check_output(
273
['ceph', '--id', service,
274
'mon_status', '--format=json'])
276
return json.loads(mon_status)
277
except ValueError as v:
278
log("Unable to parse mon_status json: {}. Error: {}".format(
279
mon_status, v.message))
281
except CalledProcessError as e:
282
log("mon_status command failed with message: {}".format(
287
def hash_monitor_names(service):
289
Uses the get_mon_map() function to get information about the monitor
291
Hash the name of each monitor. Return a sorted list of monitor hashes
292
in an ascending order.
293
:param service: six.string_types. The Ceph user name to run the command under
294
:rtype : dict. json dict of monitor name, ip address and rank
296
'name': 'ip-172-31-13-165',
298
'addr': '172.31.13.165:6789/0'}
302
monitor_list = get_mon_map(service=service)
303
if monitor_list['monmap']['mons']:
304
for mon in monitor_list['monmap']['mons']:
306
hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
307
return sorted(hash_list)
310
except (ValueError, CalledProcessError):
314
def monitor_key_delete(service, key):
316
Delete a key and value pair from the monitor cluster
317
:param service: six.string_types. The Ceph user name to run the command under
318
Deletes a key value pair on the monitor cluster.
319
:param key: six.string_types. The key to delete.
323
['ceph', '--id', service,
324
'config-key', 'del', str(key)])
325
except CalledProcessError as e:
326
log("Monitor config-key put failed with message: {}".format(
331
def monitor_key_set(service, key, value):
333
Sets a key value pair on the monitor cluster.
334
:param service: six.string_types. The Ceph user name to run the command under
335
:param key: six.string_types. The key to set.
336
:param value: The value to set. This will be converted to a string
341
['ceph', '--id', service,
342
'config-key', 'put', str(key), str(value)])
343
except CalledProcessError as e:
344
log("Monitor config-key put failed with message: {}".format(
349
def monitor_key_get(service, key):
351
Gets the value of an existing key in the monitor cluster.
352
:param service: six.string_types. The Ceph user name to run the command under
353
:param key: six.string_types. The key to search for.
354
:return: Returns the value of that key or None if not found.
357
output = check_output(
358
['ceph', '--id', service,
359
'config-key', 'get', str(key)])
361
except CalledProcessError as e:
362
log("Monitor config-key get failed with message: {}".format(
367
def monitor_key_exists(service, key):
369
Searches for the existence of a key in the monitor cluster.
370
:param service: six.string_types. The Ceph user name to run the command under
371
:param key: six.string_types. The key to search for
372
:return: Returns True if the key exists, False if not and raises an
373
exception if an unknown error occurs. :raise: CalledProcessError if
374
an unknown error occurs
378
['ceph', '--id', service,
379
'config-key', 'exists', str(key)])
380
# I can return true here regardless because Ceph returns
381
# ENOENT if the key wasn't found
383
except CalledProcessError as e:
384
if e.returncode == errno.ENOENT:
387
log("Unknown error from ceph config-get exists: {} {}".format(
388
e.returncode, e.output))
392
def get_erasure_profile(service, name):
394
:param service: six.string_types. The Ceph user name to run the command under
399
out = check_output(['ceph', '--id', service,
400
'osd', 'erasure-code-profile', 'get',
401
name, '--format=json'])
402
return json.loads(out)
403
except (CalledProcessError, OSError, ValueError):
407
def pool_set(service, pool_name, key, value):
409
Sets a value for a RADOS pool in ceph.
410
:param service: six.string_types. The Ceph user name to run the command under
411
:param pool_name: six.string_types
412
:param key: six.string_types
414
:return: None. Can raise CalledProcessError
416
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value]
419
except CalledProcessError:
423
def snapshot_pool(service, pool_name, snapshot_name):
425
Snapshots a RADOS pool in ceph.
426
:param service: six.string_types. The Ceph user name to run the command under
427
:param pool_name: six.string_types
428
:param snapshot_name: six.string_types
429
:return: None. Can raise CalledProcessError
431
cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name]
434
except CalledProcessError:
438
def remove_pool_snapshot(service, pool_name, snapshot_name):
440
Remove a snapshot from a RADOS pool in ceph.
441
:param service: six.string_types. The Ceph user name to run the command under
442
:param pool_name: six.string_types
443
:param snapshot_name: six.string_types
444
:return: None. Can raise CalledProcessError
446
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name]
449
except CalledProcessError:
453
# max_bytes should be an int or long
454
def set_pool_quota(service, pool_name, max_bytes):
456
:param service: six.string_types. The Ceph user name to run the command under
457
:param pool_name: six.string_types
458
:param max_bytes: int or long
459
:return: None. Can raise CalledProcessError
461
# Set a byte quota on a RADOS pool in ceph.
462
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
463
'max_bytes', str(max_bytes)]
466
except CalledProcessError:
470
def remove_pool_quota(service, pool_name):
472
Set a byte quota on a RADOS pool in ceph.
473
:param service: six.string_types. The Ceph user name to run the command under
474
:param pool_name: six.string_types
475
:return: None. Can raise CalledProcessError
477
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0']
480
except CalledProcessError:
484
def remove_erasure_profile(service, profile_name):
486
Create a new erasure code profile if one does not already exist for it. Updates
487
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
489
:param service: six.string_types. The Ceph user name to run the command under
490
:param profile_name: six.string_types
491
:return: None. Can raise CalledProcessError
493
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
497
except CalledProcessError:
501
def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure',
502
failure_domain='host',
503
data_chunks=2, coding_chunks=1,
504
locality=None, durability_estimator=None):
506
Create a new erasure code profile if one does not already exist for it. Updates
507
the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
509
:param service: six.string_types. The Ceph user name to run the command under
510
:param profile_name: six.string_types
511
:param erasure_plugin_name: six.string_types
512
:param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region',
513
'room', 'root', 'row'])
514
:param data_chunks: int
515
:param coding_chunks: int
517
:param durability_estimator: int
518
:return: None. Can raise CalledProcessError
520
# Ensure this failure_domain is allowed by Ceph
521
validator(failure_domain, six.string_types,
522
['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
524
cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
525
'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks),
526
'ruleset_failure_domain=' + failure_domain]
527
if locality is not None and durability_estimator is not None:
528
raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
530
# Add plugin specific information
531
if locality is not None:
532
# For local erasure codes
533
cmd.append('l=' + str(locality))
534
if durability_estimator is not None:
535
# For Shec erasure codes
536
cmd.append('c=' + str(durability_estimator))
538
if erasure_profile_exists(service, profile_name):
539
cmd.append('--force')
543
except CalledProcessError:
547
def rename_pool(service, old_name, new_name):
549
Rename a Ceph pool from old_name to new_name
550
:param service: six.string_types. The Ceph user name to run the command under
551
:param old_name: six.string_types
552
:param new_name: six.string_types
555
validator(value=old_name, valid_type=six.string_types)
556
validator(value=new_name, valid_type=six.string_types)
558
cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name]
562
def erasure_profile_exists(service, name):
564
Check to see if an Erasure code profile already exists.
565
:param service: six.string_types. The Ceph user name to run the command under
566
:param name: six.string_types
569
validator(value=name, valid_type=six.string_types)
571
check_call(['ceph', '--id', service,
572
'osd', 'erasure-code-profile', 'get',
575
except CalledProcessError:
579
def get_cache_mode(service, pool_name):
581
Find the current caching mode of the pool_name given.
582
:param service: six.string_types. The Ceph user name to run the command under
583
:param pool_name: six.string_types
586
validator(value=service, valid_type=six.string_types)
587
validator(value=pool_name, valid_type=six.string_types)
588
out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
590
osd_json = json.loads(out)
591
for pool in osd_json['pools']:
592
if pool['pool_name'] == pool_name:
593
return pool['cache_mode']
599
def pool_exists(service, name):
600
"""Check to see if a RADOS pool already exists."""
602
out = check_output(['rados', '--id', service,
603
'lspools']).decode('UTF-8')
604
except CalledProcessError:
610
def get_osds(service):
611
"""Return a list of all Ceph Object Storage Daemons currently in the
614
version = ceph_version()
615
if version and version >= '0.56':
616
return json.loads(check_output(['ceph', '--id', service,
618
'--format=json']).decode('UTF-8'))