~james-page/charms/trusty/swift-proxy/lp1531102-trunk

« back to all changes in this revision

Viewing changes to charmhelpers/contrib/storage/linux/ceph.py

  • Committer: James Page
  • Date: 2015-10-22 13:24:57 UTC
  • Revision ID: james.page@ubuntu.com-20151022132457-4p14oifelnzjz5n3
Tags: 15.10
15.10 Charm release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014-2015 Canonical Limited.
 
2
#
 
3
# This file is part of charm-helpers.
 
4
#
 
5
# charm-helpers is free software: you can redistribute it and/or modify
 
6
# it under the terms of the GNU Lesser General Public License version 3 as
 
7
# published by the Free Software Foundation.
 
8
#
 
9
# charm-helpers is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU Lesser General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU Lesser General Public License
 
15
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.
 
16
 
 
17
#
 
18
# Copyright 2012 Canonical Ltd.
 
19
#
 
20
# This file is sourced from lp:openstack-charm-helpers
 
21
#
 
22
# Authors:
 
23
#  James Page <james.page@ubuntu.com>
 
24
#  Adam Gandelman <adamg@ubuntu.com>
 
25
#
 
26
 
 
27
import os
 
28
import shutil
 
29
import json
 
30
import time
 
31
import uuid
 
32
 
 
33
from subprocess import (
 
34
    check_call,
 
35
    check_output,
 
36
    CalledProcessError,
 
37
)
 
38
from charmhelpers.core.hookenv import (
 
39
    local_unit,
 
40
    relation_get,
 
41
    relation_ids,
 
42
    relation_set,
 
43
    related_units,
 
44
    log,
 
45
    DEBUG,
 
46
    INFO,
 
47
    WARNING,
 
48
    ERROR,
 
49
)
 
50
from charmhelpers.core.host import (
 
51
    mount,
 
52
    mounts,
 
53
    service_start,
 
54
    service_stop,
 
55
    service_running,
 
56
    umount,
 
57
)
 
58
from charmhelpers.fetch import (
 
59
    apt_install,
 
60
)
 
61
 
 
62
from charmhelpers.core.kernel import modprobe
 
63
 
 
64
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
 
65
KEYFILE = '/etc/ceph/ceph.client.{}.key'
 
66
 
 
67
CEPH_CONF = """[global]
 
68
auth supported = {auth}
 
69
keyring = {keyring}
 
70
mon host = {mon_hosts}
 
71
log to syslog = {use_syslog}
 
72
err to syslog = {use_syslog}
 
73
clog to syslog = {use_syslog}
 
74
"""
 
75
 
 
76
 
 
77
def install():
 
78
    """Basic Ceph client installation."""
 
79
    ceph_dir = "/etc/ceph"
 
80
    if not os.path.exists(ceph_dir):
 
81
        os.mkdir(ceph_dir)
 
82
 
 
83
    apt_install('ceph-common', fatal=True)
 
84
 
 
85
 
 
86
def rbd_exists(service, pool, rbd_img):
 
87
    """Check to see if a RADOS block device exists."""
 
88
    try:
 
89
        out = check_output(['rbd', 'list', '--id',
 
90
                            service, '--pool', pool]).decode('UTF-8')
 
91
    except CalledProcessError:
 
92
        return False
 
93
 
 
94
    return rbd_img in out
 
95
 
 
96
 
 
97
def create_rbd_image(service, pool, image, sizemb):
 
98
    """Create a new RADOS block device."""
 
99
    cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
 
100
           '--pool', pool]
 
101
    check_call(cmd)
 
102
 
 
103
 
 
104
def pool_exists(service, name):
 
105
    """Check to see if a RADOS pool already exists."""
 
106
    try:
 
107
        out = check_output(['rados', '--id', service,
 
108
                            'lspools']).decode('UTF-8')
 
109
    except CalledProcessError:
 
110
        return False
 
111
 
 
112
    return name in out
 
113
 
 
114
 
 
115
def get_osds(service):
 
116
    """Return a list of all Ceph Object Storage Daemons currently in the
 
117
    cluster.
 
118
    """
 
119
    version = ceph_version()
 
120
    if version and version >= '0.56':
 
121
        return json.loads(check_output(['ceph', '--id', service,
 
122
                                        'osd', 'ls',
 
123
                                        '--format=json']).decode('UTF-8'))
 
124
 
 
125
    return None
 
126
 
 
127
 
 
128
def create_pool(service, name, replicas=3):
 
129
    """Create a new RADOS pool."""
 
130
    if pool_exists(service, name):
 
131
        log("Ceph pool {} already exists, skipping creation".format(name),
 
132
            level=WARNING)
 
133
        return
 
134
 
 
135
    # Calculate the number of placement groups based
 
136
    # on upstream recommended best practices.
 
137
    osds = get_osds(service)
 
138
    if osds:
 
139
        pgnum = (len(osds) * 100 // replicas)
 
140
    else:
 
141
        # NOTE(james-page): Default to 200 for older ceph versions
 
142
        # which don't support OSD query from cli
 
143
        pgnum = 200
 
144
 
 
145
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
 
146
    check_call(cmd)
 
147
 
 
148
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
 
149
           str(replicas)]
 
150
    check_call(cmd)
 
151
 
 
152
 
 
153
def delete_pool(service, name):
 
154
    """Delete a RADOS pool from ceph."""
 
155
    cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
 
156
           '--yes-i-really-really-mean-it']
 
157
    check_call(cmd)
 
158
 
 
159
 
 
160
def _keyfile_path(service):
 
161
    return KEYFILE.format(service)
 
162
 
 
163
 
 
164
def _keyring_path(service):
 
165
    return KEYRING.format(service)
 
166
 
 
167
 
 
168
def create_keyring(service, key):
 
169
    """Create a new Ceph keyring containing key."""
 
170
    keyring = _keyring_path(service)
 
171
    if os.path.exists(keyring):
 
172
        log('Ceph keyring exists at %s.' % keyring, level=WARNING)
 
173
        return
 
174
 
 
175
    cmd = ['ceph-authtool', keyring, '--create-keyring',
 
176
           '--name=client.{}'.format(service), '--add-key={}'.format(key)]
 
177
    check_call(cmd)
 
178
    log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
 
179
 
 
180
 
 
181
def delete_keyring(service):
 
182
    """Delete an existing Ceph keyring."""
 
183
    keyring = _keyring_path(service)
 
184
    if not os.path.exists(keyring):
 
185
        log('Keyring does not exist at %s' % keyring, level=WARNING)
 
186
        return
 
187
 
 
188
    os.remove(keyring)
 
189
    log('Deleted ring at %s.' % keyring, level=INFO)
 
190
 
 
191
 
 
192
def create_key_file(service, key):
 
193
    """Create a file containing key."""
 
194
    keyfile = _keyfile_path(service)
 
195
    if os.path.exists(keyfile):
 
196
        log('Keyfile exists at %s.' % keyfile, level=WARNING)
 
197
        return
 
198
 
 
199
    with open(keyfile, 'w') as fd:
 
200
        fd.write(key)
 
201
 
 
202
    log('Created new keyfile at %s.' % keyfile, level=INFO)
 
203
 
 
204
 
 
205
def get_ceph_nodes():
 
206
    """Query named relation 'ceph' to determine current nodes."""
 
207
    hosts = []
 
208
    for r_id in relation_ids('ceph'):
 
209
        for unit in related_units(r_id):
 
210
            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
 
211
 
 
212
    return hosts
 
213
 
 
214
 
 
215
def configure(service, key, auth, use_syslog):
 
216
    """Perform basic configuration of Ceph."""
 
217
    create_keyring(service, key)
 
218
    create_key_file(service, key)
 
219
    hosts = get_ceph_nodes()
 
220
    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
 
221
        ceph_conf.write(CEPH_CONF.format(auth=auth,
 
222
                                         keyring=_keyring_path(service),
 
223
                                         mon_hosts=",".join(map(str, hosts)),
 
224
                                         use_syslog=use_syslog))
 
225
    modprobe('rbd')
 
226
 
 
227
 
 
228
def image_mapped(name):
 
229
    """Determine whether a RADOS block device is mapped locally."""
 
230
    try:
 
231
        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
 
232
    except CalledProcessError:
 
233
        return False
 
234
 
 
235
    return name in out
 
236
 
 
237
 
 
238
def map_block_storage(service, pool, image):
 
239
    """Map a RADOS block device for local use."""
 
240
    cmd = [
 
241
        'rbd',
 
242
        'map',
 
243
        '{}/{}'.format(pool, image),
 
244
        '--user',
 
245
        service,
 
246
        '--secret',
 
247
        _keyfile_path(service),
 
248
    ]
 
249
    check_call(cmd)
 
250
 
 
251
 
 
252
def filesystem_mounted(fs):
 
253
    """Determine whether a filesytems is already mounted."""
 
254
    return fs in [f for f, m in mounts()]
 
255
 
 
256
 
 
257
def make_filesystem(blk_device, fstype='ext4', timeout=10):
 
258
    """Make a new filesystem on the specified block device."""
 
259
    count = 0
 
260
    e_noent = os.errno.ENOENT
 
261
    while not os.path.exists(blk_device):
 
262
        if count >= timeout:
 
263
            log('Gave up waiting on block device %s' % blk_device,
 
264
                level=ERROR)
 
265
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
 
266
 
 
267
        log('Waiting for block device %s to appear' % blk_device,
 
268
            level=DEBUG)
 
269
        count += 1
 
270
        time.sleep(1)
 
271
    else:
 
272
        log('Formatting block device %s as filesystem %s.' %
 
273
            (blk_device, fstype), level=INFO)
 
274
        check_call(['mkfs', '-t', fstype, blk_device])
 
275
 
 
276
 
 
277
def place_data_on_block_device(blk_device, data_src_dst):
 
278
    """Migrate data in data_src_dst to blk_device and then remount."""
 
279
    # mount block device into /mnt
 
280
    mount(blk_device, '/mnt')
 
281
    # copy data to /mnt
 
282
    copy_files(data_src_dst, '/mnt')
 
283
    # umount block device
 
284
    umount('/mnt')
 
285
    # Grab user/group ID's from original source
 
286
    _dir = os.stat(data_src_dst)
 
287
    uid = _dir.st_uid
 
288
    gid = _dir.st_gid
 
289
    # re-mount where the data should originally be
 
290
    # TODO: persist is currently a NO-OP in core.host
 
291
    mount(blk_device, data_src_dst, persist=True)
 
292
    # ensure original ownership of new mount.
 
293
    os.chown(data_src_dst, uid, gid)
 
294
 
 
295
 
 
296
def copy_files(src, dst, symlinks=False, ignore=None):
 
297
    """Copy files from src to dst."""
 
298
    for item in os.listdir(src):
 
299
        s = os.path.join(src, item)
 
300
        d = os.path.join(dst, item)
 
301
        if os.path.isdir(s):
 
302
            shutil.copytree(s, d, symlinks, ignore)
 
303
        else:
 
304
            shutil.copy2(s, d)
 
305
 
 
306
 
 
307
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
 
308
                        blk_device, fstype, system_services=[],
 
309
                        replicas=3):
 
310
    """NOTE: This function must only be called from a single service unit for
 
311
    the same rbd_img otherwise data loss will occur.
 
312
 
 
313
    Ensures given pool and RBD image exists, is mapped to a block device,
 
314
    and the device is formatted and mounted at the given mount_point.
 
315
 
 
316
    If formatting a device for the first time, data existing at mount_point
 
317
    will be migrated to the RBD device before being re-mounted.
 
318
 
 
319
    All services listed in system_services will be stopped prior to data
 
320
    migration and restarted when complete.
 
321
    """
 
322
    # Ensure pool, RBD image, RBD mappings are in place.
 
323
    if not pool_exists(service, pool):
 
324
        log('Creating new pool {}.'.format(pool), level=INFO)
 
325
        create_pool(service, pool, replicas=replicas)
 
326
 
 
327
    if not rbd_exists(service, pool, rbd_img):
 
328
        log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
 
329
        create_rbd_image(service, pool, rbd_img, sizemb)
 
330
 
 
331
    if not image_mapped(rbd_img):
 
332
        log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
 
333
            level=INFO)
 
334
        map_block_storage(service, pool, rbd_img)
 
335
 
 
336
    # make file system
 
337
    # TODO: What happens if for whatever reason this is run again and
 
338
    # the data is already in the rbd device and/or is mounted??
 
339
    # When it is mounted already, it will fail to make the fs
 
340
    # XXX: This is really sketchy!  Need to at least add an fstab entry
 
341
    #      otherwise this hook will blow away existing data if its executed
 
342
    #      after a reboot.
 
343
    if not filesystem_mounted(mount_point):
 
344
        make_filesystem(blk_device, fstype)
 
345
 
 
346
        for svc in system_services:
 
347
            if service_running(svc):
 
348
                log('Stopping services {} prior to migrating data.'
 
349
                    .format(svc), level=DEBUG)
 
350
                service_stop(svc)
 
351
 
 
352
        place_data_on_block_device(blk_device, mount_point)
 
353
 
 
354
        for svc in system_services:
 
355
            log('Starting service {} after migrating data.'
 
356
                .format(svc), level=DEBUG)
 
357
            service_start(svc)
 
358
 
 
359
 
 
360
def ensure_ceph_keyring(service, user=None, group=None):
 
361
    """Ensures a ceph keyring is created for a named service and optionally
 
362
    ensures user and group ownership.
 
363
 
 
364
    Returns False if no ceph key is available in relation state.
 
365
    """
 
366
    key = None
 
367
    for rid in relation_ids('ceph'):
 
368
        for unit in related_units(rid):
 
369
            key = relation_get('key', rid=rid, unit=unit)
 
370
            if key:
 
371
                break
 
372
 
 
373
    if not key:
 
374
        return False
 
375
 
 
376
    create_keyring(service=service, key=key)
 
377
    keyring = _keyring_path(service)
 
378
    if user and group:
 
379
        check_call(['chown', '%s.%s' % (user, group), keyring])
 
380
 
 
381
    return True
 
382
 
 
383
 
 
384
def ceph_version():
 
385
    """Retrieve the local version of ceph."""
 
386
    if os.path.exists('/usr/bin/ceph'):
 
387
        cmd = ['ceph', '-v']
 
388
        output = check_output(cmd).decode('US-ASCII')
 
389
        output = output.split()
 
390
        if len(output) > 3:
 
391
            return output[2]
 
392
        else:
 
393
            return None
 
394
    else:
 
395
        return None
 
396
 
 
397
 
 
398
class CephBrokerRq(object):
 
399
    """Ceph broker request.
 
400
 
 
401
    Multiple operations can be added to a request and sent to the Ceph broker
 
402
    to be executed.
 
403
 
 
404
    Request is json-encoded for sending over the wire.
 
405
 
 
406
    The API is versioned and defaults to version 1.
 
407
    """
 
408
    def __init__(self, api_version=1, request_id=None):
 
409
        self.api_version = api_version
 
410
        if request_id:
 
411
            self.request_id = request_id
 
412
        else:
 
413
            self.request_id = str(uuid.uuid1())
 
414
        self.ops = []
 
415
 
 
416
    def add_op_create_pool(self, name, replica_count=3):
 
417
        self.ops.append({'op': 'create-pool', 'name': name,
 
418
                         'replicas': replica_count})
 
419
 
 
420
    def set_ops(self, ops):
 
421
        """Set request ops to provided value.
 
422
 
 
423
        Useful for injecting ops that come from a previous request
 
424
        to allow comparisons to ensure validity.
 
425
        """
 
426
        self.ops = ops
 
427
 
 
428
    @property
 
429
    def request(self):
 
430
        return json.dumps({'api-version': self.api_version, 'ops': self.ops,
 
431
                           'request-id': self.request_id})
 
432
 
 
433
    def _ops_equal(self, other):
 
434
        if len(self.ops) == len(other.ops):
 
435
            for req_no in range(0, len(self.ops)):
 
436
                for key in ['replicas', 'name', 'op']:
 
437
                    if self.ops[req_no][key] != other.ops[req_no][key]:
 
438
                        return False
 
439
        else:
 
440
            return False
 
441
        return True
 
442
 
 
443
    def __eq__(self, other):
 
444
        if not isinstance(other, self.__class__):
 
445
            return False
 
446
        if self.api_version == other.api_version and \
 
447
                self._ops_equal(other):
 
448
            return True
 
449
        else:
 
450
            return False
 
451
 
 
452
    def __ne__(self, other):
 
453
        return not self.__eq__(other)
 
454
 
 
455
 
 
456
class CephBrokerRsp(object):
 
457
    """Ceph broker response.
 
458
 
 
459
    Response is json-decoded and contents provided as methods/properties.
 
460
 
 
461
    The API is versioned and defaults to version 1.
 
462
    """
 
463
 
 
464
    def __init__(self, encoded_rsp):
 
465
        self.api_version = None
 
466
        self.rsp = json.loads(encoded_rsp)
 
467
 
 
468
    @property
 
469
    def request_id(self):
 
470
        return self.rsp.get('request-id')
 
471
 
 
472
    @property
 
473
    def exit_code(self):
 
474
        return self.rsp.get('exit-code')
 
475
 
 
476
    @property
 
477
    def exit_msg(self):
 
478
        return self.rsp.get('stderr')
 
479
 
 
480
 
 
481
# Ceph Broker Conversation:
 
482
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
 
483
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
 
484
# unique id so that the client can identity which CephBrokerRsp is associated
 
485
# with the request. Ceph will also respond to each client unit individually
 
486
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
 
487
# via key broker-rsp-glance-0
 
488
#
 
489
# To use this the charm can just do something like:
 
490
#
 
491
# from charmhelpers.contrib.storage.linux.ceph import (
 
492
#     send_request_if_needed,
 
493
#     is_request_complete,
 
494
#     CephBrokerRq,
 
495
# )
 
496
#
 
497
# @hooks.hook('ceph-relation-changed')
 
498
# def ceph_changed():
 
499
#     rq = CephBrokerRq()
 
500
#     rq.add_op_create_pool(name='poolname', replica_count=3)
 
501
#
 
502
#     if is_request_complete(rq):
 
503
#         <Request complete actions>
 
504
#     else:
 
505
#         send_request_if_needed(get_ceph_request())
 
506
#
 
507
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
 
508
# of glance having sent a request to ceph which ceph has successfully processed
 
509
#  'ceph:8': {
 
510
#      'ceph/0': {
 
511
#          'auth': 'cephx',
 
512
#          'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
 
513
#          'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
 
514
#          'ceph-public-address': '10.5.44.103',
 
515
#          'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
 
516
#          'private-address': '10.5.44.103',
 
517
#      },
 
518
#      'glance/0': {
 
519
#          'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
 
520
#                         '"ops": [{"replicas": 3, "name": "glance", '
 
521
#                         '"op": "create-pool"}]}'),
 
522
#          'private-address': '10.5.44.109',
 
523
#      },
 
524
#  }
 
525
 
 
526
def get_previous_request(rid):
 
527
    """Return the last ceph broker request sent on a given relation
 
528
 
 
529
    @param rid: Relation id to query for request
 
530
    """
 
531
    request = None
 
532
    broker_req = relation_get(attribute='broker_req', rid=rid,
 
533
                              unit=local_unit())
 
534
    if broker_req:
 
535
        request_data = json.loads(broker_req)
 
536
        request = CephBrokerRq(api_version=request_data['api-version'],
 
537
                               request_id=request_data['request-id'])
 
538
        request.set_ops(request_data['ops'])
 
539
 
 
540
    return request
 
541
 
 
542
 
 
543
def get_request_states(request):
 
544
    """Return a dict of requests per relation id with their corresponding
 
545
       completion state.
 
546
 
 
547
    This allows a charm, which has a request for ceph, to see whether there is
 
548
    an equivalent request already being processed and if so what state that
 
549
    request is in.
 
550
 
 
551
    @param request: A CephBrokerRq object
 
552
    """
 
553
    complete = []
 
554
    requests = {}
 
555
    for rid in relation_ids('ceph'):
 
556
        complete = False
 
557
        previous_request = get_previous_request(rid)
 
558
        if request == previous_request:
 
559
            sent = True
 
560
            complete = is_request_complete_for_rid(previous_request, rid)
 
561
        else:
 
562
            sent = False
 
563
            complete = False
 
564
 
 
565
        requests[rid] = {
 
566
            'sent': sent,
 
567
            'complete': complete,
 
568
        }
 
569
 
 
570
    return requests
 
571
 
 
572
 
 
573
def is_request_sent(request):
 
574
    """Check to see if a functionally equivalent request has already been sent
 
575
 
 
576
    Returns True if a similair request has been sent
 
577
 
 
578
    @param request: A CephBrokerRq object
 
579
    """
 
580
    states = get_request_states(request)
 
581
    for rid in states.keys():
 
582
        if not states[rid]['sent']:
 
583
            return False
 
584
 
 
585
    return True
 
586
 
 
587
 
 
588
def is_request_complete(request):
 
589
    """Check to see if a functionally equivalent request has already been
 
590
    completed
 
591
 
 
592
    Returns True if a similair request has been completed
 
593
 
 
594
    @param request: A CephBrokerRq object
 
595
    """
 
596
    states = get_request_states(request)
 
597
    for rid in states.keys():
 
598
        if not states[rid]['complete']:
 
599
            return False
 
600
 
 
601
    return True
 
602
 
 
603
 
 
604
def is_request_complete_for_rid(request, rid):
 
605
    """Check if a given request has been completed on the given relation
 
606
 
 
607
    @param request: A CephBrokerRq object
 
608
    @param rid: Relation ID
 
609
    """
 
610
    broker_key = get_broker_rsp_key()
 
611
    for unit in related_units(rid):
 
612
        rdata = relation_get(rid=rid, unit=unit)
 
613
        if rdata.get(broker_key):
 
614
            rsp = CephBrokerRsp(rdata.get(broker_key))
 
615
            if rsp.request_id == request.request_id:
 
616
                if not rsp.exit_code:
 
617
                    return True
 
618
        else:
 
619
            # The remote unit sent no reply targeted at this unit so either the
 
620
            # remote ceph cluster does not support unit targeted replies or it
 
621
            # has not processed our request yet.
 
622
            if rdata.get('broker_rsp'):
 
623
                request_data = json.loads(rdata['broker_rsp'])
 
624
                if request_data.get('request-id'):
 
625
                    log('Ignoring legacy broker_rsp without unit key as remote '
 
626
                        'service supports unit specific replies', level=DEBUG)
 
627
                else:
 
628
                    log('Using legacy broker_rsp as remote service does not '
 
629
                        'supports unit specific replies', level=DEBUG)
 
630
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
 
631
                    if not rsp.exit_code:
 
632
                        return True
 
633
 
 
634
    return False
 
635
 
 
636
 
 
637
def get_broker_rsp_key():
 
638
    """Return broker response key for this unit
 
639
 
 
640
    This is the key that ceph is going to use to pass request status
 
641
    information back to this unit
 
642
    """
 
643
    return 'broker-rsp-' + local_unit().replace('/', '-')
 
644
 
 
645
 
 
646
def send_request_if_needed(request):
 
647
    """Send broker request if an equivalent request has not already been sent
 
648
 
 
649
    @param request: A CephBrokerRq object
 
650
    """
 
651
    if is_request_sent(request):
 
652
        log('Request already sent but not complete, not sending new request',
 
653
            level=DEBUG)
 
654
    else:
 
655
        for rid in relation_ids('ceph'):
 
656
            log('Sending request {}'.format(request.request_id), level=DEBUG)
 
657
            relation_set(relation_id=rid, broker_req=request.request)