~pmdj/ubuntu/trusty/qemu/2.9+applesmc+fadtv3

« back to all changes in this revision

Viewing changes to util/qemu-coroutine-lock.c

  • Committer: Phil Dennis-Jordan
  • Date: 2017-07-21 08:03:43 UTC
  • mfrom: (1.1.1)
  • Revision ID: phil@philjordan.eu-20170721080343-2yr2vdj7713czahv
New upstream release 2.9.0.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
21
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22
22
 * THE SOFTWARE.
 
23
 *
 
24
 * The lock-free mutex implementation is based on OSv
 
25
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 
26
 * Copyright (C) 2013 Cloudius Systems, Ltd.
23
27
 */
24
28
 
25
29
#include "qemu/osdep.h"
26
30
#include "qemu-common.h"
27
31
#include "qemu/coroutine.h"
28
32
#include "qemu/coroutine_int.h"
 
33
#include "qemu/processor.h"
29
34
#include "qemu/queue.h"
 
35
#include "block/aio.h"
30
36
#include "trace.h"
31
37
 
32
38
void qemu_co_queue_init(CoQueue *queue)
34
40
    QSIMPLEQ_INIT(&queue->entries);
35
41
}
36
42
 
37
 
void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
 
43
void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
38
44
{
39
45
    Coroutine *self = qemu_coroutine_self();
40
46
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
 
47
 
 
48
    if (mutex) {
 
49
        qemu_co_mutex_unlock(mutex);
 
50
    }
 
51
 
 
52
    /* There is no race condition here.  Other threads will call
 
53
     * aio_co_schedule on our AioContext, which can reenter this
 
54
     * coroutine but only after this yield and after the main loop
 
55
     * has gone through the next iteration.
 
56
     */
41
57
    qemu_coroutine_yield();
42
58
    assert(qemu_in_coroutine());
 
59
 
 
60
    /* TODO: OSv implements wait morphing here, where the wakeup
 
61
     * primitive automatically places the woken coroutine on the
 
62
     * mutex's queue.  This avoids the thundering herd effect.
 
63
     */
 
64
    if (mutex) {
 
65
        qemu_co_mutex_lock(mutex);
 
66
    }
43
67
}
44
68
 
45
69
/**
63
87
 
64
88
static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
65
89
{
66
 
    Coroutine *self = qemu_coroutine_self();
67
90
    Coroutine *next;
68
91
 
69
92
    if (QSIMPLEQ_EMPTY(&queue->entries)) {
72
95
 
73
96
    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
74
97
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
75
 
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
76
 
        trace_qemu_co_queue_next(next);
 
98
        aio_co_wake(next);
77
99
        if (single) {
78
100
            break;
79
101
        }
112
134
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
113
135
}
114
136
 
 
137
/* The wait records are handled with a multiple-producer, single-consumer
 
138
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 
139
 * because pop_waiter() can only be called while mutex->handoff is zero.
 
140
 * This can happen in three cases:
 
141
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 
142
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 
143
 *   not take part in the handoff.
 
144
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 
145
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 
146
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 
147
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 
148
 *   woken up someone.
 
149
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 
150
 *   In this case another iteration starts with mutex->handoff == 0;
 
151
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 
152
 *   qemu_co_mutex_unlock will go back to case (1).
 
153
 *
 
154
 * The following functions manage this queue.
 
155
 */
 
156
typedef struct CoWaitRecord {
 
157
    Coroutine *co;
 
158
    QSLIST_ENTRY(CoWaitRecord) next;
 
159
} CoWaitRecord;
 
160
 
 
161
static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
 
162
{
 
163
    w->co = qemu_coroutine_self();
 
164
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
 
165
}
 
166
 
 
167
static void move_waiters(CoMutex *mutex)
 
168
{
 
169
    QSLIST_HEAD(, CoWaitRecord) reversed;
 
170
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
 
171
    while (!QSLIST_EMPTY(&reversed)) {
 
172
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
 
173
        QSLIST_REMOVE_HEAD(&reversed, next);
 
174
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
 
175
    }
 
176
}
 
177
 
 
178
static CoWaitRecord *pop_waiter(CoMutex *mutex)
 
179
{
 
180
    CoWaitRecord *w;
 
181
 
 
182
    if (QSLIST_EMPTY(&mutex->to_pop)) {
 
183
        move_waiters(mutex);
 
184
        if (QSLIST_EMPTY(&mutex->to_pop)) {
 
185
            return NULL;
 
186
        }
 
187
    }
 
188
    w = QSLIST_FIRST(&mutex->to_pop);
 
189
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
 
190
    return w;
 
191
}
 
192
 
 
193
static bool has_waiters(CoMutex *mutex)
 
194
{
 
195
    return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
 
196
}
 
197
 
115
198
void qemu_co_mutex_init(CoMutex *mutex)
116
199
{
117
200
    memset(mutex, 0, sizeof(*mutex));
118
 
    qemu_co_queue_init(&mutex->queue);
 
201
}
 
202
 
 
203
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
 
204
{
 
205
    /* Read co before co->ctx; pairs with smp_wmb() in
 
206
     * qemu_coroutine_enter().
 
207
     */
 
208
    smp_read_barrier_depends();
 
209
    mutex->ctx = co->ctx;
 
210
    aio_co_wake(co);
 
211
}
 
212
 
 
213
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
 
214
                                                     CoMutex *mutex)
 
215
{
 
216
    Coroutine *self = qemu_coroutine_self();
 
217
    CoWaitRecord w;
 
218
    unsigned old_handoff;
 
219
 
 
220
    trace_qemu_co_mutex_lock_entry(mutex, self);
 
221
    w.co = self;
 
222
    push_waiter(mutex, &w);
 
223
 
 
224
    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
 
225
     * a concurrent unlock() the responsibility of waking somebody up.
 
226
     */
 
227
    old_handoff = atomic_mb_read(&mutex->handoff);
 
228
    if (old_handoff &&
 
229
        has_waiters(mutex) &&
 
230
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
 
231
        /* There can be no concurrent pops, because there can be only
 
232
         * one active handoff at a time.
 
233
         */
 
234
        CoWaitRecord *to_wake = pop_waiter(mutex);
 
235
        Coroutine *co = to_wake->co;
 
236
        if (co == self) {
 
237
            /* We got the lock ourselves!  */
 
238
            assert(to_wake == &w);
 
239
            mutex->ctx = ctx;
 
240
            return;
 
241
        }
 
242
 
 
243
        qemu_co_mutex_wake(mutex, co);
 
244
    }
 
245
 
 
246
    qemu_coroutine_yield();
 
247
    trace_qemu_co_mutex_lock_return(mutex, self);
119
248
}
120
249
 
121
250
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
122
251
{
 
252
    AioContext *ctx = qemu_get_current_aio_context();
123
253
    Coroutine *self = qemu_coroutine_self();
124
 
 
125
 
    trace_qemu_co_mutex_lock_entry(mutex, self);
126
 
 
127
 
    while (mutex->locked) {
128
 
        qemu_co_queue_wait(&mutex->queue);
129
 
    }
130
 
 
131
 
    mutex->locked = true;
 
254
    int waiters, i;
 
255
 
 
256
    /* Running a very small critical section on pthread_mutex_t and CoMutex
 
257
     * shows that pthread_mutex_t is much faster because it doesn't actually
 
258
     * go to sleep.  What happens is that the critical section is shorter
 
259
     * than the latency of entering the kernel and thus FUTEX_WAIT always
 
260
     * fails.  With CoMutex there is no such latency but you still want to
 
261
     * avoid wait and wakeup.  So introduce it artificially.
 
262
     */
 
263
    i = 0;
 
264
retry_fast_path:
 
265
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
 
266
    if (waiters != 0) {
 
267
        while (waiters == 1 && ++i < 1000) {
 
268
            if (atomic_read(&mutex->ctx) == ctx) {
 
269
                break;
 
270
            }
 
271
            if (atomic_read(&mutex->locked) == 0) {
 
272
                goto retry_fast_path;
 
273
            }
 
274
            cpu_relax();
 
275
        }
 
276
        waiters = atomic_fetch_inc(&mutex->locked);
 
277
    }
 
278
 
 
279
    if (waiters == 0) {
 
280
        /* Uncontended.  */
 
281
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
 
282
        mutex->ctx = ctx;
 
283
    } else {
 
284
        qemu_co_mutex_lock_slowpath(ctx, mutex);
 
285
    }
132
286
    mutex->holder = self;
133
287
    self->locks_held++;
134
 
 
135
 
    trace_qemu_co_mutex_lock_return(mutex, self);
136
288
}
137
289
 
138
290
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
141
293
 
142
294
    trace_qemu_co_mutex_unlock_entry(mutex, self);
143
295
 
144
 
    assert(mutex->locked == true);
 
296
    assert(mutex->locked);
145
297
    assert(mutex->holder == self);
146
298
    assert(qemu_in_coroutine());
147
299
 
148
 
    mutex->locked = false;
 
300
    mutex->ctx = NULL;
149
301
    mutex->holder = NULL;
150
302
    self->locks_held--;
151
 
    qemu_co_queue_next(&mutex->queue);
 
303
    if (atomic_fetch_dec(&mutex->locked) == 1) {
 
304
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
 
305
        return;
 
306
    }
 
307
 
 
308
    for (;;) {
 
309
        CoWaitRecord *to_wake = pop_waiter(mutex);
 
310
        unsigned our_handoff;
 
311
 
 
312
        if (to_wake) {
 
313
            qemu_co_mutex_wake(mutex, to_wake->co);
 
314
            break;
 
315
        }
 
316
 
 
317
        /* Some concurrent lock() is in progress (we know this because
 
318
         * mutex->locked was >1) but it hasn't yet put itself on the wait
 
319
         * queue.  Pick a sequence number for the handoff protocol (not 0).
 
320
         */
 
321
        if (++mutex->sequence == 0) {
 
322
            mutex->sequence = 1;
 
323
        }
 
324
 
 
325
        our_handoff = mutex->sequence;
 
326
        atomic_mb_set(&mutex->handoff, our_handoff);
 
327
        if (!has_waiters(mutex)) {
 
328
            /* The concurrent lock has not added itself yet, so it
 
329
             * will be able to pick our handoff.
 
330
             */
 
331
            break;
 
332
        }
 
333
 
 
334
        /* Try to do the handoff protocol ourselves; if somebody else has
 
335
         * already taken it, however, we're done and they're responsible.
 
336
         */
 
337
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
 
338
            break;
 
339
        }
 
340
    }
152
341
 
153
342
    trace_qemu_co_mutex_unlock_return(mutex, self);
154
343
}
157
346
{
158
347
    memset(lock, 0, sizeof(*lock));
159
348
    qemu_co_queue_init(&lock->queue);
 
349
    qemu_co_mutex_init(&lock->mutex);
160
350
}
161
351
 
162
352
void qemu_co_rwlock_rdlock(CoRwlock *lock)
163
353
{
164
354
    Coroutine *self = qemu_coroutine_self();
165
355
 
166
 
    while (lock->writer) {
167
 
        qemu_co_queue_wait(&lock->queue);
 
356
    qemu_co_mutex_lock(&lock->mutex);
 
357
    /* For fairness, wait if a writer is in line.  */
 
358
    while (lock->pending_writer) {
 
359
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
168
360
    }
169
361
    lock->reader++;
 
362
    qemu_co_mutex_unlock(&lock->mutex);
 
363
 
 
364
    /* The rest of the read-side critical section is run without the mutex.  */
170
365
    self->locks_held++;
171
366
}
172
367
 
175
370
    Coroutine *self = qemu_coroutine_self();
176
371
 
177
372
    assert(qemu_in_coroutine());
178
 
    if (lock->writer) {
179
 
        lock->writer = false;
 
373
    if (!lock->reader) {
 
374
        /* The critical section started in qemu_co_rwlock_wrlock.  */
180
375
        qemu_co_queue_restart_all(&lock->queue);
181
376
    } else {
 
377
        self->locks_held--;
 
378
 
 
379
        qemu_co_mutex_lock(&lock->mutex);
182
380
        lock->reader--;
183
381
        assert(lock->reader >= 0);
184
382
        /* Wakeup only one waiting writer */
186
384
            qemu_co_queue_next(&lock->queue);
187
385
        }
188
386
    }
189
 
    self->locks_held--;
 
387
    qemu_co_mutex_unlock(&lock->mutex);
190
388
}
191
389
 
192
390
void qemu_co_rwlock_wrlock(CoRwlock *lock)
193
391
{
194
 
    Coroutine *self = qemu_coroutine_self();
 
392
    qemu_co_mutex_lock(&lock->mutex);
 
393
    lock->pending_writer++;
 
394
    while (lock->reader) {
 
395
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
 
396
    }
 
397
    lock->pending_writer--;
195
398
 
196
 
    while (lock->writer || lock->reader) {
197
 
        qemu_co_queue_wait(&lock->queue);
198
 
    }
199
 
    lock->writer = true;
200
 
    self->locks_held++;
 
399
    /* The rest of the write-side critical section is run with
 
400
     * the mutex taken, so that lock->reader remains zero.
 
401
     * There is no need to update self->locks_held.
 
402
     */
201
403
}