~ubuntu-branches/ubuntu/lucid/ecasound2.2/lucid

« back to all changes in this revision

Viewing changes to kvutils/kvu_value_queue.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Junichi Uekawa
  • Date: 2005-04-14 09:15:48 UTC
  • Revision ID: james.westby@ubuntu.com-20050414091548-o7kgb47z0tcunh0s
Tags: upstream-2.4.1
ImportĀ upstreamĀ versionĀ 2.4.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// ------------------------------------------------------------------------
 
2
// kvu_value_queue.cpp: A thread-safe way to transmit int-double pairs.
 
3
// Copyright (C) 1999,2004 Kai Vehmanen
 
4
//
 
5
// Attributes:
 
6
//     eca-style-version: 3
 
7
//
 
8
// This program is free software; you can redistribute it and/or modify
 
9
// it under the terms of the GNU General Public License as published by
 
10
// the Free Software Foundation; either version 2 of the License, or
 
11
// (at your option) any later version.
 
12
// 
 
13
// This program is distributed in the hope that it will be useful,
 
14
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
// GNU General Public License for more details.
 
17
// 
 
18
// You should have received a copy of the GNU General Public License
 
19
// along with this program; if not, write to the Free Software
 
20
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
 
21
// ------------------------------------------------------------------------
 
22
 
 
23
#include <cstdio>
 
24
#include <deque>
 
25
#include <string>
 
26
 
 
27
#include <errno.h>
 
28
#include <unistd.h>
 
29
#include <pthread.h>
 
30
#include <sys/time.h>
 
31
 
 
32
#include "kvu_dbc.h"
 
33
#include "kvu_value_queue.h"
 
34
 
 
35
/* --------------------------------------------------------------------- 
 
36
 * Options
 
37
 */
 
38
 
 
39
// #define VERBOSE
 
40
 
 
41
/* --------------------------------------------------------------------- 
 
42
 * Test util macros
 
43
 */
 
44
 
 
45
#ifdef VERBOSE
 
46
#define KVU_NOTE_S(x)       do { printf("%s:%d - %s\n", __FILE__, __LINE__, x); fflush(stdout); } while(0)
 
47
#define KVU_NOTE_SD(x, y)   do { printf("%s:%d - %s=%d\n", __FILE__, __LINE__, x, y); fflush(stdout); } while(0)
 
48
#else
 
49
#define KVU_NOTE_S(x)       ((void) 0)
 
50
#define KVU_NOTE_SD(x, y)   ((void) 0)
 
51
#endif
 
52
 
 
53
/* --------------------------------------------------------------------- 
 
54
 * Definitions
 
55
 */
 
56
 
 
57
using namespace std;
 
58
 
 
59
VALUE_QUEUE::VALUE_QUEUE(void)
 
60
{
 
61
  pthread_mutex_init(&lock_rep, NULL);
 
62
  pthread_cond_init(&cond_rep, NULL);
 
63
  empty_rep = pair<int,double>(0, 0.0f);
 
64
}
 
65
 
 
66
void VALUE_QUEUE::push_back(int key, double value)
 
67
{
 
68
  pthread_mutex_lock(&lock_rep);
 
69
  cmds_rep.push_back(pair<int,double>(key, value));
 
70
  pthread_cond_broadcast(&cond_rep);
 
71
  pthread_mutex_unlock(&lock_rep);
 
72
}
 
73
 
 
74
void VALUE_QUEUE::pop_front(void)
 
75
{
 
76
  // --------
 
77
  DBC_REQUIRE(is_empty() == false);
 
78
  // --------
 
79
  pthread_mutex_lock(&lock_rep);
 
80
  cmds_rep.pop_front();
 
81
  pthread_mutex_unlock(&lock_rep);
 
82
}    
 
83
 
 
84
const pair<int,double>& VALUE_QUEUE::front(void)
 
85
{
 
86
  // --------
 
87
  DBC_REQUIRE(is_empty() == false);
 
88
  // --------
 
89
  pthread_mutex_lock(&lock_rep);
 
90
  const pair<int,double>& s = cmds_rep.front();
 
91
  pthread_mutex_unlock(&lock_rep);
 
92
  return s;
 
93
}
 
94
 
 
95
void VALUE_QUEUE::poll(int timeout_sec,
 
96
                       long int timeout_usec)
 
97
{
 
98
  struct timeval now;
 
99
  struct timespec timeout;
 
100
  int retcode;
 
101
 
 
102
  pthread_mutex_lock(&lock_rep);
 
103
  gettimeofday(&now, 0);
 
104
  timeout.tv_sec = now.tv_sec + timeout_sec;
 
105
  timeout.tv_nsec = now.tv_usec * 1000 + timeout_usec * 1000;
 
106
  retcode = 0;
 
107
  while (cmds_rep.empty() == true && retcode != ETIMEDOUT) {
 
108
    retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
 
109
  }
 
110
  pthread_mutex_unlock(&lock_rep);
 
111
  return;
 
112
}
 
113
 
 
114
bool VALUE_QUEUE::is_empty(void) const
 
115
{
 
116
  pthread_mutex_lock(&lock_rep);
 
117
  bool result = cmds_rep.empty(); 
 
118
  pthread_mutex_unlock(&lock_rep);
 
119
  return result;
 
120
}
 
121
 
 
122
/************************************************************************/
 
123
 
 
124
/**
 
125
 * Default for maximum size of the queue for operation in 
 
126
 * bounded execution time mode.
 
127
 */
 
128
static const size_t kvqr_bound_exec_max_size_const = 1024;
 
129
 
 
130
/**
 
131
 * Class constructor.
 
132
 *
 
133
 * @param 
 
134
 *
 
135
 * Execution note: not bounded (may block, may allocate memory)
 
136
 */
 
137
VALUE_QUEUE_RT_C::VALUE_QUEUE_RT_C(int bounded_exec_max_size)
 
138
  : pending_pops_rep(0)
 
139
{
 
140
  pthread_mutex_init(&lock_rep, NULL);
 
141
  pthread_cond_init(&cond_rep, NULL);
 
142
  if (bounded_exec_max_size == -1)
 
143
    bounded_exec_max_size_rep = kvqr_bound_exec_max_size_const;
 
144
  else 
 
145
    bounded_exec_max_size_rep = static_cast<size_t>(bounded_exec_max_size);
 
146
}
 
147
 
 
148
/**
 
149
 * Adds a new item to the end of the queue.
 
150
 *
 
151
 * Execution note: non-realtime (may block, may allocate memory)
 
152
 */
 
153
void VALUE_QUEUE_RT_C::push_back(int key, double value)
 
154
{
 
155
  pthread_mutex_lock(&lock_rep);
 
156
  cmds_rep.push_back(pair<int,double>(key, value));
 
157
  KVU_NOTE_SD("pushback-when-size=", cmds_rep.size());
 
158
  pthread_cond_broadcast(&cond_rep);
 
159
  pthread_mutex_unlock(&lock_rep);
 
160
}
 
161
 
 
162
/**
 
163
 * Removes the first item.
 
164
 *
 
165
 * Execution note: bounded
 
166
 *
 
167
 * @pre is_empty() != true
 
168
 */
 
169
void VALUE_QUEUE_RT_C::pop_front(void)
 
170
{
 
171
  int ret = pthread_mutex_trylock(&lock_rep);
 
172
  if (ret == 0) {
 
173
    cmds_rep.pop_front();
 
174
    pthread_mutex_unlock(&lock_rep);
 
175
  }
 
176
  else {
 
177
    /* could not remove item, add to pending pops */
 
178
    if (pending_pops_rep != cmds_rep.size()) {
 
179
      ++pending_pops_rep;
 
180
      KVU_NOTE_SD("add-pending-pop=", pending_pops_rep);
 
181
    }
 
182
  }
 
183
}    
 
184
 
 
185
/**
 
186
 * Returns the first item.
 
187
 *
 
188
 * Execution note: bounded
 
189
 *
 
190
 * @pre is_empty() != true
 
191
 * @return returns VALUE_QUEUE_RT_C::invalid_item() if temporarily 
 
192
 *         unable to access the queue
 
193
 */
 
194
const pair<int,double>* VALUE_QUEUE_RT_C::front(void)
 
195
{
 
196
  pair<int,double>* s = &invalid_rep;
 
197
  int ret = pthread_mutex_trylock(&lock_rep);
 
198
 
 
199
  if (ret != 0 && 
 
200
      cmds_rep.size() >= bounded_execution_queue_size_limit()) {
 
201
    /* queue has grown beyond the rt-safe maximum size, 
 
202
     * change to non-bounded mode to force synchronization
 
203
     * between the producer and consumer threads 
 
204
     */
 
205
    KVU_NOTE_SD("queue-limit-when-size=", cmds_rep.size());
 
206
    ret = pthread_mutex_lock(&lock_rep);
 
207
  }
 
208
 
 
209
  if (ret == 0) {
 
210
    /* now that we have the lock, we can safely process
 
211
     * any pending pop requests */
 
212
    DBC_CHECK(cmds_rep.size() - pending_pops_rep > 0);
 
213
    while(pending_pops_rep > 0) {
 
214
      cmds_rep.pop_front();
 
215
      --pending_pops_rep;
 
216
      KVU_NOTE_SD("dec-pending-pop=", pending_pops_rep);
 
217
    }
 
218
    KVU_NOTE_SD("front-when-size=", cmds_rep.size());
 
219
    s = &cmds_rep.front();
 
220
    pthread_mutex_unlock(&lock_rep);
 
221
  }
 
222
 
 
223
  return s;
 
224
}
 
225
 
 
226
/**
 
227
 * Blocks until 'is_empty() != true'. 'timeout_sec' and
 
228
 * 'timeout_usec' specify the upper time limit for blocking.
 
229
 *
 
230
 * Execution: not bounded (may block, may allocate memory)
 
231
 *
 
232
 * @pre is_empty() != true
 
233
 */
 
234
void VALUE_QUEUE_RT_C::poll(int timeout_sec,
 
235
                            long int timeout_usec)
 
236
{
 
237
  struct timeval now;
 
238
  struct timespec timeout;
 
239
  int retcode;
 
240
 
 
241
  pthread_mutex_lock(&lock_rep);
 
242
  gettimeofday(&now, 0);
 
243
  timeout.tv_sec = now.tv_sec + timeout_sec;
 
244
  timeout.tv_nsec = now.tv_usec * 1000 + timeout_usec * 1000;
 
245
  retcode = 0;
 
246
  while (cmds_rep.empty() == true && retcode != ETIMEDOUT) {
 
247
    KVU_NOTE_S("poll-in");
 
248
    retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
 
249
    KVU_NOTE_S("poll-out");
 
250
  }
 
251
  pthread_mutex_unlock(&lock_rep);
 
252
  return;
 
253
}
 
254
 
 
255
/**
 
256
 * Is queue empty?
 
257
 *
 
258
 * Execution note: bounded (may block, may allocate memory)
 
259
 */
 
260
bool VALUE_QUEUE_RT_C::is_empty(void) const
 
261
{
 
262
  size_t size = 0;
 
263
  int ret = pthread_mutex_trylock(&lock_rep);
 
264
 
 
265
  if (ret != 0 && 
 
266
      cmds_rep.size() >= bounded_execution_queue_size_limit()) {
 
267
    /* queue has grown beyond the rt-safe maximum size, 
 
268
     * change to non-bounded mode to force synchronization
 
269
     * between the producer and consumer threads 
 
270
     */
 
271
    KVU_NOTE_SD("queue-limit-when-size=", cmds_rep.size());
 
272
    ret = pthread_mutex_lock(&lock_rep);
 
273
  }
 
274
 
 
275
  if (ret == 0) {
 
276
    size = cmds_rep.size();
 
277
    DBC_CHECK(size - pending_pops_rep >= 0);
 
278
    pthread_mutex_unlock(&lock_rep);
 
279
  }
 
280
 
 
281
  return (size - pending_pops_rep) == 0;
 
282
}
 
283
 
 
284
size_t VALUE_QUEUE_RT_C::bounded_execution_queue_size_limit(void) const
 
285
{
 
286
  return bounded_exec_max_size_rep;
 
287
}
 
288
 
 
289
/************************************************************************/