// ------------------------------------------------------------------------
// kvu_value_queue.cpp: A thread-safe way to transmit int-double pairs.
// Copyright (C) 1999,2004 Kai Vehmanen
//
// eca-style-version: 3
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
// ------------------------------------------------------------------------

#include "kvu_value_queue.h"

/* ---------------------------------------------------------------------
 * Debug tracing
 *
 * KVU_NOTE_S(x) prints the string 'x' prefixed with file and line;
 * KVU_NOTE_SD(x, y) additionally prints the integer value 'y'.
 * Both flush stdout after each message. When tracing is disabled they
 * expand to a no-op and their arguments are NOT evaluated.
 *
 * NOTE(review): the name of the enabling macro is not visible in this
 * excerpt; DEBUG_WITH_PRINTF is assumed - confirm against the build
 * configuration. Enabling it requires <cstdio>.
 */

#ifdef DEBUG_WITH_PRINTF
#define KVU_NOTE_S(x) do { printf("%s:%d - %s\n", __FILE__, __LINE__, x); fflush(stdout); } while(0)
/* 'y' is usually a size_t (queue size); cast so that it matches the
 * conversion specifier on every platform */
#define KVU_NOTE_SD(x, y) do { printf("%s:%d - %s=%ld\n", __FILE__, __LINE__, x, static_cast<long>(y)); fflush(stdout); } while(0)
#else
#define KVU_NOTE_S(x) ((void) 0)
#define KVU_NOTE_SD(x, y) ((void) 0)
#endif

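/* ---------------------------------------------------------------------
 * VALUE_QUEUE: mutex-protected queue; every operation takes the lock
 * with pthread_mutex_lock() and may therefore block
 */
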
/**
 * Creates an empty queue.
 *
 * Initializes the mutex and the condition variable with default
 * attributes and sets the placeholder item 'empty_rep' to (0, 0.0).
 */
VALUE_QUEUE::VALUE_QUEUE(void)
{
  pthread_mutex_init(&lock_rep, NULL);
  pthread_cond_init(&cond_rep, NULL);
  /* the value part is a double, so use a double literal */
  empty_rep = std::pair<int,double>(0, 0.0);
}

void VALUE_QUEUE::push_back(int key, double value)
68
pthread_mutex_lock(&lock_rep);
69
cmds_rep.push_back(pair<int,double>(key, value));
70
pthread_cond_broadcast(&cond_rep);
71
pthread_mutex_unlock(&lock_rep);
74
void VALUE_QUEUE::pop_front(void)
77
DBC_REQUIRE(is_empty() == false);
79
pthread_mutex_lock(&lock_rep);
81
pthread_mutex_unlock(&lock_rep);
84
const pair<int,double>& VALUE_QUEUE::front(void)
87
DBC_REQUIRE(is_empty() == false);
89
pthread_mutex_lock(&lock_rep);
90
const pair<int,double>& s = cmds_rep.front();
91
pthread_mutex_unlock(&lock_rep);
95
void VALUE_QUEUE::poll(int timeout_sec,
96
long int timeout_usec)
99
struct timespec timeout;
102
pthread_mutex_lock(&lock_rep);
103
gettimeofday(&now, 0);
104
timeout.tv_sec = now.tv_sec + timeout_sec;
105
timeout.tv_nsec = now.tv_usec * 1000 + timeout_usec * 1000;
107
while (cmds_rep.empty() == true && retcode != ETIMEDOUT) {
108
retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
110
pthread_mutex_unlock(&lock_rep);
114
bool VALUE_QUEUE::is_empty(void) const
116
pthread_mutex_lock(&lock_rep);
117
bool result = cmds_rep.empty();
118
pthread_mutex_unlock(&lock_rep);
122
/************************************************************************/

/**
 * Default for maximum size of the queue for operation in
 * bounded execution time mode. Used by the VALUE_QUEUE_RT_C
 * constructor when no explicit limit is given.
 */
static const size_t kvqr_bound_exec_max_size_const = 1024;

* Execution note: not bounded (may block, may allocate memory)
137
VALUE_QUEUE_RT_C::VALUE_QUEUE_RT_C(int bounded_exec_max_size)
138
: pending_pops_rep(0)
140
pthread_mutex_init(&lock_rep, NULL);
141
pthread_cond_init(&cond_rep, NULL);
142
if (bounded_exec_max_size == -1)
143
bounded_exec_max_size_rep = kvqr_bound_exec_max_size_const;
145
bounded_exec_max_size_rep = static_cast<size_t>(bounded_exec_max_size);
149
* Adds a new item to the end of the queue.
151
* Execution note: non-realtime (may block, may allocate memory)
153
void VALUE_QUEUE_RT_C::push_back(int key, double value)
155
pthread_mutex_lock(&lock_rep);
156
cmds_rep.push_back(pair<int,double>(key, value));
157
KVU_NOTE_SD("pushback-when-size=", cmds_rep.size());
158
pthread_cond_broadcast(&cond_rep);
159
pthread_mutex_unlock(&lock_rep);
163
* Removes the first item.
165
* Execution note: bounded
167
* @pre is_empty() != true
169
void VALUE_QUEUE_RT_C::pop_front(void)
171
int ret = pthread_mutex_trylock(&lock_rep);
173
cmds_rep.pop_front();
174
pthread_mutex_unlock(&lock_rep);
177
/* could not remove item, add to pending pops */
178
if (pending_pops_rep != cmds_rep.size()) {
180
KVU_NOTE_SD("add-pending-pop=", pending_pops_rep);
186
* Returns the first item.
188
* Execution note: bounded
190
* @pre is_empty() != true
191
* @return returns VALUE_QUEUE_RT_C::invalid_item() if temporarily
192
* unable to access the queue
194
const pair<int,double>* VALUE_QUEUE_RT_C::front(void)
196
pair<int,double>* s = &invalid_rep;
197
int ret = pthread_mutex_trylock(&lock_rep);
200
cmds_rep.size() >= bounded_execution_queue_size_limit()) {
201
/* queue has grown beyond the rt-safe maximum size,
202
* change to non-bounded mode to force synchronization
203
* between the producer and consumer threads
205
KVU_NOTE_SD("queue-limit-when-size=", cmds_rep.size());
206
ret = pthread_mutex_lock(&lock_rep);
210
/* now that we have the lock, we can safely process
211
* any pending pop requests */
212
DBC_CHECK(cmds_rep.size() - pending_pops_rep > 0);
213
while(pending_pops_rep > 0) {
214
cmds_rep.pop_front();
216
KVU_NOTE_SD("dec-pending-pop=", pending_pops_rep);
218
KVU_NOTE_SD("front-when-size=", cmds_rep.size());
219
s = &cmds_rep.front();
220
pthread_mutex_unlock(&lock_rep);
227
* Blocks until 'is_empty() != true'. 'timeout_sec' and
228
* 'timeout_usec' specify the upper time limit for blocking.
230
* Execution: not bounded (may block, may allocate memory)
232
* @pre is_empty() != true
234
void VALUE_QUEUE_RT_C::poll(int timeout_sec,
235
long int timeout_usec)
238
struct timespec timeout;
241
pthread_mutex_lock(&lock_rep);
242
gettimeofday(&now, 0);
243
timeout.tv_sec = now.tv_sec + timeout_sec;
244
timeout.tv_nsec = now.tv_usec * 1000 + timeout_usec * 1000;
246
while (cmds_rep.empty() == true && retcode != ETIMEDOUT) {
247
KVU_NOTE_S("poll-in");
248
retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
249
KVU_NOTE_S("poll-out");
251
pthread_mutex_unlock(&lock_rep);
258
* Execution note: bounded (may block, may allocate memory)
260
bool VALUE_QUEUE_RT_C::is_empty(void) const
263
int ret = pthread_mutex_trylock(&lock_rep);
266
cmds_rep.size() >= bounded_execution_queue_size_limit()) {
267
/* queue has grown beyond the rt-safe maximum size,
268
* change to non-bounded mode to force synchronization
269
* between the producer and consumer threads
271
KVU_NOTE_SD("queue-limit-when-size=", cmds_rep.size());
272
ret = pthread_mutex_lock(&lock_rep);
276
size = cmds_rep.size();
277
DBC_CHECK(size - pending_pops_rep >= 0);
278
pthread_mutex_unlock(&lock_rep);
281
return (size - pending_pops_rep) == 0;
284
size_t VALUE_QUEUE_RT_C::bounded_execution_queue_size_limit(void) const
286
return bounded_exec_max_size_rep;
289
/************************************************************************/