~ubuntu-branches/ubuntu/wily/ust/wily-proposed

« back to all changes in this revision

Viewing changes to libringbuffer/frontend_api.h

  • Committer: Package Import Robot
  • Author(s): Jon Bernard
  • Date: 2012-06-29 16:47:49 UTC
  • mto: (11.2.1 sid) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: package-import@ubuntu.com-20120629164749-mcca8f7w9ovcktj2
Tags: upstream-2.0.4
ImportĀ upstreamĀ versionĀ 2.0.4

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#ifndef _LTTNG_RING_BUFFER_FRONTEND_API_H
 
2
#define _LTTNG_RING_BUFFER_FRONTEND_API_H
 
3
 
 
4
/*
 
5
 * libringbuffer/frontend_api.h
 
6
 *
 
7
 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 
8
 *
 
9
 * This library is free software; you can redistribute it and/or
 
10
 * modify it under the terms of the GNU Lesser General Public
 
11
 * License as published by the Free Software Foundation; only
 
12
 * version 2.1 of the License.
 
13
 *
 
14
 * This library is distributed in the hope that it will be useful,
 
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
17
 * Lesser General Public License for more details.
 
18
 *
 
19
 * You should have received a copy of the GNU Lesser General Public
 
20
 * License along with this library; if not, write to the Free Software
 
21
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
22
 *
 
23
 * Ring Buffer Library Synchronization Header (buffer write API).
 
24
 *
 
25
 * Author:
 
26
 *      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 
27
 *
 
28
 * See ring_buffer_frontend.c for more information on wait-free
 
29
 * algorithms.
 
30
 * See frontend.h for channel allocation and read-side API.
 
31
 */
 
32
 
 
33
#include "frontend.h"
 
34
#include <urcu-bp.h>
 
35
#include <urcu/compiler.h>
 
36
 
 
37
/**
 
38
 * lib_ring_buffer_get_cpu - Precedes ring buffer reserve/commit.
 
39
 *
 
40
 * Grabs RCU read-side lock and keeps a ring buffer nesting count as
 
41
 * supplementary safety net to ensure tracer client code will never
 
42
 * trigger an endless recursion. Returns the processor ID on success,
 
43
 * -EPERM on failure (nesting count too high).
 
44
 *
 
45
 * asm volatile and "memory" clobber prevent the compiler from moving
 
46
 * instructions out of the ring buffer nesting count. This is required to ensure
 
47
 * that probe side-effects which can cause recursion (e.g. unforeseen traps,
 
48
 * divisions by 0, ...) are triggered within the incremented nesting count
 
49
 * section.
 
50
 */
 
51
static inline
 
52
int lib_ring_buffer_get_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
 
53
{
 
54
        int cpu, nesting;
 
55
 
 
56
        rcu_read_lock();
 
57
        cpu = lttng_ust_get_cpu();
 
58
        nesting = ++lib_ring_buffer_nesting;    /* TLS */
 
59
        cmm_barrier();
 
60
 
 
61
        if (caa_unlikely(nesting > 4)) {
 
62
                WARN_ON_ONCE(1);
 
63
                lib_ring_buffer_nesting--;      /* TLS */
 
64
                rcu_read_unlock();
 
65
                return -EPERM;
 
66
        } else
 
67
                return cpu;
 
68
}
 
69
 
 
70
/**
 
71
 * lib_ring_buffer_put_cpu - Follows ring buffer reserve/commit.
 
72
 */
 
73
static inline
 
74
void lib_ring_buffer_put_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
 
75
{
 
76
        cmm_barrier();
 
77
        lib_ring_buffer_nesting--;              /* TLS */
 
78
        rcu_read_unlock();
 
79
}
 
80
 
 
81
/*
 
82
 * lib_ring_buffer_try_reserve is called by lib_ring_buffer_reserve(). It is not
 
83
 * part of the API per se.
 
84
 *
 
85
 * returns 0 if reserve ok, or 1 if the slow path must be taken.
 
86
 */
 
87
static inline
 
88
int lib_ring_buffer_try_reserve(const struct lttng_ust_lib_ring_buffer_config *config,
 
89
                                struct lttng_ust_lib_ring_buffer_ctx *ctx,
 
90
                                unsigned long *o_begin, unsigned long *o_end,
 
91
                                unsigned long *o_old, size_t *before_hdr_pad)
 
92
{
 
93
        struct channel *chan = ctx->chan;
 
94
        struct lttng_ust_lib_ring_buffer *buf = ctx->buf;
 
95
        *o_begin = v_read(config, &buf->offset);
 
96
        *o_old = *o_begin;
 
97
 
 
98
        ctx->tsc = lib_ring_buffer_clock_read(chan);
 
99
        if ((int64_t) ctx->tsc == -EIO)
 
100
                return 1;
 
101
 
 
102
        /*
 
103
         * Prefetch cacheline for read because we have to read the previous
 
104
         * commit counter to increment it and commit seq value to compare it to
 
105
         * the commit counter.
 
106
         */
 
107
        //prefetch(&buf->commit_hot[subbuf_index(*o_begin, chan)]);
 
108
 
 
109
        if (last_tsc_overflow(config, buf, ctx->tsc))
 
110
                ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
 
111
 
 
112
        if (caa_unlikely(subbuf_offset(*o_begin, chan) == 0))
 
113
                return 1;
 
114
 
 
115
        ctx->slot_size = record_header_size(config, chan, *o_begin,
 
116
                                            before_hdr_pad, ctx);
 
117
        ctx->slot_size +=
 
118
                lib_ring_buffer_align(*o_begin + ctx->slot_size,
 
119
                                      ctx->largest_align) + ctx->data_size;
 
120
        if (caa_unlikely((subbuf_offset(*o_begin, chan) + ctx->slot_size)
 
121
                     > chan->backend.subbuf_size))
 
122
                return 1;
 
123
 
 
124
        /*
 
125
         * Record fits in the current buffer and we are not on a switch
 
126
         * boundary. It's safe to write.
 
127
         */
 
128
        *o_end = *o_begin + ctx->slot_size;
 
129
 
 
130
        if (caa_unlikely((subbuf_offset(*o_end, chan)) == 0))
 
131
                /*
 
132
                 * The offset_end will fall at the very beginning of the next
 
133
                 * subbuffer.
 
134
                 */
 
135
                return 1;
 
136
 
 
137
        return 0;
 
138
}
 
139
 
 
140
/**
 
141
 * lib_ring_buffer_reserve - Reserve space in a ring buffer.
 
142
 * @config: ring buffer instance configuration.
 
143
 * @ctx: ring buffer context. (input and output) Must be already initialized.
 
144
 *
 
145
 * Atomic wait-free slot reservation. The reserved space starts at the context
 
146
 * "pre_offset". Its length is "slot_size". The associated time-stamp is "tsc".
 
147
 *
 
148
 * Return :
 
149
 *  0 on success.
 
150
 * -EAGAIN if channel is disabled.
 
151
 * -ENOSPC if event size is too large for packet.
 
152
 * -ENOBUFS if there is currently not enough space in buffer for the event.
 
153
 * -EIO if data cannot be written into the buffer for any other reason.
 
154
 */
 
155
 
 
156
static inline
 
157
int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *config,
 
158
                            struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
159
{
 
160
        struct channel *chan = ctx->chan;
 
161
        struct lttng_ust_shm_handle *handle = ctx->handle;
 
162
        struct lttng_ust_lib_ring_buffer *buf;
 
163
        unsigned long o_begin, o_end, o_old;
 
164
        size_t before_hdr_pad = 0;
 
165
 
 
166
        if (uatomic_read(&chan->record_disabled))
 
167
                return -EAGAIN;
 
168
 
 
169
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
 
170
                buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
 
171
        else
 
172
                buf = shmp(handle, chan->backend.buf[0].shmp);
 
173
        if (uatomic_read(&buf->record_disabled))
 
174
                return -EAGAIN;
 
175
        ctx->buf = buf;
 
176
 
 
177
        /*
 
178
         * Perform retryable operations.
 
179
         */
 
180
        if (caa_unlikely(lib_ring_buffer_try_reserve(config, ctx, &o_begin,
 
181
                                                 &o_end, &o_old, &before_hdr_pad)))
 
182
                goto slow_path;
 
183
 
 
184
        if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
 
185
                     != o_old))
 
186
                goto slow_path;
 
187
 
 
188
        /*
 
189
         * Atomically update last_tsc. This update races against concurrent
 
190
         * atomic updates, but the race will always cause supplementary full TSC
 
191
         * record headers, never the opposite (missing a full TSC record header
 
192
         * when it would be needed).
 
193
         */
 
194
        save_last_tsc(config, ctx->buf, ctx->tsc);
 
195
 
 
196
        /*
 
197
         * Push the reader if necessary
 
198
         */
 
199
        lib_ring_buffer_reserve_push_reader(ctx->buf, chan, o_end - 1);
 
200
 
 
201
        /*
 
202
         * Clear noref flag for this subbuffer.
 
203
         */
 
204
        lib_ring_buffer_clear_noref(config, &ctx->buf->backend,
 
205
                                subbuf_index(o_end - 1, chan), handle);
 
206
 
 
207
        ctx->pre_offset = o_begin;
 
208
        ctx->buf_offset = o_begin + before_hdr_pad;
 
209
        return 0;
 
210
slow_path:
 
211
        return lib_ring_buffer_reserve_slow(ctx);
 
212
}
 
213
 
 
214
/**
 
215
 * lib_ring_buffer_switch - Perform a sub-buffer switch for a per-cpu buffer.
 
216
 * @config: ring buffer instance configuration.
 
217
 * @buf: buffer
 
218
 * @mode: buffer switch mode (SWITCH_ACTIVE or SWITCH_FLUSH)
 
219
 *
 
220
 * This operation is completely reentrant : can be called while tracing is
 
221
 * active with absolutely no lock held.
 
222
 *
 
223
 * Note, however, that as a v_cmpxchg is used for some atomic operations and
 
224
 * requires to be executed locally for per-CPU buffers, this function must be
 
225
 * called from the CPU which owns the buffer for a ACTIVE flush, with preemption
 
226
 * disabled, for RING_BUFFER_SYNC_PER_CPU configuration.
 
227
 */
 
228
static inline
 
229
void lib_ring_buffer_switch(const struct lttng_ust_lib_ring_buffer_config *config,
 
230
                            struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
 
231
                            struct lttng_ust_shm_handle *handle)
 
232
{
 
233
        lib_ring_buffer_switch_slow(buf, mode, handle);
 
234
}
 
235
 
 
236
/* See ring_buffer_frontend_api.h for lib_ring_buffer_reserve(). */
 
237
 
 
238
/**
 
239
 * lib_ring_buffer_commit - Commit an record.
 
240
 * @config: ring buffer instance configuration.
 
241
 * @ctx: ring buffer context. (input arguments only)
 
242
 *
 
243
 * Atomic unordered slot commit. Increments the commit count in the
 
244
 * specified sub-buffer, and delivers it if necessary.
 
245
 */
 
246
static inline
 
247
void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *config,
 
248
                            const struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
249
{
 
250
        struct channel *chan = ctx->chan;
 
251
        struct lttng_ust_shm_handle *handle = ctx->handle;
 
252
        struct lttng_ust_lib_ring_buffer *buf = ctx->buf;
 
253
        unsigned long offset_end = ctx->buf_offset;
 
254
        unsigned long endidx = subbuf_index(offset_end - 1, chan);
 
255
        unsigned long commit_count;
 
256
 
 
257
        /*
 
258
         * Must count record before incrementing the commit count.
 
259
         */
 
260
        subbuffer_count_record(config, &buf->backend, endidx, handle);
 
261
 
 
262
        /*
 
263
         * Order all writes to buffer before the commit count update that will
 
264
         * determine that the subbuffer is full.
 
265
         */
 
266
        cmm_smp_wmb();
 
267
 
 
268
        v_add(config, ctx->slot_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
 
269
 
 
270
        /*
 
271
         * commit count read can race with concurrent OOO commit count updates.
 
272
         * This is only needed for lib_ring_buffer_check_deliver (for
 
273
         * non-polling delivery only) and for
 
274
         * lib_ring_buffer_write_commit_counter.  The race can only cause the
 
275
         * counter to be read with the same value more than once, which could
 
276
         * cause :
 
277
         * - Multiple delivery for the same sub-buffer (which is handled
 
278
         *   gracefully by the reader code) if the value is for a full
 
279
         *   sub-buffer. It's important that we can never miss a sub-buffer
 
280
         *   delivery. Re-reading the value after the v_add ensures this.
 
281
         * - Reading a commit_count with a higher value that what was actually
 
282
         *   added to it for the lib_ring_buffer_write_commit_counter call
 
283
         *   (again caused by a concurrent committer). It does not matter,
 
284
         *   because this function is interested in the fact that the commit
 
285
         *   count reaches back the reserve offset for a specific sub-buffer,
 
286
         *   which is completely independent of the order.
 
287
         */
 
288
        commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
 
289
 
 
290
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
 
291
                                      commit_count, endidx, handle);
 
292
        /*
 
293
         * Update used size at each commit. It's needed only for extracting
 
294
         * ring_buffer buffers from vmcore, after crash.
 
295
         */
 
296
        lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
 
297
                                             ctx->buf_offset, commit_count,
 
298
                                             ctx->slot_size, handle);
 
299
}
 
300
 
 
301
/**
 
302
 * lib_ring_buffer_try_discard_reserve - Try discarding a record.
 
303
 * @config: ring buffer instance configuration.
 
304
 * @ctx: ring buffer context. (input arguments only)
 
305
 *
 
306
 * Only succeeds if no other record has been written after the record to
 
307
 * discard. If discard fails, the record must be committed to the buffer.
 
308
 *
 
309
 * Returns 0 upon success, -EPERM if the record cannot be discarded.
 
310
 */
 
311
static inline
 
312
int lib_ring_buffer_try_discard_reserve(const struct lttng_ust_lib_ring_buffer_config *config,
 
313
                                        const struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
314
{
 
315
        struct lttng_ust_lib_ring_buffer *buf = ctx->buf;
 
316
        unsigned long end_offset = ctx->pre_offset + ctx->slot_size;
 
317
 
 
318
        /*
 
319
         * We need to ensure that if the cmpxchg succeeds and discards the
 
320
         * record, the next record will record a full TSC, because it cannot
 
321
         * rely on the last_tsc associated with the discarded record to detect
 
322
         * overflows. The only way to ensure this is to set the last_tsc to 0
 
323
         * (assuming no 64-bit TSC overflow), which forces to write a 64-bit
 
324
         * timestamp in the next record.
 
325
         *
 
326
         * Note: if discard fails, we must leave the TSC in the record header.
 
327
         * It is needed to keep track of TSC overflows for the following
 
328
         * records.
 
329
         */
 
330
        save_last_tsc(config, buf, 0ULL);
 
331
 
 
332
        if (caa_likely(v_cmpxchg(config, &buf->offset, end_offset, ctx->pre_offset)
 
333
                   != end_offset))
 
334
                return -EPERM;
 
335
        else
 
336
                return 0;
 
337
}
 
338
 
 
339
static inline
 
340
void channel_record_disable(const struct lttng_ust_lib_ring_buffer_config *config,
 
341
                            struct channel *chan)
 
342
{
 
343
        uatomic_inc(&chan->record_disabled);
 
344
}
 
345
 
 
346
static inline
 
347
void channel_record_enable(const struct lttng_ust_lib_ring_buffer_config *config,
 
348
                           struct channel *chan)
 
349
{
 
350
        uatomic_dec(&chan->record_disabled);
 
351
}
 
352
 
 
353
static inline
 
354
void lib_ring_buffer_record_disable(const struct lttng_ust_lib_ring_buffer_config *config,
 
355
                                    struct lttng_ust_lib_ring_buffer *buf)
 
356
{
 
357
        uatomic_inc(&buf->record_disabled);
 
358
}
 
359
 
 
360
static inline
 
361
void lib_ring_buffer_record_enable(const struct lttng_ust_lib_ring_buffer_config *config,
 
362
                                   struct lttng_ust_lib_ring_buffer *buf)
 
363
{
 
364
        uatomic_dec(&buf->record_disabled);
 
365
}
 
366
 
 
367
#endif /* _LTTNG_RING_BUFFER_FRONTEND_API_H */