~ubuntu-branches/ubuntu/trusty/qemu/trusty

« back to all changes in this revision

Viewing changes to .pc/1.6.1.patch/block/rbd.c

  • Committer: Package Import Robot
  • Author(s): Serge Hallyn
  • Date: 2013-10-22 22:47:07 UTC
  • mfrom: (1.8.3) (10.1.42 sid)
  • Revision ID: package-import@ubuntu.com-20131022224707-1lya34fw3k3f24tv
Tags: 1.6.0+dfsg-2ubuntu1
* Merge 1.6.0~rc0+dfsg-2exp from debian experimental.  Remaining changes:
  - debian/control
    * update maintainer
    * remove libiscsi, usb-redir, vde, vnc-jpeg, and libssh2-1-dev
      from build-deps
    * enable rbd
    * add qemu-system and qemu-common B/R to qemu-keymaps
    * add D:udev, R:qemu, R:qemu-common and B:qemu-common to
      qemu-system-common
    * qemu-system-arm, qemu-system-ppc, qemu-system-sparc:
      - add qemu-kvm to Provides
      - add qemu-common, qemu-kvm, kvm to B/R
      - remove openbios-sparc from qemu-system-sparc D
      - drop openbios-ppc and openhackware Depends to Suggests (for now)
    * qemu-system-x86:
      - add qemu-common to Breaks/Replaces.
      - add cpu-checker to Recommends.
    * qemu-user: add B/R:qemu-kvm
    * qemu-kvm:
      - add armhf armel powerpc sparc to Architecture
      - C/R/P: qemu-kvm-spice
    * add qemu-common package
    * drop qemu-slof which is not packaged in ubuntu
  - add qemu-system-common.links for tap ifup/down scripts and OVMF link.
  - qemu-system-x86.links:
    * remove pxe rom links which are in kvm-ipxe
    * add symlink for kvm.1 manpage
  - debian/rules
    * add kvm-spice symlink to qemu-kvm
    * call dh_installmodules for qemu-system-x86
    * update dh_installinit to install upstart script
    * run dh_installman (Closes: #709241) (cherrypicked from 1.5.0+dfsg-2)
  - Add qemu-utils.links for kvm-* symlinks.
  - Add qemu-system-x86.qemu-kvm.upstart and .default
  - Add qemu-system-x86.modprobe to set nesting=1
  - Add qemu-system-common.preinst to add kvm group
  - qemu-system-common.postinst: remove bad group acl if there, then have
    udev relabel /dev/kvm.
  - New linaro patches from qemu-linaro rebasing branch
  - Dropped patches:
    * xen-simplify-xen_enabled.patch
    * sparc-linux-user-fix-missing-symbols-in-.rel-.rela.plt-sections.patch
    * main_loop-do-not-set-nonblocking-if-xen_enabled.patch
    * xen_machine_pv-do-not-create-a-dummy-CPU-in-machine-.patch
    * virtio-rng-fix-crash
  - Kept patches:
    * expose_vms_qemu64cpu.patch - updated
    * linaro arm patches from qemu-linaro rebasing branch
  - New patches:
    * fix-pci-add: change CONFIG variable in ifdef to make sure that
      pci_add is defined.
* Add linaro patches
* Add experimental mach-virt patches for arm virtualization.
* qemu-system-common.install: add debian/tmp/usr/lib to install the
  qemu-bridge-helper

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * QEMU Block driver for RADOS (Ceph)
 
3
 *
 
4
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
 
5
 *                         Josh Durgin <josh.durgin@dreamhost.com>
 
6
 *
 
7
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 
8
 * the COPYING file in the top-level directory.
 
9
 *
 
10
 * Contributions after 2012-01-13 are licensed under the terms of the
 
11
 * GNU GPL, version 2 or (at your option) any later version.
 
12
 */
 
13
 
 
14
#include <inttypes.h>
 
15
 
 
16
#include "qemu-common.h"
 
17
#include "qemu/error-report.h"
 
18
#include "block/block_int.h"
 
19
 
 
20
#include <rbd/librbd.h>
 
21
 
 
22
/*
 
23
 * When specifying the image filename use:
 
24
 *
 
25
 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
 
26
 *
 
27
 * poolname must be the name of an existing rados pool.
 
28
 *
 
29
 * devicename is the name of the rbd image.
 
30
 *
 
31
 * Each option given is used to configure rados, and may be any valid
 
32
 * Ceph option, "id", or "conf".
 
33
 *
 
34
 * The "id" option indicates what user we should authenticate as to
 
35
 * the Ceph cluster.  If it is excluded we will use the Ceph default
 
36
 * (normally 'admin').
 
37
 *
 
38
 * The "conf" option specifies a Ceph configuration file to read.  If
 
39
 * it is not specified, we will read from the default Ceph locations
 
40
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 
41
 * file, specify conf=/dev/null.
 
42
 *
 
43
 * Configuration values containing :, @, or = can be escaped with a
 
44
 * leading "\".
 
45
 */
 
46
 
 
47
/* rbd_aio_discard added in 0.1.2 */
 
48
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
 
49
#define LIBRBD_SUPPORTS_DISCARD
 
50
#else
 
51
#undef LIBRBD_SUPPORTS_DISCARD
 
52
#endif
 
53
 
 
54
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
55
 
 
56
#define RBD_MAX_CONF_NAME_SIZE 128
 
57
#define RBD_MAX_CONF_VAL_SIZE 512
 
58
#define RBD_MAX_CONF_SIZE 1024
 
59
#define RBD_MAX_POOL_NAME_SIZE 128
 
60
#define RBD_MAX_SNAP_NAME_SIZE 128
 
61
#define RBD_MAX_SNAPS 100
 
62
 
 
63
typedef enum {
 
64
    RBD_AIO_READ,
 
65
    RBD_AIO_WRITE,
 
66
    RBD_AIO_DISCARD,
 
67
    RBD_AIO_FLUSH
 
68
} RBDAIOCmd;
 
69
 
 
70
typedef struct RBDAIOCB {
 
71
    BlockDriverAIOCB common;
 
72
    QEMUBH *bh;
 
73
    int64_t ret;
 
74
    QEMUIOVector *qiov;
 
75
    char *bounce;
 
76
    RBDAIOCmd cmd;
 
77
    int64_t sector_num;
 
78
    int error;
 
79
    struct BDRVRBDState *s;
 
80
    int cancelled;
 
81
    int status;
 
82
} RBDAIOCB;
 
83
 
 
84
typedef struct RADOSCB {
 
85
    int rcbid;
 
86
    RBDAIOCB *acb;
 
87
    struct BDRVRBDState *s;
 
88
    int done;
 
89
    int64_t size;
 
90
    char *buf;
 
91
    int64_t ret;
 
92
} RADOSCB;
 
93
 
 
94
#define RBD_FD_READ 0
 
95
#define RBD_FD_WRITE 1
 
96
 
 
97
typedef struct BDRVRBDState {
 
98
    int fds[2];
 
99
    rados_t cluster;
 
100
    rados_ioctx_t io_ctx;
 
101
    rbd_image_t image;
 
102
    char name[RBD_MAX_IMAGE_NAME_SIZE];
 
103
    int qemu_aio_count;
 
104
    char *snap;
 
105
    int event_reader_pos;
 
106
    RADOSCB *event_rcb;
 
107
} BDRVRBDState;
 
108
 
 
109
static void rbd_aio_bh_cb(void *opaque);
 
110
 
 
111
static int qemu_rbd_next_tok(char *dst, int dst_len,
 
112
                             char *src, char delim,
 
113
                             const char *name,
 
114
                             char **p)
 
115
{
 
116
    int l;
 
117
    char *end;
 
118
 
 
119
    *p = NULL;
 
120
 
 
121
    if (delim != '\0') {
 
122
        for (end = src; *end; ++end) {
 
123
            if (*end == delim) {
 
124
                break;
 
125
            }
 
126
            if (*end == '\\' && end[1] != '\0') {
 
127
                end++;
 
128
            }
 
129
        }
 
130
        if (*end == delim) {
 
131
            *p = end + 1;
 
132
            *end = '\0';
 
133
        }
 
134
    }
 
135
    l = strlen(src);
 
136
    if (l >= dst_len) {
 
137
        error_report("%s too long", name);
 
138
        return -EINVAL;
 
139
    } else if (l == 0) {
 
140
        error_report("%s too short", name);
 
141
        return -EINVAL;
 
142
    }
 
143
 
 
144
    pstrcpy(dst, dst_len, src);
 
145
 
 
146
    return 0;
 
147
}
 
148
 
 
149
static void qemu_rbd_unescape(char *src)
 
150
{
 
151
    char *p;
 
152
 
 
153
    for (p = src; *src; ++src, ++p) {
 
154
        if (*src == '\\' && src[1] != '\0') {
 
155
            src++;
 
156
        }
 
157
        *p = *src;
 
158
    }
 
159
    *p = '\0';
 
160
}
 
161
 
 
162
static int qemu_rbd_parsename(const char *filename,
 
163
                              char *pool, int pool_len,
 
164
                              char *snap, int snap_len,
 
165
                              char *name, int name_len,
 
166
                              char *conf, int conf_len)
 
167
{
 
168
    const char *start;
 
169
    char *p, *buf;
 
170
    int ret;
 
171
 
 
172
    if (!strstart(filename, "rbd:", &start)) {
 
173
        return -EINVAL;
 
174
    }
 
175
 
 
176
    buf = g_strdup(start);
 
177
    p = buf;
 
178
    *snap = '\0';
 
179
    *conf = '\0';
 
180
 
 
181
    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
 
182
    if (ret < 0 || !p) {
 
183
        ret = -EINVAL;
 
184
        goto done;
 
185
    }
 
186
    qemu_rbd_unescape(pool);
 
187
 
 
188
    if (strchr(p, '@')) {
 
189
        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
 
190
        if (ret < 0) {
 
191
            goto done;
 
192
        }
 
193
        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
 
194
        qemu_rbd_unescape(snap);
 
195
    } else {
 
196
        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
 
197
    }
 
198
    qemu_rbd_unescape(name);
 
199
    if (ret < 0 || !p) {
 
200
        goto done;
 
201
    }
 
202
 
 
203
    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
 
204
 
 
205
done:
 
206
    g_free(buf);
 
207
    return ret;
 
208
}
 
209
 
 
210
static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
 
211
{
 
212
    const char *p = conf;
 
213
 
 
214
    while (*p) {
 
215
        int len;
 
216
        const char *end = strchr(p, ':');
 
217
 
 
218
        if (end) {
 
219
            len = end - p;
 
220
        } else {
 
221
            len = strlen(p);
 
222
        }
 
223
 
 
224
        if (strncmp(p, "id=", 3) == 0) {
 
225
            len -= 3;
 
226
            strncpy(clientname, p + 3, len);
 
227
            clientname[len] = '\0';
 
228
            return clientname;
 
229
        }
 
230
        if (end == NULL) {
 
231
            break;
 
232
        }
 
233
        p = end + 1;
 
234
    }
 
235
    return NULL;
 
236
}
 
237
 
 
238
static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
 
239
{
 
240
    char *p, *buf;
 
241
    char name[RBD_MAX_CONF_NAME_SIZE];
 
242
    char value[RBD_MAX_CONF_VAL_SIZE];
 
243
    int ret = 0;
 
244
 
 
245
    buf = g_strdup(conf);
 
246
    p = buf;
 
247
 
 
248
    while (p) {
 
249
        ret = qemu_rbd_next_tok(name, sizeof(name), p,
 
250
                                '=', "conf option name", &p);
 
251
        if (ret < 0) {
 
252
            break;
 
253
        }
 
254
        qemu_rbd_unescape(name);
 
255
 
 
256
        if (!p) {
 
257
            error_report("conf option %s has no value", name);
 
258
            ret = -EINVAL;
 
259
            break;
 
260
        }
 
261
 
 
262
        ret = qemu_rbd_next_tok(value, sizeof(value), p,
 
263
                                ':', "conf option value", &p);
 
264
        if (ret < 0) {
 
265
            break;
 
266
        }
 
267
        qemu_rbd_unescape(value);
 
268
 
 
269
        if (strcmp(name, "conf") == 0) {
 
270
            ret = rados_conf_read_file(cluster, value);
 
271
            if (ret < 0) {
 
272
                error_report("error reading conf file %s", value);
 
273
                break;
 
274
            }
 
275
        } else if (strcmp(name, "id") == 0) {
 
276
            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
 
277
        } else {
 
278
            ret = rados_conf_set(cluster, name, value);
 
279
            if (ret < 0) {
 
280
                error_report("invalid conf option %s", name);
 
281
                ret = -EINVAL;
 
282
                break;
 
283
            }
 
284
        }
 
285
    }
 
286
 
 
287
    g_free(buf);
 
288
    return ret;
 
289
}
 
290
 
 
291
static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
 
292
{
 
293
    int64_t bytes = 0;
 
294
    int64_t objsize;
 
295
    int obj_order = 0;
 
296
    char pool[RBD_MAX_POOL_NAME_SIZE];
 
297
    char name[RBD_MAX_IMAGE_NAME_SIZE];
 
298
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 
299
    char conf[RBD_MAX_CONF_SIZE];
 
300
    char clientname_buf[RBD_MAX_CONF_SIZE];
 
301
    char *clientname;
 
302
    rados_t cluster;
 
303
    rados_ioctx_t io_ctx;
 
304
    int ret;
 
305
 
 
306
    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 
307
                           snap_buf, sizeof(snap_buf),
 
308
                           name, sizeof(name),
 
309
                           conf, sizeof(conf)) < 0) {
 
310
        return -EINVAL;
 
311
    }
 
312
 
 
313
    /* Read out options */
 
314
    while (options && options->name) {
 
315
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
 
316
            bytes = options->value.n;
 
317
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
 
318
            if (options->value.n) {
 
319
                objsize = options->value.n;
 
320
                if ((objsize - 1) & objsize) {    /* not a power of 2? */
 
321
                    error_report("obj size needs to be power of 2");
 
322
                    return -EINVAL;
 
323
                }
 
324
                if (objsize < 4096) {
 
325
                    error_report("obj size too small");
 
326
                    return -EINVAL;
 
327
                }
 
328
                obj_order = ffs(objsize) - 1;
 
329
            }
 
330
        }
 
331
        options++;
 
332
    }
 
333
 
 
334
    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 
335
    if (rados_create(&cluster, clientname) < 0) {
 
336
        error_report("error initializing");
 
337
        return -EIO;
 
338
    }
 
339
 
 
340
    if (strstr(conf, "conf=") == NULL) {
 
341
        /* try default location, but ignore failure */
 
342
        rados_conf_read_file(cluster, NULL);
 
343
    }
 
344
 
 
345
    if (conf[0] != '\0' &&
 
346
        qemu_rbd_set_conf(cluster, conf) < 0) {
 
347
        error_report("error setting config options");
 
348
        rados_shutdown(cluster);
 
349
        return -EIO;
 
350
    }
 
351
 
 
352
    if (rados_connect(cluster) < 0) {
 
353
        error_report("error connecting");
 
354
        rados_shutdown(cluster);
 
355
        return -EIO;
 
356
    }
 
357
 
 
358
    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
 
359
        error_report("error opening pool %s", pool);
 
360
        rados_shutdown(cluster);
 
361
        return -EIO;
 
362
    }
 
363
 
 
364
    ret = rbd_create(io_ctx, name, bytes, &obj_order);
 
365
    rados_ioctx_destroy(io_ctx);
 
366
    rados_shutdown(cluster);
 
367
 
 
368
    return ret;
 
369
}
 
370
 
 
371
/*
 
372
 * This aio completion is being called from qemu_rbd_aio_event_reader()
 
373
 * and runs in qemu context. It schedules a bh, but just in case the aio
 
374
 * was not cancelled before.
 
375
 */
 
376
static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
377
{
 
378
    RBDAIOCB *acb = rcb->acb;
 
379
    int64_t r;
 
380
 
 
381
    r = rcb->ret;
 
382
 
 
383
    if (acb->cmd != RBD_AIO_READ) {
 
384
        if (r < 0) {
 
385
            acb->ret = r;
 
386
            acb->error = 1;
 
387
        } else if (!acb->error) {
 
388
            acb->ret = rcb->size;
 
389
        }
 
390
    } else {
 
391
        if (r < 0) {
 
392
            memset(rcb->buf, 0, rcb->size);
 
393
            acb->ret = r;
 
394
            acb->error = 1;
 
395
        } else if (r < rcb->size) {
 
396
            memset(rcb->buf + r, 0, rcb->size - r);
 
397
            if (!acb->error) {
 
398
                acb->ret = rcb->size;
 
399
            }
 
400
        } else if (!acb->error) {
 
401
            acb->ret = r;
 
402
        }
 
403
    }
 
404
    /* Note that acb->bh can be NULL in case where the aio was cancelled */
 
405
    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
 
406
    qemu_bh_schedule(acb->bh);
 
407
    g_free(rcb);
 
408
}
 
409
 
 
410
/*
 
411
 * aio fd read handler. It runs in the qemu context and calls the
 
412
 * completion handling of completed rados aio operations.
 
413
 */
 
414
static void qemu_rbd_aio_event_reader(void *opaque)
 
415
{
 
416
    BDRVRBDState *s = opaque;
 
417
 
 
418
    ssize_t ret;
 
419
 
 
420
    do {
 
421
        char *p = (char *)&s->event_rcb;
 
422
 
 
423
        /* now read the rcb pointer that was sent from a non qemu thread */
 
424
        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
 
425
                   sizeof(s->event_rcb) - s->event_reader_pos);
 
426
        if (ret > 0) {
 
427
            s->event_reader_pos += ret;
 
428
            if (s->event_reader_pos == sizeof(s->event_rcb)) {
 
429
                s->event_reader_pos = 0;
 
430
                qemu_rbd_complete_aio(s->event_rcb);
 
431
                s->qemu_aio_count--;
 
432
            }
 
433
        }
 
434
    } while (ret < 0 && errno == EINTR);
 
435
}
 
436
 
 
437
static int qemu_rbd_aio_flush_cb(void *opaque)
 
438
{
 
439
    BDRVRBDState *s = opaque;
 
440
 
 
441
    return (s->qemu_aio_count > 0);
 
442
}
 
443
 
 
444
/* TODO Convert to fine grained options */
 
445
static QemuOptsList runtime_opts = {
 
446
    .name = "rbd",
 
447
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
 
448
    .desc = {
 
449
        {
 
450
            .name = "filename",
 
451
            .type = QEMU_OPT_STRING,
 
452
            .help = "Specification of the rbd image",
 
453
        },
 
454
        { /* end of list */ }
 
455
    },
 
456
};
 
457
 
 
458
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags)
 
459
{
 
460
    BDRVRBDState *s = bs->opaque;
 
461
    char pool[RBD_MAX_POOL_NAME_SIZE];
 
462
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
 
463
    char conf[RBD_MAX_CONF_SIZE];
 
464
    char clientname_buf[RBD_MAX_CONF_SIZE];
 
465
    char *clientname;
 
466
    QemuOpts *opts;
 
467
    Error *local_err = NULL;
 
468
    const char *filename;
 
469
    int r;
 
470
 
 
471
    opts = qemu_opts_create_nofail(&runtime_opts);
 
472
    qemu_opts_absorb_qdict(opts, options, &local_err);
 
473
    if (error_is_set(&local_err)) {
 
474
        qerror_report_err(local_err);
 
475
        error_free(local_err);
 
476
        qemu_opts_del(opts);
 
477
        return -EINVAL;
 
478
    }
 
479
 
 
480
    filename = qemu_opt_get(opts, "filename");
 
481
 
 
482
    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
 
483
                           snap_buf, sizeof(snap_buf),
 
484
                           s->name, sizeof(s->name),
 
485
                           conf, sizeof(conf)) < 0) {
 
486
        r = -EINVAL;
 
487
        goto failed_opts;
 
488
    }
 
489
 
 
490
    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 
491
    r = rados_create(&s->cluster, clientname);
 
492
    if (r < 0) {
 
493
        error_report("error initializing");
 
494
        goto failed_opts;
 
495
    }
 
496
 
 
497
    s->snap = NULL;
 
498
    if (snap_buf[0] != '\0') {
 
499
        s->snap = g_strdup(snap_buf);
 
500
    }
 
501
 
 
502
    /*
 
503
     * Fallback to more conservative semantics if setting cache
 
504
     * options fails. Ignore errors from setting rbd_cache because the
 
505
     * only possible error is that the option does not exist, and
 
506
     * librbd defaults to no caching. If write through caching cannot
 
507
     * be set up, fall back to no caching.
 
508
     */
 
509
    if (flags & BDRV_O_NOCACHE) {
 
510
        rados_conf_set(s->cluster, "rbd_cache", "false");
 
511
    } else {
 
512
        rados_conf_set(s->cluster, "rbd_cache", "true");
 
513
    }
 
514
 
 
515
    if (strstr(conf, "conf=") == NULL) {
 
516
        /* try default location, but ignore failure */
 
517
        rados_conf_read_file(s->cluster, NULL);
 
518
    }
 
519
 
 
520
    if (conf[0] != '\0') {
 
521
        r = qemu_rbd_set_conf(s->cluster, conf);
 
522
        if (r < 0) {
 
523
            error_report("error setting config options");
 
524
            goto failed_shutdown;
 
525
        }
 
526
    }
 
527
 
 
528
    r = rados_connect(s->cluster);
 
529
    if (r < 0) {
 
530
        error_report("error connecting");
 
531
        goto failed_shutdown;
 
532
    }
 
533
 
 
534
    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
 
535
    if (r < 0) {
 
536
        error_report("error opening pool %s", pool);
 
537
        goto failed_shutdown;
 
538
    }
 
539
 
 
540
    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
 
541
    if (r < 0) {
 
542
        error_report("error reading header from %s", s->name);
 
543
        goto failed_open;
 
544
    }
 
545
 
 
546
    bs->read_only = (s->snap != NULL);
 
547
 
 
548
    s->event_reader_pos = 0;
 
549
    r = qemu_pipe(s->fds);
 
550
    if (r < 0) {
 
551
        error_report("error opening eventfd");
 
552
        goto failed;
 
553
    }
 
554
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
 
555
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
 
556
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
 
557
                            NULL, qemu_rbd_aio_flush_cb, s);
 
558
 
 
559
 
 
560
    qemu_opts_del(opts);
 
561
    return 0;
 
562
 
 
563
failed:
 
564
    rbd_close(s->image);
 
565
failed_open:
 
566
    rados_ioctx_destroy(s->io_ctx);
 
567
failed_shutdown:
 
568
    rados_shutdown(s->cluster);
 
569
    g_free(s->snap);
 
570
failed_opts:
 
571
    qemu_opts_del(opts);
 
572
    return r;
 
573
}
 
574
 
 
575
static void qemu_rbd_close(BlockDriverState *bs)
 
576
{
 
577
    BDRVRBDState *s = bs->opaque;
 
578
 
 
579
    close(s->fds[0]);
 
580
    close(s->fds[1]);
 
581
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
 
582
 
 
583
    rbd_close(s->image);
 
584
    rados_ioctx_destroy(s->io_ctx);
 
585
    g_free(s->snap);
 
586
    rados_shutdown(s->cluster);
 
587
}
 
588
 
 
589
/*
 
590
 * Cancel aio. Since we don't reference acb in a non qemu threads,
 
591
 * it is safe to access it here.
 
592
 */
 
593
static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 
594
{
 
595
    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
 
596
    acb->cancelled = 1;
 
597
 
 
598
    while (acb->status == -EINPROGRESS) {
 
599
        qemu_aio_wait();
 
600
    }
 
601
 
 
602
    qemu_aio_release(acb);
 
603
}
 
604
 
 
605
static const AIOCBInfo rbd_aiocb_info = {
 
606
    .aiocb_size = sizeof(RBDAIOCB),
 
607
    .cancel = qemu_rbd_aio_cancel,
 
608
};
 
609
 
 
610
static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
 
611
{
 
612
    int ret = 0;
 
613
    while (1) {
 
614
        fd_set wfd;
 
615
        int fd = s->fds[RBD_FD_WRITE];
 
616
 
 
617
        /* send the op pointer to the qemu thread that is responsible
 
618
           for the aio/op completion. Must do it in a qemu thread context */
 
619
        ret = write(fd, (void *)&rcb, sizeof(rcb));
 
620
        if (ret >= 0) {
 
621
            break;
 
622
        }
 
623
        if (errno == EINTR) {
 
624
            continue;
 
625
        }
 
626
        if (errno != EAGAIN) {
 
627
            break;
 
628
        }
 
629
 
 
630
        FD_ZERO(&wfd);
 
631
        FD_SET(fd, &wfd);
 
632
        do {
 
633
            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
 
634
        } while (ret < 0 && errno == EINTR);
 
635
    }
 
636
 
 
637
    return ret;
 
638
}
 
639
 
 
640
/*
 
641
 * This is the callback function for rbd_aio_read and _write
 
642
 *
 
643
 * Note: this function is being called from a non qemu thread so
 
644
 * we need to be careful about what we do here. Generally we only
 
645
 * write to the block notification pipe, and do the rest of the
 
646
 * io completion handling from qemu_rbd_aio_event_reader() which
 
647
 * runs in a qemu context.
 
648
 */
 
649
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 
650
{
 
651
    int ret;
 
652
    rcb->ret = rbd_aio_get_return_value(c);
 
653
    rbd_aio_release(c);
 
654
    ret = qemu_rbd_send_pipe(rcb->s, rcb);
 
655
    if (ret < 0) {
 
656
        error_report("failed writing to acb->s->fds");
 
657
        g_free(rcb);
 
658
    }
 
659
}
 
660
 
 
661
/* Callback when all queued rbd_aio requests are complete */
 
662
 
 
663
static void rbd_aio_bh_cb(void *opaque)
 
664
{
 
665
    RBDAIOCB *acb = opaque;
 
666
 
 
667
    if (acb->cmd == RBD_AIO_READ) {
 
668
        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 
669
    }
 
670
    qemu_vfree(acb->bounce);
 
671
    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
672
    qemu_bh_delete(acb->bh);
 
673
    acb->bh = NULL;
 
674
    acb->status = 0;
 
675
 
 
676
    if (!acb->cancelled) {
 
677
        qemu_aio_release(acb);
 
678
    }
 
679
}
 
680
 
 
681
static int rbd_aio_discard_wrapper(rbd_image_t image,
 
682
                                   uint64_t off,
 
683
                                   uint64_t len,
 
684
                                   rbd_completion_t comp)
 
685
{
 
686
#ifdef LIBRBD_SUPPORTS_DISCARD
 
687
    return rbd_aio_discard(image, off, len, comp);
 
688
#else
 
689
    return -ENOTSUP;
 
690
#endif
 
691
}
 
692
 
 
693
static int rbd_aio_flush_wrapper(rbd_image_t image,
 
694
                                 rbd_completion_t comp)
 
695
{
 
696
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 
697
    return rbd_aio_flush(image, comp);
 
698
#else
 
699
    return -ENOTSUP;
 
700
#endif
 
701
}
 
702
 
 
703
static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
 
704
                                       int64_t sector_num,
 
705
                                       QEMUIOVector *qiov,
 
706
                                       int nb_sectors,
 
707
                                       BlockDriverCompletionFunc *cb,
 
708
                                       void *opaque,
 
709
                                       RBDAIOCmd cmd)
 
710
{
 
711
    RBDAIOCB *acb;
 
712
    RADOSCB *rcb;
 
713
    rbd_completion_t c;
 
714
    int64_t off, size;
 
715
    char *buf;
 
716
    int r;
 
717
 
 
718
    BDRVRBDState *s = bs->opaque;
 
719
 
 
720
    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
 
721
    acb->cmd = cmd;
 
722
    acb->qiov = qiov;
 
723
    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 
724
        acb->bounce = NULL;
 
725
    } else {
 
726
        acb->bounce = qemu_blockalign(bs, qiov->size);
 
727
    }
 
728
    acb->ret = 0;
 
729
    acb->error = 0;
 
730
    acb->s = s;
 
731
    acb->cancelled = 0;
 
732
    acb->bh = NULL;
 
733
    acb->status = -EINPROGRESS;
 
734
 
 
735
    if (cmd == RBD_AIO_WRITE) {
 
736
        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 
737
    }
 
738
 
 
739
    buf = acb->bounce;
 
740
 
 
741
    off = sector_num * BDRV_SECTOR_SIZE;
 
742
    size = nb_sectors * BDRV_SECTOR_SIZE;
 
743
 
 
744
    s->qemu_aio_count++; /* All the RADOSCB */
 
745
 
 
746
    rcb = g_malloc(sizeof(RADOSCB));
 
747
    rcb->done = 0;
 
748
    rcb->acb = acb;
 
749
    rcb->buf = buf;
 
750
    rcb->s = acb->s;
 
751
    rcb->size = size;
 
752
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
 
753
    if (r < 0) {
 
754
        goto failed;
 
755
    }
 
756
 
 
757
    switch (cmd) {
 
758
    case RBD_AIO_WRITE:
 
759
        r = rbd_aio_write(s->image, off, size, buf, c);
 
760
        break;
 
761
    case RBD_AIO_READ:
 
762
        r = rbd_aio_read(s->image, off, size, buf, c);
 
763
        break;
 
764
    case RBD_AIO_DISCARD:
 
765
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
 
766
        break;
 
767
    case RBD_AIO_FLUSH:
 
768
        r = rbd_aio_flush_wrapper(s->image, c);
 
769
        break;
 
770
    default:
 
771
        r = -EINVAL;
 
772
    }
 
773
 
 
774
    if (r < 0) {
 
775
        goto failed;
 
776
    }
 
777
 
 
778
    return &acb->common;
 
779
 
 
780
failed:
 
781
    g_free(rcb);
 
782
    s->qemu_aio_count--;
 
783
    qemu_aio_release(acb);
 
784
    return NULL;
 
785
}
 
786
 
 
787
static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
 
788
                                            int64_t sector_num,
 
789
                                            QEMUIOVector *qiov,
 
790
                                            int nb_sectors,
 
791
                                            BlockDriverCompletionFunc *cb,
 
792
                                            void *opaque)
 
793
{
 
794
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 
795
                         RBD_AIO_READ);
 
796
}
 
797
 
 
798
static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
 
799
                                             int64_t sector_num,
 
800
                                             QEMUIOVector *qiov,
 
801
                                             int nb_sectors,
 
802
                                             BlockDriverCompletionFunc *cb,
 
803
                                             void *opaque)
 
804
{
 
805
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
 
806
                         RBD_AIO_WRITE);
 
807
}
 
808
 
 
809
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 
810
static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
 
811
                                            BlockDriverCompletionFunc *cb,
 
812
                                            void *opaque)
 
813
{
 
814
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
 
815
}
 
816
 
 
817
#else
 
818
 
 
819
static int qemu_rbd_co_flush(BlockDriverState *bs)
 
820
{
 
821
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
 
822
    /* rbd_flush added in 0.1.1 */
 
823
    BDRVRBDState *s = bs->opaque;
 
824
    return rbd_flush(s->image);
 
825
#else
 
826
    return 0;
 
827
#endif
 
828
}
 
829
#endif
 
830
 
 
831
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 
832
{
 
833
    BDRVRBDState *s = bs->opaque;
 
834
    rbd_image_info_t info;
 
835
    int r;
 
836
 
 
837
    r = rbd_stat(s->image, &info, sizeof(info));
 
838
    if (r < 0) {
 
839
        return r;
 
840
    }
 
841
 
 
842
    bdi->cluster_size = info.obj_size;
 
843
    return 0;
 
844
}
 
845
 
 
846
static int64_t qemu_rbd_getlength(BlockDriverState *bs)
 
847
{
 
848
    BDRVRBDState *s = bs->opaque;
 
849
    rbd_image_info_t info;
 
850
    int r;
 
851
 
 
852
    r = rbd_stat(s->image, &info, sizeof(info));
 
853
    if (r < 0) {
 
854
        return r;
 
855
    }
 
856
 
 
857
    return info.size;
 
858
}
 
859
 
 
860
static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
 
861
{
 
862
    BDRVRBDState *s = bs->opaque;
 
863
    int r;
 
864
 
 
865
    r = rbd_resize(s->image, offset);
 
866
    if (r < 0) {
 
867
        return r;
 
868
    }
 
869
 
 
870
    return 0;
 
871
}
 
872
 
 
873
static int qemu_rbd_snap_create(BlockDriverState *bs,
 
874
                                QEMUSnapshotInfo *sn_info)
 
875
{
 
876
    BDRVRBDState *s = bs->opaque;
 
877
    int r;
 
878
 
 
879
    if (sn_info->name[0] == '\0') {
 
880
        return -EINVAL; /* we need a name for rbd snapshots */
 
881
    }
 
882
 
 
883
    /*
 
884
     * rbd snapshots are using the name as the user controlled unique identifier
 
885
     * we can't use the rbd snapid for that purpose, as it can't be set
 
886
     */
 
887
    if (sn_info->id_str[0] != '\0' &&
 
888
        strcmp(sn_info->id_str, sn_info->name) != 0) {
 
889
        return -EINVAL;
 
890
    }
 
891
 
 
892
    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
 
893
        return -ERANGE;
 
894
    }
 
895
 
 
896
    r = rbd_snap_create(s->image, sn_info->name);
 
897
    if (r < 0) {
 
898
        error_report("failed to create snap: %s", strerror(-r));
 
899
        return r;
 
900
    }
 
901
 
 
902
    return 0;
 
903
}
 
904
 
 
905
static int qemu_rbd_snap_remove(BlockDriverState *bs,
 
906
                                const char *snapshot_name)
 
907
{
 
908
    BDRVRBDState *s = bs->opaque;
 
909
    int r;
 
910
 
 
911
    r = rbd_snap_remove(s->image, snapshot_name);
 
912
    return r;
 
913
}
 
914
 
 
915
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
 
916
                                  const char *snapshot_name)
 
917
{
 
918
    BDRVRBDState *s = bs->opaque;
 
919
    int r;
 
920
 
 
921
    r = rbd_snap_rollback(s->image, snapshot_name);
 
922
    return r;
 
923
}
 
924
 
 
925
static int qemu_rbd_snap_list(BlockDriverState *bs,
 
926
                              QEMUSnapshotInfo **psn_tab)
 
927
{
 
928
    BDRVRBDState *s = bs->opaque;
 
929
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
 
930
    int i, snap_count;
 
931
    rbd_snap_info_t *snaps;
 
932
    int max_snaps = RBD_MAX_SNAPS;
 
933
 
 
934
    do {
 
935
        snaps = g_malloc(sizeof(*snaps) * max_snaps);
 
936
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
 
937
        if (snap_count < 0) {
 
938
            g_free(snaps);
 
939
        }
 
940
    } while (snap_count == -ERANGE);
 
941
 
 
942
    if (snap_count <= 0) {
 
943
        goto done;
 
944
    }
 
945
 
 
946
    sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
 
947
 
 
948
    for (i = 0; i < snap_count; i++) {
 
949
        const char *snap_name = snaps[i].name;
 
950
 
 
951
        sn_info = sn_tab + i;
 
952
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
 
953
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
 
954
 
 
955
        sn_info->vm_state_size = snaps[i].size;
 
956
        sn_info->date_sec = 0;
 
957
        sn_info->date_nsec = 0;
 
958
        sn_info->vm_clock_nsec = 0;
 
959
    }
 
960
    rbd_snap_list_end(snaps);
 
961
 
 
962
 done:
 
963
    *psn_tab = sn_tab;
 
964
    return snap_count;
 
965
}
 
966
 
 
967
#ifdef LIBRBD_SUPPORTS_DISCARD
 
968
static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
 
969
                                              int64_t sector_num,
 
970
                                              int nb_sectors,
 
971
                                              BlockDriverCompletionFunc *cb,
 
972
                                              void *opaque)
 
973
{
 
974
    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
 
975
                         RBD_AIO_DISCARD);
 
976
}
 
977
#endif
 
978
 
 
979
static QEMUOptionParameter qemu_rbd_create_options[] = {
 
980
    {
 
981
     .name = BLOCK_OPT_SIZE,
 
982
     .type = OPT_SIZE,
 
983
     .help = "Virtual disk size"
 
984
    },
 
985
    {
 
986
     .name = BLOCK_OPT_CLUSTER_SIZE,
 
987
     .type = OPT_SIZE,
 
988
     .help = "RBD object size"
 
989
    },
 
990
    {NULL}
 
991
};
 
992
 
 
993
static BlockDriver bdrv_rbd = {
 
994
    .format_name        = "rbd",
 
995
    .instance_size      = sizeof(BDRVRBDState),
 
996
    .bdrv_file_open     = qemu_rbd_open,
 
997
    .bdrv_close         = qemu_rbd_close,
 
998
    .bdrv_create        = qemu_rbd_create,
 
999
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
 
1000
    .bdrv_get_info      = qemu_rbd_getinfo,
 
1001
    .create_options     = qemu_rbd_create_options,
 
1002
    .bdrv_getlength     = qemu_rbd_getlength,
 
1003
    .bdrv_truncate      = qemu_rbd_truncate,
 
1004
    .protocol_name      = "rbd",
 
1005
 
 
1006
    .bdrv_aio_readv         = qemu_rbd_aio_readv,
 
1007
    .bdrv_aio_writev        = qemu_rbd_aio_writev,
 
1008
 
 
1009
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
 
1010
    .bdrv_aio_flush         = qemu_rbd_aio_flush,
 
1011
#else
 
1012
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
 
1013
#endif
 
1014
 
 
1015
#ifdef LIBRBD_SUPPORTS_DISCARD
 
1016
    .bdrv_aio_discard       = qemu_rbd_aio_discard,
 
1017
#endif
 
1018
 
 
1019
    .bdrv_snapshot_create   = qemu_rbd_snap_create,
 
1020
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
 
1021
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
 
1022
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
 
1023
};
 
1024
 
 
1025
static void bdrv_rbd_init(void)
 
1026
{
 
1027
    bdrv_register(&bdrv_rbd);
 
1028
}
 
1029
 
 
1030
block_init(bdrv_rbd_init);