3
# Copyright 2016 Canonical Ltd
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
19
from charmhelpers.core.hookenv import (
25
from charmhelpers.contrib.storage.linux.ceph import (
26
create_erasure_profile,
28
erasure_profile_exists,
41
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
42
# This should do a decent job of preventing people from passing in bad values.
43
# It will give a useful error message
45
# "Ceph Key Name": [Python type, [Valid Range]]
48
"crash_replay_interval": [int],
49
"pgp_num": [int], # = or < pg_num
50
"crush_ruleset": [int],
54
"nosizechange": [bool],
55
"write_fadvise_dontneed": [bool],
57
"nodeep-scrub": [bool],
58
"hit_set_type": [basestring, ["bloom", "explicit_hash",
60
"hit_set_count": [int, [1, 1]],
61
"hit_set_period": [int],
62
"hit_set_fpp": [float, [0.0, 1.0]],
63
"cache_target_dirty_ratio": [float],
64
"cache_target_dirty_high_ratio": [float],
65
"cache_target_full_ratio": [float],
66
"target_max_bytes": [int],
67
"target_max_objects": [int],
68
"cache_min_flush_age": [int],
69
"cache_min_evict_age": [int],
88
def decode_req_encode_rsp(f):
    """Decorator to decode incoming requests and encode responses.

    The decorated callable is handed the request after it has been parsed
    from JSON, and whatever it returns is serialised back to a JSON string.

    :param f: callable accepting a single decoded request object.
    :returns: callable accepting a JSON string and returning a JSON string.
    """
    # Imported here so the block does not rely on a module-level import of
    # functools being present.
    from functools import wraps

    # wraps() keeps f's __name__ and __doc__ on the returned wrapper so the
    # decorated function still identifies itself correctly in logs/tracebacks.
    @wraps(f)
    def decode_inner(req):
        return json.dumps(f(json.loads(req)))

    return decode_inner
97
# Versioned entry point for broker requests. The decorator applied below
# JSON-decodes the incoming request before this function sees it and
# JSON-encodes whatever this function returns.
# NOTE(review): several lines of this function (the docstring terminator, the
# try/if scaffolding and the return statements) are not visible in this view;
# the comments below describe only the statements that are.
@decode_req_encode_rsp
98
def process_requests(reqs):
99
"""Process Ceph broker request(s).
101
This is a versioned api. API version must be supplied by the client making
104
# Captured first so it can be copied onto the response further down.
request_id = reqs.get('request-id')
106
version = reqs.get('api-version')
108
log('Processing request {}'.format(request_id), level=DEBUG)
109
# The individual operations live under the 'ops' key; a request without
# that key raises KeyError here.
resp = process_requests_v1(reqs['ops'])
111
resp['request-id'] = request_id
115
# Failures are logged and turned into an exit-code 1 response instead of
# being propagated to the caller.
except Exception as exc:
116
log(str(exc), level=ERROR)
117
# NOTE(review): the right-hand operand of this % is on a line not visible
# here.
msg = ("Unexpected error occurred while processing requests: %s" %
119
log(msg, level=ERROR)
120
return {'exit-code': 1, 'stderr': msg}
122
# Presumably reached only when the supplied api-version was not handled
# above - confirm against the full source.
msg = ("Missing or invalid api version (%s)" % version)
123
resp = {'exit-code': 1, 'stderr': msg}
125
resp['request-id'] = request_id
130
# Handler for creating an erasure-code profile: reads the profile settings
# from the request, checks the failure domain against CEPH_BUCKET_TYPES and
# passes everything on to create_erasure_profile().
def handle_create_erasure_profile(request, service):
131
# "local" | "shec" or it defaults to "jerasure"
132
erasure_type = request.get('erasure-type')
133
# "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
134
failure_domain = request.get('failure-domain')
135
name = request.get('name')
140
# NOTE(review): an absent 'failure-domain' yields None here, which fails this
# membership test unless CEPH_BUCKET_TYPES contains None - the "defaults to
# host" remark above may not hold; confirm.
if failure_domain not in CEPH_BUCKET_TYPES:
141
msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
142
log(msg, level=ERROR)
143
# Error responses are dicts carrying 'exit-code' and 'stderr'.
return {'exit-code': 1, 'stderr': msg}
145
# NOTE(review): k, m and l are bound on lines that are not visible in this
# view.
create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
146
profile_name=name, failure_domain=failure_domain,
147
data_chunks=k, coding_chunks=m, locality=l)
150
# Handler for creating an erasure-coded pool: validates the request, builds
# an ErasurePool, and optionally applies a byte quota.
# NOTE(review): parts of the ErasurePool(...) call and of the create branch
# are on lines not visible in this view.
def handle_erasure_pool(request, service):
151
pool_name = request.get('name')
152
erasure_profile = request.get('erasure-profile')
153
quota = request.get('max-bytes')
154
# NOTE(review): weight is not used on any line visible here; presumably it is
# consumed by the ErasurePool(...) call whose tail is missing.
weight = request.get('weight')
156
# Fall back to a fixed profile name when the request does not give one.
if erasure_profile is None:
157
erasure_profile = "default-canonical"
159
# Check for missing params
160
if pool_name is None:
161
msg = "Missing parameter. name is required for the pool"
162
log(msg, level=ERROR)
163
return {'exit-code': 1, 'stderr': msg}
165
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
166
# The profile (including the fallback name above) must already exist; it is
# not created on demand.
if not erasure_profile_exists(service=service, name=erasure_profile):
167
# TODO: Fail and tell them to create the profile or default
168
msg = ("erasure-profile {} does not exist. Please create it with: "
169
"create-erasure-profile".format(erasure_profile))
170
log(msg, level=ERROR)
171
return {'exit-code': 1, 'stderr': msg}
173
pool = ErasurePool(service=service, name=pool_name,
174
erasure_code_profile=erasure_profile,
176
# Ok make the erasure pool
177
if not pool_exists(service=service, name=pool_name):
178
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
183
# Set a quota if requested
184
# 'is not None' means an explicit max-bytes of 0 is still passed through.
if quota is not None:
185
set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
188
# Handler for creating a replicated pool: validates the request, builds a
# ReplicatedPool from the optional settings, and optionally applies a byte
# quota.
# NOTE(review): the creation of kwargs, any conditions guarding the three
# kwargs assignments, and the create/else scaffolding are on lines not
# visible in this view.
def handle_replicated_pool(request, service):
189
pool_name = request.get('name')
190
replicas = request.get('replicas')
191
quota = request.get('max-bytes')
192
weight = request.get('weight')
193
pg_num = request.get('pg_num')
195
# Check for missing params
196
if pool_name is None or replicas is None:
197
msg = "Missing parameter. name and replicas are required"
198
log(msg, level=ERROR)
199
return {'exit-code': 1, 'stderr': msg}
203
kwargs['pg_num'] = pg_num
205
# The request's 'weight' is forwarded under the name 'percent_data'.
kwargs['percent_data'] = weight
207
kwargs['replicas'] = replicas
209
pool = ReplicatedPool(service=service,
210
name=pool_name, **kwargs)
211
if not pool_exists(service=service, name=pool_name):
212
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
216
log("Pool '%s' already exists - skipping create" % pool.name,
219
# Set a quota if requested
220
# 'is not None' means an explicit max-bytes of 0 is still passed through.
if quota is not None:
221
set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
224
def handle_create_cache_tier(request, service):
    """Attach the request's hot pool as a cache tier of its cold pool.

    :param request: dict-like op; 'cold-pool', 'hot-pool' and 'mode' are read.
                    A missing 'mode' is treated as "writeback".
    :param service: forwarded to pool_exists() and Pool().
    :returns: a dict with 'exit-code' 1 and 'stderr' when either pool is
              absent; otherwise nothing is returned explicitly.
    """
    cold = request.get('cold-pool')
    hot = request.get('hot-pool')
    # mode = "writeback" | "readonly"
    mode = request.get('mode')
    if mode is None:
        mode = "writeback"

    # Both pools have to be in place before they can be tiered together.
    # The hot pool is only looked up once the cold pool is known to exist.
    both_present = (pool_exists(service=service, name=cold) and
                    pool_exists(service=service, name=hot))
    if not both_present:
        msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
               "them first".format(cold, hot))
        log(msg, level=ERROR)
        return {'exit-code': 1, 'stderr': msg}

    backing = Pool(service=service, name=cold)
    backing.add_cache_tier(cache_pool=hot, mode=mode)
245
def handle_remove_cache_tier(request, service):
    """Detach the request's hot pool from its cold pool.

    :param request: dict-like op; 'cold-pool' and 'hot-pool' are read.
    :param service: forwarded to pool_exists() and Pool().
    :returns: a dict with 'exit-code' 1 and 'stderr' when either pool is
              absent; otherwise nothing is returned explicitly.
    """
    cold = request.get('cold-pool')
    hot = request.get('hot-pool')

    # Nothing to untier unless both pools are there. all() stops at the
    # first missing pool, so the hot pool is only checked if the cold one
    # exists.
    if not all(pool_exists(service=service, name=candidate)
               for candidate in (cold, hot)):
        msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
               "deleting cache tier".format(cold, hot))
        log(msg, level=ERROR)
        return {'exit-code': 1, 'stderr': msg}

    backing = Pool(name=cold, service=service)
    backing.remove_cache_tier(cache_pool=hot)
260
def handle_set_pool_value(request, service):
    """Validate a key/value pair and set it on a pool.

    :param request: dict-like op; 'name' (the pool), 'key' and 'value' are
                    read.
    :param service: forwarded to pool_set().
    :returns: a dict with 'exit-code' 1 and 'stderr' when the key is not in
              POOL_KEYS; otherwise nothing is returned explicitly.

    Anything raised by validator() for an illegal value is not caught here
    and propagates to the caller.
    """
    # Set arbitrary pool values
    params = {'pool': request.get('name'),
              'key': request.get('key'),
              'value': request.get('value')}
    if params['key'] not in POOL_KEYS:
        msg = "Invalid key '%s'" % params['key']
        log(msg, level=ERROR)
        return {'exit-code': 1, 'stderr': msg}

    # Get the validation method
    validator_params = POOL_KEYS[params['key']]
    # Validate that what the user passed is actually legal per Ceph's rules.
    # Compare the length by value: 'is' tests object identity, which only
    # works for small ints as an interpreter implementation detail.
    if len(validator_params) == 1:
        # Type-only entry.
        validator(params['value'], validator_params[0])
    else:
        # Entry that also carries a second element constraining the value.
        validator(params['value'], validator_params[0], validator_params[1])

    # Set the value
    pool_set(service=service, pool_name=params['pool'], key=params['key'],
             value=params['value'])
284
def process_requests_v1(reqs):
285
"""Process v1 requests.
287
Takes a list of requests (dicts) and processes each one. If an error is
288
found, processing stops and the client is notified in the response.
290
Returns a response dict containing the exit code (non-zero if any
291
operation failed along with an explanation).
294
# NOTE(review): the docstring terminator, the loop header and the lines that
# bind req, op and svc are not visible in this view; the comments below
# describe only the statements that are.
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
297
log("Processing op='%s'" % op, level=DEBUG)
298
# Use admin client since we do not have other client key locations
299
# setup to use them for these operations.
301
# Dispatch on the operation name. Each branch stores its handler's result in
# ret, which is inspected after the chain.
if op == "create-pool":
302
pool_type = req.get('pool-type') # "replicated" | "erasure"
304
# Default to replicated if pool_type isn't given
305
if pool_type == 'erasure':
306
ret = handle_erasure_pool(request=req, service=svc)
308
ret = handle_replicated_pool(request=req, service=svc)
310
elif op == "create-cache-tier":
311
ret = handle_create_cache_tier(request=req, service=svc)
312
elif op == "remove-cache-tier":
313
ret = handle_remove_cache_tier(request=req, service=svc)
314
elif op == "create-erasure-profile":
315
ret = handle_create_erasure_profile(request=req, service=svc)
316
elif op == "delete-pool":
317
pool = req.get('name')
318
ret = delete_pool(service=svc, name=pool)
319
elif op == "rename-pool":
320
old_name = req.get('name')
321
new_name = req.get('new-name')
322
ret = rename_pool(service=svc, old_name=old_name,
324
elif op == "snapshot-pool":
325
pool = req.get('name')
326
snapshot_name = req.get('snapshot-name')
327
ret = snapshot_pool(service=svc, pool_name=pool,
328
snapshot_name=snapshot_name)
329
elif op == "remove-pool-snapshot":
330
pool = req.get('name')
331
snapshot_name = req.get('snapshot-name')
332
ret = remove_pool_snapshot(service=svc, pool_name=pool,
333
snapshot_name=snapshot_name)
334
elif op == "set-pool-value":
335
ret = handle_set_pool_value(request=req, service=svc)
337
# An unrecognised op is reported with an exit-code 1 response.
msg = "Unknown operation '%s'" % op
338
log(msg, level=ERROR)
339
return {'exit-code': 1, 'stderr': msg}
341
# The handlers in this file report failure by returning a dict that carries
# 'exit-code'; this test picks that case out.
# NOTE(review): the body of this branch is not visible here. Also,
# isinstance(ret, dict) would be the idiomatic form of this type check.
if type(ret) == dict and 'exit-code' in ret:
344
# Success response.
return {'exit-code': 0}