2
* QEMU posix-aio emulation
4
* Copyright IBM, Corp. 2008
7
* Anthony Liguori <aliguori@us.ibm.com>
9
* This work is licensed under the terms of the GNU GPL, version 2. See
10
* the COPYING file in the top-level directory.
14
#include <sys/ioctl.h>
15
#include <sys/types.h>
24
#include "qemu-queue.h"
27
#include "qemu-common.h"
29
#include "block_int.h"
31
#include "block/raw-posix-aio.h"
33
/* Forward declaration of do_spawn_thread(), which is defined later in this file. */
static void do_spawn_thread(void);
36
/* NOTE(review): the lines below are an incomplete excerpt of the per-request
 * control block (struct qemu_paiocb); its opening line and several fields
 * are not visible here. */
/* Generic block-layer AIOCB header. paio_cancel() casts a BlockDriverAIOCB *
 * back to struct qemu_paiocb *, which presumably relies on this being the
 * first member -- confirm against the full struct definition. */
BlockDriverAIOCB common;
39
/* Scatter/gather vector describing the caller's buffers for read/write. */
struct iovec *aio_iov;
44
/* ioctl requests reuse the aio_nbytes field to carry the ioctl command. */
#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
47
/* Link in the global request_list queue that aio_thread() workers consume. */
QTAILQ_ENTRY(qemu_paiocb) node;
51
/* Link in the PosixAioState first_aio list of submitted requests. */
struct qemu_paiocb *next;
54
/* Per-instance completion state; only the head of the submitted-request
 * list is visible in this excerpt. */
typedef struct PosixAioState {
56
struct qemu_paiocb *first_aio;
60
/* Worker-pool synchronisation. aio_thread() waits on `cond` with `lock`.
 * NOTE(review): presumably `lock` also guards the counters and request_list
 * below -- confirm with the locking in the full file. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
61
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
62
/* Handle slot reused by every thread_create() call in do_spawn_thread(). */
static pthread_t thread_id;
63
/* Attributes for every worker thread; initialised as PTHREAD_CREATE_DETACHED. */
static pthread_attr_t attr;
64
/* Cap on worker threads; qemu_paio_submit() compares cur_threads against it. */
static int max_threads = 64;
65
/* Worker-thread count checked against max_threads before asking for more. */
static int cur_threads = 0;
66
/* Idle-worker count; qemu_paio_submit() only considers adding workers when
 * this is 0. */
static int idle_threads = 0;
67
static int new_threads = 0; /* backlog of threads we need to create */
68
static int pending_threads = 0; /* threads created but not running yet */
69
/* Bottom half (spawn_thread_bh_fn) that lets the main thread create workers. */
static QEMUBH *new_thread_bh;
70
/* FIFO of requests awaiting a worker: appended at the tail on submit,
 * taken from the head by aio_thread(). */
static QTAILQ_HEAD(, qemu_paiocb) request_list;
73
/* Controls whether handle_aiocb_rw() tries the preadv()/pwritev() path first.
 * NOTE(review): the two definitions below are presumably alternatives
 * selected by a preprocessor conditional that is not visible in this
 * excerpt; as shown they would be a duplicate definition. */
static int preadv_present = 1;
75
static int preadv_present = 0;
78
/* Report a failed call as "<what> failed: <strerror(err)>" on stderr.
 * NOTE(review): the remainder of the body (presumably terminating the
 * process) is not visible in this excerpt. */
static void die2(int err, const char *what)
80
fprintf(stderr, "%s failed: %s\n", what, strerror(err));
84
/* Failure helper used after sigfillset()/sigprocmask(), which report errors
 * through errno. Its body is not visible here; presumably it forwards errno
 * to die2() -- confirm. */
static void die(const char *what)
89
/*
 * Acquire @mutex. Any non-zero pthread result is handed to die2()
 * together with the name of the failing call.
 */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
95
/*
 * Release @mutex. A failing pthread_mutex_unlock() is reported through
 * die2().
 */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    const int rc = pthread_mutex_unlock(mutex);
    if (rc != 0) {
        die2(rc, "pthread_mutex_unlock");
    }
}
101
/* Wait on @cond (with @mutex held) until it is signalled or the absolute
 * deadline @ts passes. ETIMEDOUT is tolerated; any other non-zero result is
 * passed to die2(). aio_thread() compares the result against ETIMEDOUT.
 * NOTE(review): the last parameter line and the return statement are not
 * visible in this excerpt. */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
104
int ret = pthread_cond_timedwait(cond, mutex, ts);
105
if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
109
/*
 * Wake one thread blocked on @cond; a non-zero pthread status is handed
 * to die2().
 */
static void cond_signal(pthread_cond_t *cond)
{
    int status = pthread_cond_signal(cond);
    if (status) {
        die2(status, "pthread_cond_signal");
    }
}
115
/*
 * Start a thread running start_routine(arg) with attributes @attr and
 * store its handle in *@thread. Creation failures go to die2().
 */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err != 0) {
        die2(err, "pthread_create");
    }
}
122
/* Execute an ioctl request: ioctl(aio_fildes, aio_ioctl_cmd, aio_ioctl_buf).
 * NOTE(review): the error handling between the call and the final return
 * is not visible in this excerpt. */
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
126
ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
131
* This looks weird, but the aio code only considers a request
132
* successful if it has transferred the full number of bytes.
134
* Since aio_nbytes is overloaded as aio_ioctl_cmd for the ioctl command,
135
* we return the ioctl command here so that posix_aio_process_queue()
138
/* Success path: report aio_nbytes (which aliases aio_ioctl_cmd), so the
 * completion code's full-length check treats the ioctl as successful. */
return aiocb->aio_nbytes;
141
/* Execute a flush request by calling qemu_fdatasync() on aio_fildes.
 * NOTE(review): how the result is turned into the return value is not
 * visible in this excerpt. */
static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
145
ret = qemu_fdatasync(aiocb->aio_fildes);
154
/* Vectored positional I/O wrappers that forward to the host preadv() and
 * pwritev(). NOTE(review): the return-type lines and the preprocessor
 * conditional choosing between these and the fallbacks below are not
 * visible in this excerpt. */
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
156
return preadv(fd, iov, nr_iov, offset);
160
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
162
return pwritev(fd, iov, nr_iov, offset);
168
/* Fallback definitions for hosts without preadv()/pwritev(); their bodies
 * are not visible. handle_aiocb_rw() treats a -ENOSYS result specially,
 * which is presumably what these return -- confirm. */
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
174
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
181
/* Transfer the whole iovec with a single qemu_pwritev() (writes) or
 * qemu_preadv() (otherwise) call, repeating it while it fails with EINTR. */
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
186
if (aiocb->aio_type & QEMU_AIO_WRITE)
187
len = qemu_pwritev(aiocb->aio_fildes,
192
len = qemu_preadv(aiocb->aio_fildes,
196
} while (len == -1 && errno == EINTR);
204
* Reads/writes the data to/from a given linear buffer.
206
* Returns the number of bytes handled or -errno in case of an error. Short
207
* reads are only returned if the end of the file is reached.
209
/* Move data between @buf and the file in a loop that runs while
 * offset < aio_nbytes. Each call resumes at buf + offset and file position
 * aio_offset + offset, using pwrite() for writes and pread() otherwise. */
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
214
while (offset < aiocb->aio_nbytes) {
215
if (aiocb->aio_type & QEMU_AIO_WRITE)
216
len = pwrite(aiocb->aio_fildes,
217
(const char *)buf + offset,
218
aiocb->aio_nbytes - offset,
219
aiocb->aio_offset + offset);
221
len = pread(aiocb->aio_fildes,
223
aiocb->aio_nbytes - offset,
224
aiocb->aio_offset + offset);
226
/* EINTR is handled separately from other failures (branch body not
 * visible; presumably the call is retried). */
if (len == -1 && errno == EINTR)
228
else if (len == -1) {
240
/* Dispatch a read/write request. Requests not flagged QEMU_AIO_MISALIGNED
 * go straight to pread()/pwrite() for a single iovec, or try the vectored
 * path when preadv_present is set. Otherwise the data is bounced through
 * an aligned buffer in the remainder of this function. */
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
245
if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
247
* If there is just a single buffer, and it is properly aligned
248
* we can just use plain pread/pwrite without any problems.
250
if (aiocb->aio_niov == 1)
251
return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
254
* We have more than one iovec, and all are properly aligned.
256
* Try preadv/pwritev first and fall back to linearizing the
257
* buffer if it's not supported.
259
if (preadv_present) {
260
nbytes = handle_aiocb_rw_vector(aiocb);
261
/* Full-length transfers and errors other than -ENOSYS are handled
 * specially (branch bodies not visible in this excerpt). */
if (nbytes == aiocb->aio_nbytes)
263
if (nbytes < 0 && nbytes != -ENOSYS)
269
* XXX(hch): short read/write. no easy way to handle the remainder
270
* using these interfaces. For now retry using plain
276
* Ok, we have to do it the hard way, copy all segments into
277
* a single aligned buffer.
279
/* Bounce buffer large enough for the whole request.
 * NOTE(review): the matching release of buf is not visible in this
 * excerpt -- confirm it is freed on every path. */
buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
280
/* Writes: gather every iovec segment into buf before the I/O. */
if (aiocb->aio_type & QEMU_AIO_WRITE) {
284
for (i = 0; i < aiocb->aio_niov; ++i) {
285
memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
286
p += aiocb->aio_iov[i].iov_len;
290
nbytes = handle_aiocb_rw_linear(aiocb, buf);
291
/* Reads: scatter buf back into the iovec, copying at most iov_len bytes
 * per segment; the loop stops once count reaches 0. */
if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
293
size_t count = aiocb->aio_nbytes, copy;
296
for (i = 0; i < aiocb->aio_niov && count; ++i) {
298
if (copy > aiocb->aio_iov[i].iov_len)
299
copy = aiocb->aio_iov[i].iov_len;
300
memcpy(aiocb->aio_iov[i].iov_base, p, copy);
310
/* Forward declaration; posix_aio_notify_event() is called from aio_thread(). */
static void posix_aio_notify_event(void);
312
/* Worker thread body, started through thread_create() in do_spawn_thread().
 * It waits (about 10 s at most) for work on request_list, takes the first
 * request, runs the handler selected by its type, and then calls
 * posix_aio_notify_event(). */
static void *aio_thread(void *unused)
320
struct qemu_paiocb *aiocb;
325
qemu_gettimeofday(&tv);
326
/* Absolute deadline roughly 10 s from now for the idle wait below. */
ts.tv_sec = tv.tv_sec + 10;
331
while (QTAILQ_EMPTY(&request_list) &&
332
!(ret == ETIMEDOUT)) {
334
ret = cond_timedwait(&cond, &lock, &ts);
338
/* Still no work after waiting. Branch body not visible; presumably the
 * idle worker exits here. */
if (QTAILQ_EMPTY(&request_list))
341
aiocb = QTAILQ_FIRST(&request_list);
342
QTAILQ_REMOVE(&request_list, aiocb, node);
346
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
348
ret = handle_aiocb_rw(aiocb);
349
if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
350
/* A short read means that we have reached EOF. Pad the buffer
351
* with zeros for bytes after EOF. */
354
qemu_iovec_init_external(&qiov, aiocb->aio_iov,
356
qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
358
ret = aiocb->aio_nbytes;
362
ret = handle_aiocb_rw(aiocb);
365
ret = handle_aiocb_flush(aiocb);
368
ret = handle_aiocb_ioctl(aiocb);
371
/* Unrecognised request type (presumably the switch's default branch). */
fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
380
posix_aio_notify_event();
389
/* Create one worker running aio_thread() using the shared attr. All signals
 * are blocked around thread_create(), so the new thread starts with every
 * signal blocked; the caller's mask is restored afterwards.
 * NOTE(review): lines between the declaration and the masking are not
 * visible. Also, POSIX leaves sigprocmask() unspecified in a multithreaded
 * process; pthread_sigmask() is the portable call here. */
static void do_spawn_thread(void)
391
sigset_t set, oldset;
404
/* block all signals */
405
if (sigfillset(&set)) die("sigfillset");
406
if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
408
thread_create(&thread_id, &attr, aio_thread, NULL);
410
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
413
/* Bottom-half callback installed as new_thread_bh via qemu_bh_new(); its
 * body is not visible in this excerpt. */
static void spawn_thread_bh_fn(void *opaque)
418
/* Request an additional worker. Only part of the body is visible: when no
 * threads are pending, creation is handed to the main thread by scheduling
 * new_thread_bh. */
static void spawn_thread(void)
422
/* If there are threads being created, they will spawn new workers, so
423
* we don't spend time creating many threads in a loop holding a mutex or
424
* starving the current vcpu.
426
* If there are no idle threads, ask the main thread to create one, so we
427
* inherit the correct affinity instead of the vcpu affinity.
429
if (!pending_threads) {
430
qemu_bh_schedule(new_thread_bh);
434
/* Queue @aiocb for the worker pool: mark it -EINPROGRESS and append it to
 * the tail of request_list. When no worker is idle and cur_threads is below
 * max_threads, more workers are presumably requested (branch body not
 * visible). NOTE(review): the locking around these updates is not visible
 * in this excerpt. */
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
436
aiocb->ret = -EINPROGRESS;
439
if (idle_threads == 0 && cur_threads < max_threads)
441
QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
446
/* Return the request's result; the body is not visible in this excerpt. */
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
457
/* Derive an error code from the request result. Callers compare it with
 * positive EINPROGRESS/ECANCELED, while aiocb->ret is stored as negative
 * errno values elsewhere; the conversion lines are not visible here. */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
459
ssize_t ret = qemu_paio_return(aiocb);
469
/* Walk s->first_aio and complete every request that is no longer in
 * progress. Cancelled requests are only released. Finished requests are
 * traced, their callback is invoked with the result, and then they are
 * released. NOTE(review): the loop structure, unlinking and return value
 * are not visible in this excerpt. */
static int posix_aio_process_queue(void *opaque)
471
PosixAioState *s = opaque;
472
struct qemu_paiocb *acb, **pacb;
477
pacb = &s->first_aio;
483
ret = qemu_paio_error(acb);
484
if (ret == ECANCELED) {
485
/* remove the request */
487
qemu_aio_release(acb);
489
} else if (ret != EINPROGRESS) {
492
ret = qemu_paio_return(acb);
493
/* A full-length result is the success case; handling of other values is
 * not visible here. */
if (ret == acb->aio_nbytes)
501
trace_paio_complete(acb, acb->common.opaque, ret);
503
/* remove the request */
505
/* call the callback */
506
acb->common.cb(acb->common.opaque, ret);
507
qemu_aio_release(acb);
519
/* fd handler registered on s->rfd. It drains the notification pipe,
 * retrying on EINTR and reading again while a full buffer comes back, then
 * completes finished requests via posix_aio_process_queue(). */
static void posix_aio_read(void *opaque)
521
PosixAioState *s = opaque;
524
/* read all bytes from signal pipe */
528
len = read(s->rfd, bytes, sizeof(bytes));
529
if (len == -1 && errno == EINTR)
530
continue; /* try again */
531
if (len == sizeof(bytes))
532
continue; /* more to read */
536
posix_aio_process_queue(s);
539
static int posix_aio_flush(void *opaque)
541
PosixAioState *s = opaque;
542
return !!s->first_aio;
545
/* File-wide completion state used by posix_aio_notify_event(), paio_remove(),
 * paio_submit() and paio_ioctl(). */
static PosixAioState *posix_aio_state;
547
/* Signal completion by writing one byte to the notification pipe's write
 * end; posix_aio_read() is the handler for its read end. EAGAIN (the pipe is
 * non-blocking) is tolerated -- presumably because a full pipe already means
 * a wakeup is pending. What happens on other errors is not visible. */
static void posix_aio_notify_event(void)
552
ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
553
if (ret < 0 && errno != EAGAIN)
557
/* Unlink @acb from posix_aio_state->first_aio and release it, printing an
 * error if it is not found on the list. NOTE(review): the loop and unlink
 * lines are not visible in this excerpt. */
static void paio_remove(struct qemu_paiocb *acb)
559
struct qemu_paiocb **pacb;
561
/* remove the callback from the queue */
562
pacb = &posix_aio_state->first_aio;
565
fprintf(stderr, "paio_remove: aio request not found!\n");
567
} else if (*pacb == acb) {
569
qemu_aio_release(acb);
572
pacb = &(*pacb)->next;
576
/* .cancel hook of raw_aio_pool. In one branch (condition not visible;
 * presumably "not yet picked up by a worker") the request is removed from
 * request_list and marked -ECANCELED. As a fail-safe, the code polls
 * qemu_paio_error() until it stops reporting EINPROGRESS. NOTE(review):
 * several lines of this function are not visible in this excerpt. */
static void paio_cancel(BlockDriverAIOCB *blockacb)
578
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
581
trace_paio_cancel(acb, acb->common.opaque);
585
QTAILQ_REMOVE(&request_list, acb, node);
586
acb->ret = -ECANCELED;
587
} else if (acb->ret == -EINPROGRESS) {
593
/* fail safe: if the aio could not be canceled, we wait for
595
while (qemu_paio_error(acb) == EINPROGRESS)
602
/* Pool from which paio_submit() and paio_ioctl() obtain struct qemu_paiocb
 * objects via qemu_aio_get(); cancellation is routed to paio_cancel(). */
static AIOPool raw_aio_pool = {
603
.aiocb_size = sizeof(struct qemu_paiocb),
604
.cancel = paio_cancel,
607
/* Build a request of @type on @fd covering nb_sectors 512-byte sectors
 * starting at sector_num, using qiov's iovec. The request is linked at the
 * head of posix_aio_state->first_aio and queued for the worker threads. */
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
608
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
609
BlockDriverCompletionFunc *cb, void *opaque, int type)
611
struct qemu_paiocb *acb;
613
acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
616
acb->aio_type = type;
617
acb->aio_fildes = fd;
620
acb->aio_iov = qiov->iov;
621
acb->aio_niov = qiov->niov;
623
/* NOTE(review): nb_sectors is an int, so nb_sectors * 512 is computed in
 * int and overflows (undefined behaviour) once nb_sectors exceeds
 * INT_MAX / 512, i.e. requests of 2 GiB or more. Consider widening before
 * the multiply, e.g. (size_t)nb_sectors * 512. */
acb->aio_nbytes = nb_sectors * 512;
624
acb->aio_offset = sector_num * 512;
626
acb->next = posix_aio_state->first_aio;
627
posix_aio_state->first_aio = acb;
629
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
630
qemu_paio_submit(acb);
634
/* Build a QEMU_AIO_IOCTL request on @fd. The command @req goes into
 * aio_ioctl_cmd (an alias of aio_nbytes) and @buf into aio_ioctl_buf. The
 * request is linked at the head of posix_aio_state->first_aio and queued
 * with qemu_paio_submit(). */
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
635
unsigned long int req, void *buf,
636
BlockDriverCompletionFunc *cb, void *opaque)
638
struct qemu_paiocb *acb;
640
acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
643
acb->aio_type = QEMU_AIO_IOCTL;
644
acb->aio_fildes = fd;
646
acb->aio_ioctl_buf = buf;
647
acb->aio_ioctl_cmd = req;
649
acb->next = posix_aio_state->first_aio;
650
posix_aio_state->first_aio = acb;
652
qemu_paio_submit(acb);
665
s = g_malloc(sizeof(PosixAioState));
668
if (qemu_pipe(fds) == -1) {
669
fprintf(stderr, "failed to create pipe\n");
677
fcntl(s->rfd, F_SETFL, O_NONBLOCK);
678
fcntl(s->wfd, F_SETFL, O_NONBLOCK);
680
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
681
posix_aio_process_queue, s);
683
ret = pthread_attr_init(&attr);
685
die2(ret, "pthread_attr_init");
687
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
689
die2(ret, "pthread_attr_setdetachstate");
691
QTAILQ_INIT(&request_list);
692
new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);