~james-page/ubuntu/saucy/openvswitch/1.12-snapshot

« back to all changes in this revision

Viewing changes to lib/async-append-aio.c

  • Committer: James Page
  • Date: 2013-08-21 10:16:57 UTC
  • mfrom: (1.1.20)
  • Revision ID: james.page@canonical.com-20130821101657-3o0z0qeiv5zkwlzi
New upstream snapshot

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2013 Nicira, Inc.
 
2
 *
 
3
 * Licensed under the Apache License, Version 2.0 (the "License");
 
4
 * you may not use this file except in compliance with the License.
 
5
 * You may obtain a copy of the License at:
 
6
 *
 
7
 *     http://www.apache.org/licenses/LICENSE-2.0
 
8
 *
 
9
 * Unless required by applicable law or agreed to in writing, software
 
10
 * distributed under the License is distributed on an "AS IS" BASIS,
 
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
 * See the License for the specific language governing permissions and
 
13
 * limitations under the License.
 
14
 */
 
15
 
 
16
#include <config.h>
 
17
 
 
18
/* This implementation of the async-append.h interface uses the POSIX
 
19
 * asynchronous I/O interface.  */
 
20
 
 
21
#include "async-append.h"
 
22
 
 
23
#include <aio.h>
 
24
#include <errno.h>
 
25
#include <stdlib.h>
 
26
#include <unistd.h>
 
27
 
 
28
#include "byteq.h"
 
29
#include "ovs-thread.h"
 
30
#include "util.h"
 
31
 
 
32
/* Maximum number of bytes of buffered data. */
 
33
enum { BUFFER_SIZE = 65536 };
 
34
 
 
35
/* Maximum number of aiocbs to use.
 
36
 *
 
37
 * aiocbs are big (144 bytes with glibc 2.11 on i386) so we try to allow for a
 
38
 * reasonable number by basing the number we allocate on the amount of buffer
 
39
 * space. */
 
40
enum { MAX_CBS = ROUND_DOWN_POW2(BUFFER_SIZE / sizeof(struct aiocb)) };
 
41
BUILD_ASSERT_DECL(IS_POW2(MAX_CBS));
 
42
 
 
43
struct async_append {
 
44
    int fd;
 
45
 
 
46
    struct aiocb *aiocbs;
 
47
    unsigned int aiocb_head, aiocb_tail;
 
48
 
 
49
    uint8_t *buffer;
 
50
    struct byteq byteq;
 
51
};
 
52
 
 
53
static bool async_append_enabled;
 
54
 
 
55
void
 
56
async_append_enable(void)
 
57
{
 
58
    assert_single_threaded();
 
59
    forbid_forking("async i/o enabled");
 
60
    async_append_enabled = true;
 
61
}
 
62
 
 
63
struct async_append *
 
64
async_append_create(int fd)
 
65
{
 
66
    struct async_append *ap;
 
67
 
 
68
    ap = xmalloc(sizeof *ap);
 
69
    ap->fd = fd;
 
70
    ap->aiocbs = xmalloc(MAX_CBS * sizeof *ap->aiocbs);
 
71
    ap->aiocb_head = ap->aiocb_tail = 0;
 
72
    ap->buffer = xmalloc(BUFFER_SIZE);
 
73
    byteq_init(&ap->byteq, ap->buffer, BUFFER_SIZE);
 
74
 
 
75
    return ap;
 
76
}
 
77
 
 
78
void
 
79
async_append_destroy(struct async_append *ap)
 
80
{
 
81
    if (ap) {
 
82
        async_append_flush(ap);
 
83
        free(ap->aiocbs);
 
84
        free(ap->buffer);
 
85
        free(ap);
 
86
    }
 
87
}
 
88
 
 
89
static bool
 
90
async_append_is_full(const struct async_append *ap)
 
91
{
 
92
    return (ap->aiocb_head - ap->aiocb_tail >= MAX_CBS
 
93
            || byteq_is_full(&ap->byteq));
 
94
}
 
95
 
 
96
static bool
 
97
async_append_is_empty(const struct async_append *ap)
 
98
{
 
99
    return byteq_is_empty(&ap->byteq);
 
100
}
 
101
 
 
102
static void
 
103
async_append_wait(struct async_append *ap)
 
104
{
 
105
    int n = 0;
 
106
 
 
107
    while (!async_append_is_empty(ap)) {
 
108
        struct aiocb *aiocb = &ap->aiocbs[ap->aiocb_tail & (MAX_CBS - 1)];
 
109
        int error = aio_error(aiocb);
 
110
 
 
111
        if (error == EINPROGRESS) {
 
112
            const struct aiocb *p = aiocb;
 
113
            if (n > 0) {
 
114
                return;
 
115
            }
 
116
            aio_suspend(&p, 1, NULL);
 
117
        } else {
 
118
            ignore(aio_return(aiocb));
 
119
            ap->aiocb_tail++;
 
120
            byteq_advance_tail(&ap->byteq, aiocb->aio_nbytes);
 
121
            n++;
 
122
        }
 
123
    }
 
124
}
 
125
 
 
126
void
 
127
async_append_write(struct async_append *ap, const void *data_, size_t size)
 
128
{
 
129
    const uint8_t *data = data_;
 
130
 
 
131
    if (!async_append_enabled) {
 
132
        ignore(write(ap->fd, data, size));
 
133
        return;
 
134
    }
 
135
 
 
136
    while (size > 0) {
 
137
        struct aiocb *aiocb;
 
138
        size_t chunk_size;
 
139
        void *chunk;
 
140
 
 
141
        while (async_append_is_full(ap)) {
 
142
            async_append_wait(ap);
 
143
        }
 
144
 
 
145
        chunk = byteq_head(&ap->byteq);
 
146
        chunk_size = byteq_headroom(&ap->byteq);
 
147
        if (chunk_size > size) {
 
148
            chunk_size = size;
 
149
        }
 
150
        memcpy(chunk, data, chunk_size);
 
151
 
 
152
        aiocb = &ap->aiocbs[ap->aiocb_head & (MAX_CBS - 1)];
 
153
        memset(aiocb, 0, sizeof *aiocb);
 
154
        aiocb->aio_fildes = ap->fd;
 
155
        aiocb->aio_offset = 0;
 
156
        aiocb->aio_buf = chunk;
 
157
        aiocb->aio_nbytes = chunk_size;
 
158
        aiocb->aio_sigevent.sigev_notify = SIGEV_NONE;
 
159
        if (aio_write(aiocb) == -1) {
 
160
            async_append_flush(ap);
 
161
            ignore(write(ap->fd, data, size));
 
162
            return;
 
163
        }
 
164
 
 
165
        data += chunk_size;
 
166
        size -= chunk_size;
 
167
        byteq_advance_head(&ap->byteq, chunk_size);
 
168
        ap->aiocb_head++;
 
169
    }
 
170
}
 
171
 
 
172
void
 
173
async_append_flush(struct async_append *ap)
 
174
{
 
175
    while (!async_append_is_empty(ap)) {
 
176
        async_append_wait(ap);
 
177
    }
 
178
}