~openstack-charmers/charms/trusty/ceph-mon/next

« back to all changes in this revision

Viewing changes to lib/ceph/ceph/ceph_broker.py

  • Committer: Gerrit Code Review
  • Author(s): Jenkins
  • Date: 2016-08-04 17:46:50 UTC
  • mfrom: (183.1.1 trunk)
  • Revision ID: review@openstack.org-20160804174650-v1n3kie2ag313uah
Merge "Migrate to shared lib"

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
#
 
3
# Copyright 2016 Canonical Ltd
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License");
 
6
# you may not use this file except in compliance with the License.
 
7
# You may obtain a copy of the License at
 
8
#
 
9
#  http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS,
 
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
 
 
17
import json
 
18
 
 
19
from charmhelpers.core.hookenv import (
 
20
    log,
 
21
    DEBUG,
 
22
    INFO,
 
23
    ERROR,
 
24
)
 
25
from charmhelpers.contrib.storage.linux.ceph  import (
 
26
    create_erasure_profile,
 
27
    delete_pool,
 
28
    erasure_profile_exists,
 
29
    pool_exists,
 
30
    pool_set,
 
31
    remove_pool_snapshot,
 
32
    rename_pool,
 
33
    set_pool_quota,
 
34
    snapshot_pool,
 
35
    validator,
 
36
    ErasurePool,
 
37
    Pool,
 
38
    ReplicatedPool,
 
39
)
 
40
 
 
41
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
 
42
# This should do a decent job of preventing people from passing in bad values.
 
43
# It will give a useful error message
 
44
POOL_KEYS = {
 
45
    # "Ceph Key Name": [Python type, [Valid Range]]
 
46
    "size": [int],
 
47
    "min_size": [int],
 
48
    "crash_replay_interval": [int],
 
49
    "pgp_num": [int],  # = or < pg_num
 
50
    "crush_ruleset": [int],
 
51
    "hashpspool": [bool],
 
52
    "nodelete": [bool],
 
53
    "nopgchange": [bool],
 
54
    "nosizechange": [bool],
 
55
    "write_fadvise_dontneed": [bool],
 
56
    "noscrub": [bool],
 
57
    "nodeep-scrub": [bool],
 
58
    "hit_set_type": [basestring, ["bloom", "explicit_hash",
 
59
                                  "explicit_object"]],
 
60
    "hit_set_count": [int, [1, 1]],
 
61
    "hit_set_period": [int],
 
62
    "hit_set_fpp": [float, [0.0, 1.0]],
 
63
    "cache_target_dirty_ratio": [float],
 
64
    "cache_target_dirty_high_ratio": [float],
 
65
    "cache_target_full_ratio": [float],
 
66
    "target_max_bytes": [int],
 
67
    "target_max_objects": [int],
 
68
    "cache_min_flush_age": [int],
 
69
    "cache_min_evict_age": [int],
 
70
    "fast_read": [bool],
 
71
}
 
72
 
 
73
CEPH_BUCKET_TYPES = [
 
74
    'osd',
 
75
    'host',
 
76
    'chassis',
 
77
    'rack',
 
78
    'row',
 
79
    'pdu',
 
80
    'pod',
 
81
    'room',
 
82
    'datacenter',
 
83
    'region',
 
84
    'root'
 
85
]
 
86
 
 
87
 
 
88
def decode_req_encode_rsp(f):
 
89
    """Decorator to decode incoming requests and encode responses."""
 
90
 
 
91
    def decode_inner(req):
 
92
        return json.dumps(f(json.loads(req)))
 
93
 
 
94
    return decode_inner
 
95
 
 
96
 
 
97
@decode_req_encode_rsp
 
98
def process_requests(reqs):
 
99
    """Process Ceph broker request(s).
 
100
 
 
101
    This is a versioned api. API version must be supplied by the client making
 
102
    the request.
 
103
    """
 
104
    request_id = reqs.get('request-id')
 
105
    try:
 
106
        version = reqs.get('api-version')
 
107
        if version == 1:
 
108
            log('Processing request {}'.format(request_id), level=DEBUG)
 
109
            resp = process_requests_v1(reqs['ops'])
 
110
            if request_id:
 
111
                resp['request-id'] = request_id
 
112
 
 
113
            return resp
 
114
 
 
115
    except Exception as exc:
 
116
        log(str(exc), level=ERROR)
 
117
        msg = ("Unexpected error occurred while processing requests: %s" %
 
118
               reqs)
 
119
        log(msg, level=ERROR)
 
120
        return {'exit-code': 1, 'stderr': msg}
 
121
 
 
122
    msg = ("Missing or invalid api version (%s)" % version)
 
123
    resp = {'exit-code': 1, 'stderr': msg}
 
124
    if request_id:
 
125
        resp['request-id'] = request_id
 
126
 
 
127
    return resp
 
128
 
 
129
 
 
130
def handle_create_erasure_profile(request, service):
 
131
    # "local" | "shec" or it defaults to "jerasure"
 
132
    erasure_type = request.get('erasure-type')
 
133
    # "host" | "rack" or it defaults to "host"  # Any valid Ceph bucket
 
134
    failure_domain = request.get('failure-domain')
 
135
    name = request.get('name')
 
136
    k = request.get('k')
 
137
    m = request.get('m')
 
138
    l = request.get('l')
 
139
 
 
140
    if failure_domain not in CEPH_BUCKET_TYPES:
 
141
        msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
 
142
        log(msg, level=ERROR)
 
143
        return {'exit-code': 1, 'stderr': msg}
 
144
 
 
145
    create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
 
146
                           profile_name=name, failure_domain=failure_domain,
 
147
                           data_chunks=k, coding_chunks=m, locality=l)
 
148
 
 
149
 
 
150
def handle_erasure_pool(request, service):
 
151
    pool_name = request.get('name')
 
152
    erasure_profile = request.get('erasure-profile')
 
153
    quota = request.get('max-bytes')
 
154
    weight = request.get('weight')
 
155
 
 
156
    if erasure_profile is None:
 
157
        erasure_profile = "default-canonical"
 
158
 
 
159
    # Check for missing params
 
160
    if pool_name is None:
 
161
        msg = "Missing parameter. name is required for the pool"
 
162
        log(msg, level=ERROR)
 
163
        return {'exit-code': 1, 'stderr': msg}
 
164
 
 
165
    # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
 
166
    if not erasure_profile_exists(service=service, name=erasure_profile):
 
167
        # TODO: Fail and tell them to create the profile or default
 
168
        msg = ("erasure-profile {} does not exist.  Please create it with: "
 
169
               "create-erasure-profile".format(erasure_profile))
 
170
        log(msg, level=ERROR)
 
171
        return {'exit-code': 1, 'stderr': msg}
 
172
 
 
173
    pool = ErasurePool(service=service, name=pool_name,
 
174
                       erasure_code_profile=erasure_profile,
 
175
                       percent_data=weight)
 
176
    # Ok make the erasure pool
 
177
    if not pool_exists(service=service, name=pool_name):
 
178
        log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
 
179
                                                         erasure_profile),
 
180
            level=INFO)
 
181
        pool.create()
 
182
 
 
183
    # Set a quota if requested
 
184
    if quota is not None:
 
185
        set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
 
186
 
 
187
 
 
188
def handle_replicated_pool(request, service):
 
189
    pool_name = request.get('name')
 
190
    replicas = request.get('replicas')
 
191
    quota = request.get('max-bytes')
 
192
    weight = request.get('weight')
 
193
    pg_num = request.get('pg_num')
 
194
 
 
195
    # Check for missing params
 
196
    if pool_name is None or replicas is None:
 
197
        msg = "Missing parameter. name and replicas are required"
 
198
        log(msg, level=ERROR)
 
199
        return {'exit-code': 1, 'stderr': msg}
 
200
 
 
201
    kwargs = {}
 
202
    if pg_num:
 
203
        kwargs['pg_num'] = pg_num
 
204
    if weight:
 
205
        kwargs['percent_data'] = weight
 
206
    if replicas:
 
207
        kwargs['replicas'] = replicas
 
208
 
 
209
    pool = ReplicatedPool(service=service,
 
210
                          name=pool_name, **kwargs)
 
211
    if not pool_exists(service=service, name=pool_name):
 
212
        log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
 
213
            level=INFO)
 
214
        pool.create()
 
215
    else:
 
216
        log("Pool '%s' already exists - skipping create" % pool.name,
 
217
            level=DEBUG)
 
218
 
 
219
    # Set a quota if requested
 
220
    if quota is not None:
 
221
        set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
 
222
 
 
223
 
 
224
def handle_create_cache_tier(request, service):
 
225
    # mode = "writeback" | "readonly"
 
226
    storage_pool = request.get('cold-pool')
 
227
    cache_pool = request.get('hot-pool')
 
228
    cache_mode = request.get('mode')
 
229
 
 
230
    if cache_mode is None:
 
231
        cache_mode = "writeback"
 
232
 
 
233
    # cache and storage pool must exist first
 
234
    if not pool_exists(service=service, name=storage_pool) or not pool_exists(
 
235
            service=service, name=cache_pool):
 
236
        msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
 
237
               "them first".format(storage_pool, cache_pool))
 
238
        log(msg, level=ERROR)
 
239
        return {'exit-code': 1, 'stderr': msg}
 
240
 
 
241
    p = Pool(service=service, name=storage_pool)
 
242
    p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
 
243
 
 
244
 
 
245
def handle_remove_cache_tier(request, service):
 
246
    storage_pool = request.get('cold-pool')
 
247
    cache_pool = request.get('hot-pool')
 
248
    # cache and storage pool must exist first
 
249
    if not pool_exists(service=service, name=storage_pool) or not pool_exists(
 
250
            service=service, name=cache_pool):
 
251
        msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
 
252
               "deleting cache tier".format(storage_pool, cache_pool))
 
253
        log(msg, level=ERROR)
 
254
        return {'exit-code': 1, 'stderr': msg}
 
255
 
 
256
    pool = Pool(name=storage_pool, service=service)
 
257
    pool.remove_cache_tier(cache_pool=cache_pool)
 
258
 
 
259
 
 
260
def handle_set_pool_value(request, service):
 
261
    # Set arbitrary pool values
 
262
    params = {'pool': request.get('name'),
 
263
              'key': request.get('key'),
 
264
              'value': request.get('value')}
 
265
    if params['key'] not in POOL_KEYS:
 
266
        msg = "Invalid key '%s'" % params['key']
 
267
        log(msg, level=ERROR)
 
268
        return {'exit-code': 1, 'stderr': msg}
 
269
 
 
270
    # Get the validation method
 
271
    validator_params = POOL_KEYS[params['key']]
 
272
    if len(validator_params) is 1:
 
273
        # Validate that what the user passed is actually legal per Ceph's rules
 
274
        validator(params['value'], validator_params[0])
 
275
    else:
 
276
        # Validate that what the user passed is actually legal per Ceph's rules
 
277
        validator(params['value'], validator_params[0], validator_params[1])
 
278
 
 
279
    # Set the value
 
280
    pool_set(service=service, pool_name=params['pool'], key=params['key'],
 
281
             value=params['value'])
 
282
 
 
283
 
 
284
def process_requests_v1(reqs):
 
285
    """Process v1 requests.
 
286
 
 
287
    Takes a list of requests (dicts) and processes each one. If an error is
 
288
    found, processing stops and the client is notified in the response.
 
289
 
 
290
    Returns a response dict containing the exit code (non-zero if any
 
291
    operation failed along with an explanation).
 
292
    """
 
293
    ret = None
 
294
    log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
 
295
    for req in reqs:
 
296
        op = req.get('op')
 
297
        log("Processing op='%s'" % op, level=DEBUG)
 
298
        # Use admin client since we do not have other client key locations
 
299
        # setup to use them for these operations.
 
300
        svc = 'admin'
 
301
        if op == "create-pool":
 
302
            pool_type = req.get('pool-type')  # "replicated" | "erasure"
 
303
 
 
304
            # Default to replicated if pool_type isn't given
 
305
            if pool_type == 'erasure':
 
306
                ret = handle_erasure_pool(request=req, service=svc)
 
307
            else:
 
308
                ret = handle_replicated_pool(request=req, service=svc)
 
309
 
 
310
        elif op == "create-cache-tier":
 
311
            ret = handle_create_cache_tier(request=req, service=svc)
 
312
        elif op == "remove-cache-tier":
 
313
            ret = handle_remove_cache_tier(request=req, service=svc)
 
314
        elif op == "create-erasure-profile":
 
315
            ret = handle_create_erasure_profile(request=req, service=svc)
 
316
        elif op == "delete-pool":
 
317
            pool = req.get('name')
 
318
            ret = delete_pool(service=svc, name=pool)
 
319
        elif op == "rename-pool":
 
320
            old_name = req.get('name')
 
321
            new_name = req.get('new-name')
 
322
            ret = rename_pool(service=svc, old_name=old_name,
 
323
                              new_name=new_name)
 
324
        elif op == "snapshot-pool":
 
325
            pool = req.get('name')
 
326
            snapshot_name = req.get('snapshot-name')
 
327
            ret = snapshot_pool(service=svc, pool_name=pool,
 
328
                                snapshot_name=snapshot_name)
 
329
        elif op == "remove-pool-snapshot":
 
330
            pool = req.get('name')
 
331
            snapshot_name = req.get('snapshot-name')
 
332
            ret = remove_pool_snapshot(service=svc, pool_name=pool,
 
333
                                       snapshot_name=snapshot_name)
 
334
        elif op == "set-pool-value":
 
335
            ret = handle_set_pool_value(request=req, service=svc)
 
336
        else:
 
337
            msg = "Unknown operation '%s'" % op
 
338
            log(msg, level=ERROR)
 
339
            return {'exit-code': 1, 'stderr': msg}
 
340
 
 
341
    if type(ret) == dict and 'exit-code' in ret:
 
342
        return ret
 
343
 
 
344
    return {'exit-code': 0}