~pmdj/ubuntu/trusty/qemu/2.9+applesmc+fadtv3

« back to all changes in this revision

Viewing changes to block/quorum.c

  • Committer: Phil Dennis-Jordan
  • Date: 2017-07-21 08:03:43 UTC
  • mfrom: (1.1.1)
  • Revision ID: phil@philjordan.eu-20170721080343-2yr2vdj7713czahv
New upstream release 2.9.0.

Show diffs side-by-side

added

removed

Lines of Context:
97
97
 * $children_count QuorumChildRequest.
98
98
 */
99
99
typedef struct QuorumChildRequest {
100
 
    BlockAIOCB *aiocb;
 
100
    BlockDriverState *bs;
101
101
    QEMUIOVector qiov;
102
102
    uint8_t *buf;
103
103
    int ret;
110
110
 * used to do operations on each children and track overall progress.
111
111
 */
112
112
struct QuorumAIOCB {
113
 
    BlockAIOCB common;
 
113
    BlockDriverState *bs;
 
114
    Coroutine *co;
114
115
 
115
116
    /* Request metadata */
116
 
    uint64_t sector_num;
117
 
    int nb_sectors;
 
117
    uint64_t offset;
 
118
    uint64_t bytes;
118
119
 
119
120
    QEMUIOVector *qiov;         /* calling IOV */
120
121
 
133
134
    int children_read;          /* how many children have been read from */
134
135
};
135
136
 
136
 
static bool quorum_vote(QuorumAIOCB *acb);
137
 
 
138
 
static void quorum_aio_cancel(BlockAIOCB *blockacb)
139
 
{
140
 
    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
141
 
    BDRVQuorumState *s = acb->common.bs->opaque;
142
 
    int i;
143
 
 
144
 
    /* cancel all callbacks */
145
 
    for (i = 0; i < s->num_children; i++) {
146
 
        if (acb->qcrs[i].aiocb) {
147
 
            bdrv_aio_cancel_async(acb->qcrs[i].aiocb);
148
 
        }
149
 
    }
150
 
}
151
 
 
152
 
static AIOCBInfo quorum_aiocb_info = {
153
 
    .aiocb_size         = sizeof(QuorumAIOCB),
154
 
    .cancel_async       = quorum_aio_cancel,
155
 
};
 
137
typedef struct QuorumCo {
 
138
    QuorumAIOCB *acb;
 
139
    int idx;
 
140
} QuorumCo;
156
141
 
157
142
static void quorum_aio_finalize(QuorumAIOCB *acb)
158
143
{
159
 
    acb->common.cb(acb->common.opaque, acb->vote_ret);
160
144
    g_free(acb->qcrs);
161
 
    qemu_aio_unref(acb);
 
145
    g_free(acb);
162
146
}
163
147
 
164
148
static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
171
155
    return a->l == b->l;
172
156
}
173
157
 
174
 
static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
175
 
                                   BlockDriverState *bs,
 
158
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
176
159
                                   QEMUIOVector *qiov,
177
 
                                   uint64_t sector_num,
178
 
                                   int nb_sectors,
179
 
                                   BlockCompletionFunc *cb,
180
 
                                   void *opaque)
 
160
                                   uint64_t offset,
 
161
                                   uint64_t bytes)
181
162
{
182
 
    QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
 
163
    BDRVQuorumState *s = bs->opaque;
 
164
    QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
183
165
    int i;
184
166
 
185
 
    acb->common.bs->opaque = s;
186
 
    acb->sector_num = sector_num;
187
 
    acb->nb_sectors = nb_sectors;
188
 
    acb->qiov = qiov;
 
167
    *acb = (QuorumAIOCB) {
 
168
        .co                 = qemu_coroutine_self(),
 
169
        .bs                 = bs,
 
170
        .offset             = offset,
 
171
        .bytes              = bytes,
 
172
        .qiov               = qiov,
 
173
        .votes.compare      = quorum_sha256_compare,
 
174
        .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
 
175
    };
 
176
 
189
177
    acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
190
 
    acb->count = 0;
191
 
    acb->success_count = 0;
192
 
    acb->rewrite_count = 0;
193
 
    acb->votes.compare = quorum_sha256_compare;
194
 
    QLIST_INIT(&acb->votes.vote_list);
195
 
    acb->is_read = false;
196
 
    acb->vote_ret = 0;
197
 
 
198
178
    for (i = 0; i < s->num_children; i++) {
199
179
        acb->qcrs[i].buf = NULL;
200
180
        acb->qcrs[i].ret = 0;
204
184
    return acb;
205
185
}
206
186
 
207
 
static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
208
 
                              int nb_sectors, char *node_name, int ret)
 
187
static void quorum_report_bad(QuorumOpType type, uint64_t offset,
 
188
                              uint64_t bytes, char *node_name, int ret)
209
189
{
210
190
    const char *msg = NULL;
 
191
    int64_t start_sector = offset / BDRV_SECTOR_SIZE;
 
192
    int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
 
193
 
211
194
    if (ret < 0) {
212
195
        msg = strerror(-ret);
213
196
    }
214
197
 
215
 
    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name,
216
 
                                      sector_num, nb_sectors, &error_abort);
 
198
    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
 
199
                                      end_sector - start_sector, &error_abort);
217
200
}
218
201
 
219
202
static void quorum_report_failure(QuorumAIOCB *acb)
220
203
{
221
 
    const char *reference = bdrv_get_device_or_node_name(acb->common.bs);
222
 
    qapi_event_send_quorum_failure(reference, acb->sector_num,
223
 
                                   acb->nb_sectors, &error_abort);
 
204
    const char *reference = bdrv_get_device_or_node_name(acb->bs);
 
205
    int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
 
206
    int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
 
207
                                      BDRV_SECTOR_SIZE);
 
208
 
 
209
    qapi_event_send_quorum_failure(reference, start_sector,
 
210
                                   end_sector - start_sector, &error_abort);
224
211
}
225
212
 
226
213
static int quorum_vote_error(QuorumAIOCB *acb);
227
214
 
228
215
static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
229
216
{
230
 
    BDRVQuorumState *s = acb->common.bs->opaque;
 
217
    BDRVQuorumState *s = acb->bs->opaque;
231
218
 
232
219
    if (acb->success_count < s->threshold) {
233
220
        acb->vote_ret = quorum_vote_error(acb);
238
225
    return false;
239
226
}
240
227
 
241
 
static void quorum_rewrite_aio_cb(void *opaque, int ret)
242
 
{
243
 
    QuorumAIOCB *acb = opaque;
244
 
 
245
 
    /* one less rewrite to do */
246
 
    acb->rewrite_count--;
247
 
 
248
 
    /* wait until all rewrite callbacks have completed */
249
 
    if (acb->rewrite_count) {
250
 
        return;
251
 
    }
252
 
 
253
 
    quorum_aio_finalize(acb);
254
 
}
255
 
 
256
 
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb);
 
228
static int read_fifo_child(QuorumAIOCB *acb);
257
229
 
258
230
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
259
231
{
272
244
{
273
245
    QuorumAIOCB *acb = sacb->parent;
274
246
    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
275
 
    quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
276
 
                      sacb->aiocb->bs->node_name, ret);
277
 
}
278
 
 
279
 
static void quorum_fifo_aio_cb(void *opaque, int ret)
280
 
{
281
 
    QuorumChildRequest *sacb = opaque;
282
 
    QuorumAIOCB *acb = sacb->parent;
283
 
    BDRVQuorumState *s = acb->common.bs->opaque;
284
 
 
285
 
    assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
286
 
 
287
 
    if (ret < 0) {
288
 
        quorum_report_bad_acb(sacb, ret);
289
 
 
290
 
        /* We try to read next child in FIFO order if we fail to read */
291
 
        if (acb->children_read < s->num_children) {
292
 
            read_fifo_child(acb);
293
 
            return;
294
 
        }
295
 
    }
296
 
 
297
 
    acb->vote_ret = ret;
298
 
 
299
 
    /* FIXME: rewrite failed children if acb->children_read > 1? */
300
 
    quorum_aio_finalize(acb);
301
 
}
302
 
 
303
 
static void quorum_aio_cb(void *opaque, int ret)
304
 
{
305
 
    QuorumChildRequest *sacb = opaque;
306
 
    QuorumAIOCB *acb = sacb->parent;
307
 
    BDRVQuorumState *s = acb->common.bs->opaque;
308
 
    bool rewrite = false;
309
 
    int i;
310
 
 
311
 
    sacb->ret = ret;
312
 
    if (ret == 0) {
313
 
        acb->success_count++;
314
 
    } else {
315
 
        quorum_report_bad_acb(sacb, ret);
316
 
    }
317
 
    acb->count++;
318
 
    assert(acb->count <= s->num_children);
319
 
    assert(acb->success_count <= s->num_children);
320
 
    if (acb->count < s->num_children) {
321
 
        return;
322
 
    }
323
 
 
324
 
    /* Do the vote on read */
325
 
    if (acb->is_read) {
326
 
        rewrite = quorum_vote(acb);
327
 
        for (i = 0; i < s->num_children; i++) {
328
 
            qemu_vfree(acb->qcrs[i].buf);
329
 
            qemu_iovec_destroy(&acb->qcrs[i].qiov);
330
 
        }
331
 
    } else {
332
 
        quorum_has_too_much_io_failed(acb);
333
 
    }
334
 
 
335
 
    /* if no rewrite is done the code will finish right away */
336
 
    if (!rewrite) {
337
 
        quorum_aio_finalize(acb);
338
 
    }
 
247
    quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
339
248
}
340
249
 
341
250
static void quorum_report_bad_versions(BDRVQuorumState *s,
350
259
            continue;
351
260
        }
352
261
        QLIST_FOREACH(item, &version->items, next) {
353
 
            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num,
354
 
                              acb->nb_sectors,
 
262
            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
355
263
                              s->children[item->index]->bs->node_name, 0);
356
264
        }
357
265
    }
358
266
}
359
267
 
360
 
static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
 
268
static void quorum_rewrite_entry(void *opaque)
 
269
{
 
270
    QuorumCo *co = opaque;
 
271
    QuorumAIOCB *acb = co->acb;
 
272
    BDRVQuorumState *s = acb->bs->opaque;
 
273
 
 
274
    /* Ignore any errors, it's just a correction attempt for already
 
275
     * corrupted data. */
 
276
    bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
 
277
                    acb->qiov, 0);
 
278
 
 
279
    /* Wake up the caller after the last rewrite */
 
280
    acb->rewrite_count--;
 
281
    if (!acb->rewrite_count) {
 
282
        qemu_coroutine_enter_if_inactive(acb->co);
 
283
    }
 
284
}
 
285
 
 
286
static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
361
287
                                        QuorumVoteValue *value)
362
288
{
363
289
    QuorumVoteVersion *version;
376
302
        }
377
303
    }
378
304
 
379
 
    /* quorum_rewrite_aio_cb will count down this to zero */
 
305
    /* quorum_rewrite_entry will count down this to zero */
380
306
    acb->rewrite_count = count;
381
307
 
382
308
    /* now fire the correcting rewrites */
385
311
            continue;
386
312
        }
387
313
        QLIST_FOREACH(item, &version->items, next) {
388
 
            bdrv_aio_writev(s->children[item->index], acb->sector_num,
389
 
                            acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb,
390
 
                            acb);
 
314
            Coroutine *co;
 
315
            QuorumCo data = {
 
316
                .acb = acb,
 
317
                .idx = item->index,
 
318
            };
 
319
 
 
320
            co = qemu_coroutine_create(quorum_rewrite_entry, &data);
 
321
            qemu_coroutine_enter(co);
391
322
        }
392
323
    }
393
324
 
507
438
    va_list ap;
508
439
 
509
440
    va_start(ap, fmt);
510
 
    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
511
 
            acb->sector_num, acb->nb_sectors);
 
441
    fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ",
 
442
            acb->offset, acb->bytes);
512
443
    vfprintf(stderr, fmt, ap);
513
444
    fprintf(stderr, "\n");
514
445
    va_end(ap);
519
450
                           QEMUIOVector *a,
520
451
                           QEMUIOVector *b)
521
452
{
522
 
    BDRVQuorumState *s = acb->common.bs->opaque;
 
453
    BDRVQuorumState *s = acb->bs->opaque;
523
454
    ssize_t offset;
524
455
 
525
456
    /* This driver will replace blkverify in this particular case */
526
457
    if (s->is_blkverify) {
527
458
        offset = qemu_iovec_compare(a, b);
528
459
        if (offset != -1) {
529
 
            quorum_err(acb, "contents mismatch in sector %" PRId64,
530
 
                       acb->sector_num +
531
 
                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
 
460
            quorum_err(acb, "contents mismatch at offset %" PRIu64,
 
461
                       acb->offset + offset);
532
462
        }
533
463
        return true;
534
464
    }
539
469
/* Do a vote to get the error code */
540
470
static int quorum_vote_error(QuorumAIOCB *acb)
541
471
{
542
 
    BDRVQuorumState *s = acb->common.bs->opaque;
 
472
    BDRVQuorumState *s = acb->bs->opaque;
543
473
    QuorumVoteVersion *winner = NULL;
544
474
    QuorumVotes error_votes;
545
475
    QuorumVoteValue result_value;
568
498
    return ret;
569
499
}
570
500
 
571
 
static bool quorum_vote(QuorumAIOCB *acb)
 
501
static void quorum_vote(QuorumAIOCB *acb)
572
502
{
573
503
    bool quorum = true;
574
 
    bool rewrite = false;
575
504
    int i, j, ret;
576
505
    QuorumVoteValue hash;
577
 
    BDRVQuorumState *s = acb->common.bs->opaque;
 
506
    BDRVQuorumState *s = acb->bs->opaque;
578
507
    QuorumVoteVersion *winner;
579
508
 
580
509
    if (quorum_has_too_much_io_failed(acb)) {
581
 
        return false;
 
510
        return;
582
511
    }
583
512
 
584
513
    /* get the index of the first successful read */
606
535
    /* Every successful read agrees */
607
536
    if (quorum) {
608
537
        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
609
 
        return false;
 
538
        return;
610
539
    }
611
540
 
612
541
    /* compute hashes for each successful read, also store indexes */
641
570
 
642
571
    /* corruption correction is enabled */
643
572
    if (s->rewrite_corrupted) {
644
 
        rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value);
 
573
        quorum_rewrite_bad_versions(acb, &winner->value);
645
574
    }
646
575
 
647
576
free_exit:
648
577
    /* free lists */
649
578
    quorum_free_vote_list(&acb->votes);
650
 
    return rewrite;
651
 
}
652
 
 
653
 
static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
654
 
{
655
 
    BDRVQuorumState *s = acb->common.bs->opaque;
656
 
    int i;
 
579
}
 
580
 
 
581
static void read_quorum_children_entry(void *opaque)
 
582
{
 
583
    QuorumCo *co = opaque;
 
584
    QuorumAIOCB *acb = co->acb;
 
585
    BDRVQuorumState *s = acb->bs->opaque;
 
586
    int i = co->idx;
 
587
    QuorumChildRequest *sacb = &acb->qcrs[i];
 
588
 
 
589
    sacb->bs = s->children[i]->bs;
 
590
    sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
 
591
                               &acb->qcrs[i].qiov, 0);
 
592
 
 
593
    if (sacb->ret == 0) {
 
594
        acb->success_count++;
 
595
    } else {
 
596
        quorum_report_bad_acb(sacb, sacb->ret);
 
597
    }
 
598
 
 
599
    acb->count++;
 
600
    assert(acb->count <= s->num_children);
 
601
    assert(acb->success_count <= s->num_children);
 
602
 
 
603
    /* Wake up the caller after the last read */
 
604
    if (acb->count == s->num_children) {
 
605
        qemu_coroutine_enter_if_inactive(acb->co);
 
606
    }
 
607
}
 
608
 
 
609
static int read_quorum_children(QuorumAIOCB *acb)
 
610
{
 
611
    BDRVQuorumState *s = acb->bs->opaque;
 
612
    int i, ret;
657
613
 
658
614
    acb->children_read = s->num_children;
659
615
    for (i = 0; i < s->num_children; i++) {
663
619
    }
664
620
 
665
621
    for (i = 0; i < s->num_children; i++) {
666
 
        acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
667
 
                                            &acb->qcrs[i].qiov, acb->nb_sectors,
668
 
                                            quorum_aio_cb, &acb->qcrs[i]);
669
 
    }
670
 
 
671
 
    return &acb->common;
 
622
        Coroutine *co;
 
623
        QuorumCo data = {
 
624
            .acb = acb,
 
625
            .idx = i,
 
626
        };
 
627
 
 
628
        co = qemu_coroutine_create(read_quorum_children_entry, &data);
 
629
        qemu_coroutine_enter(co);
 
630
    }
 
631
 
 
632
    while (acb->count < s->num_children) {
 
633
        qemu_coroutine_yield();
 
634
    }
 
635
 
 
636
    /* Do the vote on read */
 
637
    quorum_vote(acb);
 
638
    for (i = 0; i < s->num_children; i++) {
 
639
        qemu_vfree(acb->qcrs[i].buf);
 
640
        qemu_iovec_destroy(&acb->qcrs[i].qiov);
 
641
    }
 
642
 
 
643
    while (acb->rewrite_count) {
 
644
        qemu_coroutine_yield();
 
645
    }
 
646
 
 
647
    ret = acb->vote_ret;
 
648
 
 
649
    return ret;
672
650
}
673
651
 
674
 
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
 
652
static int read_fifo_child(QuorumAIOCB *acb)
675
653
{
676
 
    BDRVQuorumState *s = acb->common.bs->opaque;
677
 
    int n = acb->children_read++;
678
 
 
679
 
    acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
680
 
                                        acb->qiov, acb->nb_sectors,
681
 
                                        quorum_fifo_aio_cb, &acb->qcrs[n]);
682
 
 
683
 
    return &acb->common;
 
654
    BDRVQuorumState *s = acb->bs->opaque;
 
655
    int n, ret;
 
656
 
 
657
    /* We try to read the next child in FIFO order if we failed to read */
 
658
    do {
 
659
        n = acb->children_read++;
 
660
        acb->qcrs[n].bs = s->children[n]->bs;
 
661
        ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
 
662
                             acb->qiov, 0);
 
663
        if (ret < 0) {
 
664
            quorum_report_bad_acb(&acb->qcrs[n], ret);
 
665
        }
 
666
    } while (ret < 0 && acb->children_read < s->num_children);
 
667
 
 
668
    /* FIXME: rewrite failed children if acb->children_read > 1? */
 
669
 
 
670
    return ret;
684
671
}
685
672
 
686
 
static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
687
 
                                    int64_t sector_num,
688
 
                                    QEMUIOVector *qiov,
689
 
                                    int nb_sectors,
690
 
                                    BlockCompletionFunc *cb,
691
 
                                    void *opaque)
 
673
static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
 
674
                            uint64_t bytes, QEMUIOVector *qiov, int flags)
692
675
{
693
676
    BDRVQuorumState *s = bs->opaque;
694
 
    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
695
 
                                      nb_sectors, cb, opaque);
 
677
    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
 
678
    int ret;
 
679
 
696
680
    acb->is_read = true;
697
681
    acb->children_read = 0;
698
682
 
699
683
    if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
700
 
        return read_quorum_children(acb);
701
 
    }
702
 
 
703
 
    return read_fifo_child(acb);
704
 
}
705
 
 
706
 
static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
707
 
                                     int64_t sector_num,
708
 
                                     QEMUIOVector *qiov,
709
 
                                     int nb_sectors,
710
 
                                     BlockCompletionFunc *cb,
711
 
                                     void *opaque)
 
684
        ret = read_quorum_children(acb);
 
685
    } else {
 
686
        ret = read_fifo_child(acb);
 
687
    }
 
688
    quorum_aio_finalize(acb);
 
689
 
 
690
    return ret;
 
691
}
 
692
 
 
693
static void write_quorum_entry(void *opaque)
 
694
{
 
695
    QuorumCo *co = opaque;
 
696
    QuorumAIOCB *acb = co->acb;
 
697
    BDRVQuorumState *s = acb->bs->opaque;
 
698
    int i = co->idx;
 
699
    QuorumChildRequest *sacb = &acb->qcrs[i];
 
700
 
 
701
    sacb->bs = s->children[i]->bs;
 
702
    sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
 
703
                                acb->qiov, 0);
 
704
    if (sacb->ret == 0) {
 
705
        acb->success_count++;
 
706
    } else {
 
707
        quorum_report_bad_acb(sacb, sacb->ret);
 
708
    }
 
709
    acb->count++;
 
710
    assert(acb->count <= s->num_children);
 
711
    assert(acb->success_count <= s->num_children);
 
712
 
 
713
    /* Wake up the caller after the last write */
 
714
    if (acb->count == s->num_children) {
 
715
        qemu_coroutine_enter_if_inactive(acb->co);
 
716
    }
 
717
}
 
718
 
 
719
static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
 
720
                             uint64_t bytes, QEMUIOVector *qiov, int flags)
712
721
{
713
722
    BDRVQuorumState *s = bs->opaque;
714
 
    QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
715
 
                                      cb, opaque);
716
 
    int i;
 
723
    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
 
724
    int i, ret;
717
725
 
718
726
    for (i = 0; i < s->num_children; i++) {
719
 
        acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
720
 
                                             qiov, nb_sectors, &quorum_aio_cb,
721
 
                                             &acb->qcrs[i]);
722
 
    }
723
 
 
724
 
    return &acb->common;
 
727
        Coroutine *co;
 
728
        QuorumCo data = {
 
729
            .acb = acb,
 
730
            .idx = i,
 
731
        };
 
732
 
 
733
        co = qemu_coroutine_create(write_quorum_entry, &data);
 
734
        qemu_coroutine_enter(co);
 
735
    }
 
736
 
 
737
    while (acb->count < s->num_children) {
 
738
        qemu_coroutine_yield();
 
739
    }
 
740
 
 
741
    quorum_has_too_much_io_failed(acb);
 
742
 
 
743
    ret = acb->vote_ret;
 
744
    quorum_aio_finalize(acb);
 
745
 
 
746
    return ret;
725
747
}
726
748
 
727
749
static int64_t quorum_getlength(BlockDriverState *bs)
765
787
        result = bdrv_co_flush(s->children[i]->bs);
766
788
        if (result) {
767
789
            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0,
768
 
                              bdrv_nb_sectors(s->children[i]->bs),
 
790
                              bdrv_getlength(s->children[i]->bs),
769
791
                              s->children[i]->bs->node_name, result);
770
792
            result_value.l = result;
771
793
            quorum_count_vote(&error_votes, &result_value, i);
1010
1032
 
1011
1033
    /* We can safely add the child now */
1012
1034
    bdrv_ref(child_bs);
1013
 
    child = bdrv_attach_child(bs, child_bs, indexstr, &child_format);
 
1035
 
 
1036
    child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp);
 
1037
    if (child == NULL) {
 
1038
        s->next_child_index--;
 
1039
        bdrv_unref(child_bs);
 
1040
        goto out;
 
1041
    }
1014
1042
    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
1015
1043
    s->children[s->num_children++] = child;
1016
1044
 
 
1045
out:
1017
1046
    bdrv_drained_end(bs);
1018
1047
}
1019
1048
 
1098
1127
 
1099
1128
    .bdrv_getlength                     = quorum_getlength,
1100
1129
 
1101
 
    .bdrv_aio_readv                     = quorum_aio_readv,
1102
 
    .bdrv_aio_writev                    = quorum_aio_writev,
 
1130
    .bdrv_co_preadv                     = quorum_co_preadv,
 
1131
    .bdrv_co_pwritev                    = quorum_co_pwritev,
1103
1132
 
1104
1133
    .bdrv_add_child                     = quorum_add_child,
1105
1134
    .bdrv_del_child                     = quorum_del_child,
1106
1135
 
 
1136
    .bdrv_child_perm                    = bdrv_filter_default_perms,
 
1137
 
1107
1138
    .is_filter                          = true,
1108
1139
    .bdrv_recurse_is_first_non_filter   = quorum_recurse_is_first_non_filter,
1109
1140
};