133
134
int children_read; /* how many children have been read from */
136
static bool quorum_vote(QuorumAIOCB *acb);
138
static void quorum_aio_cancel(BlockAIOCB *blockacb)
140
QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
141
BDRVQuorumState *s = acb->common.bs->opaque;
144
/* cancel all callbacks */
145
for (i = 0; i < s->num_children; i++) {
146
if (acb->qcrs[i].aiocb) {
147
bdrv_aio_cancel_async(acb->qcrs[i].aiocb);
152
static AIOCBInfo quorum_aiocb_info = {
153
.aiocb_size = sizeof(QuorumAIOCB),
154
.cancel_async = quorum_aio_cancel,
137
typedef struct QuorumCo {
157
142
static void quorum_aio_finalize(QuorumAIOCB *acb)
159
acb->common.cb(acb->common.opaque, acb->vote_ret);
160
144
g_free(acb->qcrs);
164
148
static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
171
155
return a->l == b->l;
174
static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
175
BlockDriverState *bs,
158
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
176
159
QEMUIOVector *qiov,
179
BlockCompletionFunc *cb,
182
QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
163
BDRVQuorumState *s = bs->opaque;
164
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
185
acb->common.bs->opaque = s;
186
acb->sector_num = sector_num;
187
acb->nb_sectors = nb_sectors;
167
*acb = (QuorumAIOCB) {
168
.co = qemu_coroutine_self(),
173
.votes.compare = quorum_sha256_compare,
174
.votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
189
177
acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
191
acb->success_count = 0;
192
acb->rewrite_count = 0;
193
acb->votes.compare = quorum_sha256_compare;
194
QLIST_INIT(&acb->votes.vote_list);
195
acb->is_read = false;
198
178
for (i = 0; i < s->num_children; i++) {
199
179
acb->qcrs[i].buf = NULL;
200
180
acb->qcrs[i].ret = 0;
207
static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
208
int nb_sectors, char *node_name, int ret)
187
static void quorum_report_bad(QuorumOpType type, uint64_t offset,
188
uint64_t bytes, char *node_name, int ret)
210
190
const char *msg = NULL;
191
int64_t start_sector = offset / BDRV_SECTOR_SIZE;
192
int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
212
195
msg = strerror(-ret);
215
qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name,
216
sector_num, nb_sectors, &error_abort);
198
qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
199
end_sector - start_sector, &error_abort);
219
202
static void quorum_report_failure(QuorumAIOCB *acb)
221
const char *reference = bdrv_get_device_or_node_name(acb->common.bs);
222
qapi_event_send_quorum_failure(reference, acb->sector_num,
223
acb->nb_sectors, &error_abort);
204
const char *reference = bdrv_get_device_or_node_name(acb->bs);
205
int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
206
int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
209
qapi_event_send_quorum_failure(reference, start_sector,
210
end_sector - start_sector, &error_abort);
226
213
static int quorum_vote_error(QuorumAIOCB *acb);
228
215
static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
230
BDRVQuorumState *s = acb->common.bs->opaque;
217
BDRVQuorumState *s = acb->bs->opaque;
232
219
if (acb->success_count < s->threshold) {
233
220
acb->vote_ret = quorum_vote_error(acb);
273
245
QuorumAIOCB *acb = sacb->parent;
274
246
QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
275
quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
276
sacb->aiocb->bs->node_name, ret);
279
static void quorum_fifo_aio_cb(void *opaque, int ret)
281
QuorumChildRequest *sacb = opaque;
282
QuorumAIOCB *acb = sacb->parent;
283
BDRVQuorumState *s = acb->common.bs->opaque;
285
assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
288
quorum_report_bad_acb(sacb, ret);
290
/* We try to read next child in FIFO order if we fail to read */
291
if (acb->children_read < s->num_children) {
292
read_fifo_child(acb);
299
/* FIXME: rewrite failed children if acb->children_read > 1? */
300
quorum_aio_finalize(acb);
303
static void quorum_aio_cb(void *opaque, int ret)
305
QuorumChildRequest *sacb = opaque;
306
QuorumAIOCB *acb = sacb->parent;
307
BDRVQuorumState *s = acb->common.bs->opaque;
308
bool rewrite = false;
313
acb->success_count++;
315
quorum_report_bad_acb(sacb, ret);
318
assert(acb->count <= s->num_children);
319
assert(acb->success_count <= s->num_children);
320
if (acb->count < s->num_children) {
324
/* Do the vote on read */
326
rewrite = quorum_vote(acb);
327
for (i = 0; i < s->num_children; i++) {
328
qemu_vfree(acb->qcrs[i].buf);
329
qemu_iovec_destroy(&acb->qcrs[i].qiov);
332
quorum_has_too_much_io_failed(acb);
335
/* if no rewrite is done the code will finish right away */
337
quorum_aio_finalize(acb);
247
quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
341
250
static void quorum_report_bad_versions(BDRVQuorumState *s,
352
261
QLIST_FOREACH(item, &version->items, next) {
353
quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num,
262
quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
355
263
s->children[item->index]->bs->node_name, 0);
360
static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
268
static void quorum_rewrite_entry(void *opaque)
270
QuorumCo *co = opaque;
271
QuorumAIOCB *acb = co->acb;
272
BDRVQuorumState *s = acb->bs->opaque;
274
/* Ignore any errors, it's just a correction attempt for already
276
bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
279
/* Wake up the caller after the last rewrite */
280
acb->rewrite_count--;
281
if (!acb->rewrite_count) {
282
qemu_coroutine_enter_if_inactive(acb->co);
286
static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
361
287
QuorumVoteValue *value)
363
289
QuorumVoteVersion *version;
642
571
/* corruption correction is enabled */
643
572
if (s->rewrite_corrupted) {
644
rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value);
573
quorum_rewrite_bad_versions(acb, &winner->value);
649
578
quorum_free_vote_list(&acb->votes);
653
static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
655
BDRVQuorumState *s = acb->common.bs->opaque;
581
static void read_quorum_children_entry(void *opaque)
583
QuorumCo *co = opaque;
584
QuorumAIOCB *acb = co->acb;
585
BDRVQuorumState *s = acb->bs->opaque;
587
QuorumChildRequest *sacb = &acb->qcrs[i];
589
sacb->bs = s->children[i]->bs;
590
sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
591
&acb->qcrs[i].qiov, 0);
593
if (sacb->ret == 0) {
594
acb->success_count++;
596
quorum_report_bad_acb(sacb, sacb->ret);
600
assert(acb->count <= s->num_children);
601
assert(acb->success_count <= s->num_children);
603
/* Wake up the caller after the last read */
604
if (acb->count == s->num_children) {
605
qemu_coroutine_enter_if_inactive(acb->co);
609
static int read_quorum_children(QuorumAIOCB *acb)
611
BDRVQuorumState *s = acb->bs->opaque;
658
614
acb->children_read = s->num_children;
659
615
for (i = 0; i < s->num_children; i++) {
665
621
for (i = 0; i < s->num_children; i++) {
666
acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
667
&acb->qcrs[i].qiov, acb->nb_sectors,
668
quorum_aio_cb, &acb->qcrs[i]);
628
co = qemu_coroutine_create(read_quorum_children_entry, &data);
629
qemu_coroutine_enter(co);
632
while (acb->count < s->num_children) {
633
qemu_coroutine_yield();
636
/* Do the vote on read */
638
for (i = 0; i < s->num_children; i++) {
639
qemu_vfree(acb->qcrs[i].buf);
640
qemu_iovec_destroy(&acb->qcrs[i].qiov);
643
while (acb->rewrite_count) {
644
qemu_coroutine_yield();
674
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
652
static int read_fifo_child(QuorumAIOCB *acb)
676
BDRVQuorumState *s = acb->common.bs->opaque;
677
int n = acb->children_read++;
679
acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
680
acb->qiov, acb->nb_sectors,
681
quorum_fifo_aio_cb, &acb->qcrs[n]);
654
BDRVQuorumState *s = acb->bs->opaque;
657
/* We try to read the next child in FIFO order if we failed to read */
659
n = acb->children_read++;
660
acb->qcrs[n].bs = s->children[n]->bs;
661
ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
664
quorum_report_bad_acb(&acb->qcrs[n], ret);
666
} while (ret < 0 && acb->children_read < s->num_children);
668
/* FIXME: rewrite failed children if acb->children_read > 1? */
686
static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
690
BlockCompletionFunc *cb,
673
static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
674
uint64_t bytes, QEMUIOVector *qiov, int flags)
693
676
BDRVQuorumState *s = bs->opaque;
694
QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
695
nb_sectors, cb, opaque);
677
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
696
680
acb->is_read = true;
697
681
acb->children_read = 0;
699
683
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
700
return read_quorum_children(acb);
703
return read_fifo_child(acb);
706
static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
710
BlockCompletionFunc *cb,
684
ret = read_quorum_children(acb);
686
ret = read_fifo_child(acb);
688
quorum_aio_finalize(acb);
693
static void write_quorum_entry(void *opaque)
695
QuorumCo *co = opaque;
696
QuorumAIOCB *acb = co->acb;
697
BDRVQuorumState *s = acb->bs->opaque;
699
QuorumChildRequest *sacb = &acb->qcrs[i];
701
sacb->bs = s->children[i]->bs;
702
sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
704
if (sacb->ret == 0) {
705
acb->success_count++;
707
quorum_report_bad_acb(sacb, sacb->ret);
710
assert(acb->count <= s->num_children);
711
assert(acb->success_count <= s->num_children);
713
/* Wake up the caller after the last write */
714
if (acb->count == s->num_children) {
715
qemu_coroutine_enter_if_inactive(acb->co);
719
static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
720
uint64_t bytes, QEMUIOVector *qiov, int flags)
713
722
BDRVQuorumState *s = bs->opaque;
714
QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
723
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
718
726
for (i = 0; i < s->num_children; i++) {
719
acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
720
qiov, nb_sectors, &quorum_aio_cb,
733
co = qemu_coroutine_create(write_quorum_entry, &data);
734
qemu_coroutine_enter(co);
737
while (acb->count < s->num_children) {
738
qemu_coroutine_yield();
741
quorum_has_too_much_io_failed(acb);
744
quorum_aio_finalize(acb);
727
749
static int64_t quorum_getlength(BlockDriverState *bs)