3
* Copyright 2006 Free Software Foundation, Inc.
5
* This file is part of GNU Radio.
7
* GNU Radio is free software; you can redistribute it and/or modify
8
* it under the terms of the GNU General Public License as published by
9
* the Free Software Foundation; either version 3, or (at your option)
12
* GNU Radio is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
* GNU General Public License for more details.
17
* You should have received a copy of the GNU General Public License
18
* along with GNU Radio; see the file COPYING. If not, write to
19
* the Free Software Foundation, Inc., 51 Franklin Street,
20
* Boston, MA 02110-1301, USA.
23
#ifndef _CIRCULAR_BUFFER_H_
24
#define _CIRCULAR_BUFFER_H_
26
#include "mld_threads.h"
32
// Debug-enabled variant: executes statement X exactly once.
// X is terminated with ';' inside the braces so that call sites written as
// `DEBUG (fprintf (...));` expand to valid code, and the macro itself carries
// no trailing ';' so that `if (c) DEBUG (x); else ...` parses as one statement.
#define DEBUG(X) do { X; } while (0)
34
// Debug-disabled variant: X is discarded and never evaluated.
// The macro carries no trailing ';' so that a call site `DEBUG (x);` is
// exactly one statement and stays valid in front of an `else`.
#define DEBUG(X) do { } while (0)
37
template <class T> class circular_buffer
43
// the following are in Items (type T)
44
UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
45
UInt32 d_n_avail_write_I, d_n_avail_read_I;
47
// stuff to control access to class internals
48
mld_mutex_ptr d_internal;
49
mld_condition_ptr d_readBlock, d_writeBlock;
51
// booleans to decide how to control reading, writing, and aborting
52
bool d_doWriteBlock, d_doFullRead, d_doAbort;
54
void delete_mutex_cond () {
70
circular_buffer (UInt32 bufLen_I,
71
bool doWriteBlock = true, bool doFullRead = false) {
73
throw std::runtime_error ("circular_buffer(): "
74
"Number of items to buffer must be > 0.\n");
75
d_bufLen_I = bufLen_I;
76
d_buffer = (T*) new T[d_bufLen_I];
77
d_doWriteBlock = doWriteBlock;
78
d_doFullRead = doFullRead;
80
d_readBlock = d_writeBlock = NULL;
82
DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
83
"doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
84
(d_doWriteBlock ? "true" : "false"),
85
(d_doFullRead ? "true" : "false")));
93
inline UInt32 n_avail_write_items () {
95
UInt32 retVal = d_n_avail_write_I;
96
d_internal->unlock ();
100
inline UInt32 n_avail_read_items () {
102
UInt32 retVal = d_n_avail_read_I;
103
d_internal->unlock ();
107
// Accessor: the buffer length, in items of type T, as stored in d_bufLen_I.
inline UInt32 buffer_length_items ()
{
  return d_bufLen_I;
}
108
// Accessor: the current value of the write-blocking flag (d_doWriteBlock).
inline bool do_write_block ()
{
  return d_doWriteBlock;
}
109
// Accessor: the current value of the full-read flag (d_doFullRead).
inline bool do_full_read ()
{
  return d_doFullRead;
}
113
bzero (d_buffer, d_bufLen_I * sizeof (T));
114
d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
115
d_n_avail_write_I = d_bufLen_I;
116
delete_mutex_cond ();
117
// create a mutex to handle contention of shared resources;
118
// any routine needing access to shared resources uses lock()
119
// before doing anything, then unlock() when finished.
120
d_internal = new mld_mutex ();
121
// link the internal mutex to the read and write conditions;
122
// when wait() is called, the internal mutex will automatically
123
// be unlock()'ed. Upon return (from a signal() to the condition),
124
// the internal mutex will be lock()'ed.
125
d_readBlock = new mld_condition (d_internal);
126
d_writeBlock = new mld_condition (d_internal);
130
* enqueue: add the given buffer of item-length to the queue,
131
* first-in-first-out (FIFO).
134
* buf: a pointer to the buffer holding the data
136
* bufLen_I: the buffer length in items (of the instantiated type)
139
* -1: on overflow (write is not blocking, and data is being
140
* written faster than it is being read)
141
* 0: if nothing to do (0 length buffer)
143
* 2: in the process of aborting, so doing nothing
145
* will throw runtime errors if inputs are improper:
146
* buffer pointer is NULL
147
* buffer length is larger than the instantiated buffer length
150
int enqueue (T* buf, UInt32 bufLen_I) {
151
DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
152
"#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
153
d_n_avail_write_I, d_n_avail_read_I));
154
if (bufLen_I > d_bufLen_I) {
155
fprintf (stderr, "cannot add buffer longer (%ld"
156
") than instantiated length (%ld"
157
").\n", bufLen_I, d_bufLen_I);
158
throw std::runtime_error ("circular_buffer::enqueue()");
164
throw std::runtime_error ("circular_buffer::enqueue(): "
165
"input buffer is NULL.\n");
168
d_internal->unlock ();
171
// set the return value to 1: success; change if needed
173
if (bufLen_I > d_n_avail_write_I) {
174
if (d_doWriteBlock) {
175
while (bufLen_I > d_n_avail_write_I) {
176
DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
177
// wait will automatically unlock() the internal mutex
178
d_writeBlock->wait ();
179
// and lock() it here.
181
d_internal->unlock ();
182
DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
185
DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
188
d_n_avail_read_I = d_bufLen_I - bufLen_I;
189
d_n_avail_write_I = bufLen_I;
190
DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
194
UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
195
if (n_now_I > bufLen_I)
197
else if (n_now_I < bufLen_I)
198
n_start_I = bufLen_I - n_now_I;
199
bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
201
bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
202
d_writeNdx_I = n_start_I;
204
d_writeNdx_I += n_now_I;
205
d_n_avail_read_I += bufLen_I;
206
d_n_avail_write_I -= bufLen_I;
207
d_readBlock->signal ();
208
d_internal->unlock ();
213
* dequeue: removes from the queue the number of items requested, or
214
* available, into the given buffer on a FIFO basis.
217
* buf: a pointer to the buffer into which to copy the data
219
* bufLen_I: pointer to the number of items to remove in items
220
* (of the instantiated type)
223
* 0: if nothing to do (0 length buffer)
225
* 2: in the process of aborting, so doing nothing
227
* will throw runtime errors if inputs are improper:
228
* buffer pointer is NULL
229
* buffer length pointer is NULL
230
* buffer length is larger than the instantiated buffer length
233
int dequeue (T* buf, UInt32* bufLen_I) {
234
DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
235
"#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
236
d_n_avail_write_I, d_n_avail_read_I));
238
throw std::runtime_error ("circular_buffer::dequeue(): "
239
"input bufLen pointer is NULL.\n");
241
throw std::runtime_error ("circular_buffer::dequeue(): "
242
"input buffer pointer is NULL.\n");
243
UInt32 l_bufLen_I = *bufLen_I;
246
if (l_bufLen_I > d_bufLen_I) {
247
fprintf (stderr, "cannot remove buffer longer (%ld"
248
") than instantiated length (%ld"
249
").\n", l_bufLen_I, d_bufLen_I);
250
throw std::runtime_error ("circular_buffer::dequeue()");
255
d_internal->unlock ();
259
while (d_n_avail_read_I < l_bufLen_I) {
260
DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
261
// wait will automatically unlock() the internal mutex
262
d_readBlock->wait ();
263
// and lock() it here.
265
d_internal->unlock ();
266
DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
269
DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
272
while (d_n_avail_read_I == 0) {
273
DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
274
// wait will automatically unlock() the internal mutex
275
d_readBlock->wait ();
276
// and lock() it here.
278
d_internal->unlock ();
279
DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
282
DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
285
if (l_bufLen_I > d_n_avail_read_I)
286
l_bufLen_I = d_n_avail_read_I;
287
UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
288
if (n_now_I > l_bufLen_I)
289
n_now_I = l_bufLen_I;
290
else if (n_now_I < l_bufLen_I)
291
n_start_I = l_bufLen_I - n_now_I;
292
bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
294
bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
295
d_readNdx_I = n_start_I;
297
d_readNdx_I += n_now_I;
298
*bufLen_I = l_bufLen_I;
299
d_n_avail_read_I -= l_bufLen_I;
300
d_n_avail_write_I += l_bufLen_I;
301
d_writeBlock->signal ();
302
d_internal->unlock ();
309
d_writeBlock->signal ();
310
d_readBlock->signal ();
311
d_internal->unlock ();
315
#endif /* _CIRCULAR_BUFFER_H_ */