~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to plugin/archive/concurrency_test.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-03-18 12:12:31 UTC
  • Revision ID: james.westby@ubuntu.com-20100318121231-k6g1xe6cshbwa0f8
Tags: upstream-2010.03.1347
ImportĀ upstreamĀ versionĀ 2010.03.1347

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 Sun Microsystems
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; version 2 of the License.
 
9
 *
 
10
 *  This program is distributed in the hope that it will be useful,
 
11
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 *  GNU General Public License for more details.
 
14
 *
 
15
 *  You should have received a copy of the GNU General Public License
 
16
 *  along with this program; if not, write to the Free Software
 
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
18
 */
 
19
 
 
20
/*
 
21
  Just a test application for threads.
 
22
  */
 
23
 
 
24
#include "config.h"
 
25
 
 
26
#include "azio.h"
 
27
#include "drizzled/my_getopt.h"
 
28
#include <stdio.h>
 
29
#include <stdlib.h>
 
30
#include <sys/types.h>
 
31
#include <sys/stat.h>
 
32
#include <sys/types.h>
 
33
#include <sys/mman.h>
 
34
#include <fcntl.h>
 
35
#include <sys/time.h>
 
36
#include <pthread.h>
 
37
#include <string.h>                             /* Pull in memset() */
 
38
#ifndef __WIN__
 
39
#include <sys/wait.h>
 
40
#endif
 
41
 
 
42
#ifdef __WIN__
 
43
#define srandom  srand
 
44
#define random   rand
 
45
#define snprintf _snprintf
 
46
#endif
 
47
 
 
48
#include "azio.h"
 
49
 
 
50
#define DEFAULT_INITIAL_LOAD 10000
 
51
#define DEFAULT_EXECUTE_SECONDS 120
 
52
#define TEST_FILENAME "concurrency_test.az"
 
53
 
 
54
#define HUGE_STRING_LENGTH 8192
 
55
 
 
56
/* Global Thread counter */
 
57
unsigned int thread_counter;
 
58
pthread_mutex_t counter_mutex;
 
59
pthread_cond_t count_threshhold;
 
60
unsigned int master_wakeup;
 
61
pthread_mutex_t sleeper_mutex;
 
62
pthread_cond_t sleep_threshhold;
 
63
static bool timer_alarm= false;
 
64
pthread_mutex_t timer_alarm_mutex;
 
65
pthread_cond_t timer_alarm_threshold;
 
66
 
 
67
pthread_mutex_t row_lock;
 
68
 
 
69
/* Prototypes */
 
70
extern "C" {
 
71
  void *run_concurrent_task(void *p);
 
72
  void *timer_thread(void *p);
 
73
}
 
74
void scheduler(az_method use_aio);
 
75
void create_data_file(azio_stream *write_handler, uint64_t rows);
 
76
unsigned int write_row(azio_stream *s);
 
77
 
 
78
typedef struct thread_context_st thread_context_st;
 
79
struct thread_context_st {
 
80
  unsigned int how_often_to_write;
 
81
  uint64_t counter;
 
82
  az_method use_aio;
 
83
  azio_stream *writer;
 
84
};
 
85
 
 
86
/* Use this for string generation */
 
87
static const char ALPHANUMERICS[]=
 
88
  "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
 
89
 
 
90
#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
 
91
 
 
92
static void get_random_string(char *buffer, size_t size)
 
93
{
 
94
  char *buffer_ptr= buffer;
 
95
 
 
96
  while (--size)
 
97
    *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
 
98
  *buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
 
99
}
 
100
 
 
101
int main(int argc, char *argv[])
 
102
{
 
103
 
 
104
  unsigned int method;
 
105
  drizzled::internal::my_init();
 
106
 
 
107
  MY_INIT(argv[0]);
 
108
 
 
109
  if (argc > 1)
 
110
    exit(1);
 
111
 
 
112
  srandom(time(NULL));
 
113
 
 
114
  pthread_mutex_init(&counter_mutex, NULL);
 
115
  pthread_cond_init(&count_threshhold, NULL);
 
116
  pthread_mutex_init(&sleeper_mutex, NULL);
 
117
  pthread_cond_init(&sleep_threshhold, NULL);
 
118
  pthread_mutex_init(&timer_alarm_mutex, NULL);
 
119
  pthread_cond_init(&timer_alarm_threshold, NULL);
 
120
  pthread_mutex_init(&row_lock, NULL);
 
121
 
 
122
  for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
 
123
    scheduler((az_method)method);
 
124
 
 
125
  (void)pthread_mutex_destroy(&counter_mutex);
 
126
  (void)pthread_cond_destroy(&count_threshhold);
 
127
  (void)pthread_mutex_destroy(&sleeper_mutex);
 
128
  (void)pthread_cond_destroy(&sleep_threshhold);
 
129
  pthread_mutex_destroy(&timer_alarm_mutex);
 
130
  pthread_cond_destroy(&timer_alarm_threshold);
 
131
  pthread_mutex_destroy(&row_lock);
 
132
 
 
133
  return 0;
 
134
}
 
135
 
 
136
void scheduler(az_method use_aio)
 
137
{
 
138
  unsigned int x;
 
139
  uint64_t total;
 
140
  azio_stream writer_handle;
 
141
  thread_context_st *context;
 
142
  pthread_t mainthread;            /* Thread descriptor */
 
143
  pthread_attr_t attr;          /* Thread attributes */
 
144
 
 
145
  pthread_attr_init(&attr);
 
146
  pthread_attr_setdetachstate(&attr,
 
147
                              PTHREAD_CREATE_DETACHED);
 
148
 
 
149
  pthread_mutex_lock(&counter_mutex);
 
150
  thread_counter= 0;
 
151
 
 
152
  create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);
 
153
 
 
154
  pthread_mutex_lock(&sleeper_mutex);
 
155
  master_wakeup= 1;
 
156
  pthread_mutex_unlock(&sleeper_mutex);
 
157
 
 
158
  context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
 
159
  memset(context, 0, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
 
160
 
 
161
  if (!context)
 
162
  {
 
163
    fprintf(stderr, "Could not allocate memory for context\n");
 
164
    exit(1);
 
165
  }
 
166
 
 
167
  for (x= 0; x < DEFAULT_CONCURRENCY; x++)
 
168
  {
 
169
 
 
170
    context[x].how_often_to_write= random()%1000;
 
171
    context[x].writer= &writer_handle;
 
172
    context[x].counter= 0;
 
173
    context[x].use_aio= use_aio;
 
174
 
 
175
    /* now you create the thread */
 
176
    if (pthread_create(&mainthread, &attr, run_concurrent_task,
 
177
                       (void *)context) != 0)
 
178
    {
 
179
      fprintf(stderr,"Could not create thread\n");
 
180
      exit(1);
 
181
    }
 
182
    thread_counter++;
 
183
  }
 
184
 
 
185
  if (DEFAULT_EXECUTE_SECONDS)
 
186
  {
 
187
    time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;
 
188
    pthread_mutex_lock(&timer_alarm_mutex);
 
189
    timer_alarm= true;
 
190
    pthread_mutex_unlock(&timer_alarm_mutex);
 
191
 
 
192
    if (pthread_create(&mainthread, &attr, timer_thread,
 
193
                       (void *)&opt_timer_length) != 0)
 
194
    {
 
195
      fprintf(stderr,"%s: Could not create timer thread\n", drizzled::internal::my_progname);
 
196
      exit(1);
 
197
    }
 
198
  }
 
199
 
 
200
  pthread_mutex_unlock(&counter_mutex);
 
201
  pthread_attr_destroy(&attr);
 
202
 
 
203
  pthread_mutex_lock(&sleeper_mutex);
 
204
  master_wakeup= 0;
 
205
  pthread_mutex_unlock(&sleeper_mutex);
 
206
  pthread_cond_broadcast(&sleep_threshhold);
 
207
 
 
208
  /*
 
209
    We loop until we know that all children have cleaned up.
 
210
  */
 
211
  pthread_mutex_lock(&counter_mutex);
 
212
  while (thread_counter)
 
213
  {
 
214
    struct timespec abstime;
 
215
 
 
216
    memset(&abstime, 0, sizeof(struct timespec));
 
217
    abstime.tv_sec= 1;
 
218
 
 
219
    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
 
220
  }
 
221
  pthread_mutex_unlock(&counter_mutex);
 
222
 
 
223
  for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
 
224
    total+= context[x].counter;
 
225
 
 
226
  free(context);
 
227
  azclose(&writer_handle);
 
228
 
 
229
  printf("Read %"PRIu64" rows\n", total);
 
230
}
 
231
 
 
232
void *timer_thread(void *p)
 
233
{
 
234
  time_t *timer_length= (time_t *)p;
 
235
  struct timespec abstime;
 
236
 
 
237
  /*
 
238
    We lock around the initial call in case were we in a loop. This
 
239
    also keeps the value properly syncronized across call threads.
 
240
  */
 
241
  pthread_mutex_lock(&sleeper_mutex);
 
242
  while (master_wakeup)
 
243
  {
 
244
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
 
245
  }
 
246
  pthread_mutex_unlock(&sleeper_mutex);
 
247
 
 
248
  set_timespec(abstime, *timer_length);
 
249
 
 
250
  pthread_mutex_lock(&timer_alarm_mutex);
 
251
  pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
 
252
  pthread_mutex_unlock(&timer_alarm_mutex);
 
253
 
 
254
  pthread_mutex_lock(&timer_alarm_mutex);
 
255
  timer_alarm= false;
 
256
  pthread_mutex_unlock(&timer_alarm_mutex);
 
257
 
 
258
  return 0;
 
259
}
 
260
 
 
261
void *run_concurrent_task(void *p)
 
262
{
 
263
  thread_context_st *context= (thread_context_st *)p;
 
264
  uint64_t count;
 
265
  int ret;
 
266
  int error;
 
267
  azio_stream reader_handle;
 
268
 
 
269
  if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY,
 
270
                    context->use_aio)))
 
271
  {
 
272
    printf("Could not open test file\n");
 
273
    return 0;
 
274
  }
 
275
 
 
276
  pthread_mutex_lock(&sleeper_mutex);
 
277
  while (master_wakeup)
 
278
  {
 
279
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
 
280
  }
 
281
  pthread_mutex_unlock(&sleeper_mutex);
 
282
 
 
283
  /* Do Stuff */
 
284
  count= 0;
 
285
  while (1)
 
286
  {
 
287
    azread_init(&reader_handle);
 
288
    while ((ret= azread_row(&reader_handle, &error)))
 
289
      context->counter++;
 
290
 
 
291
    if (count % context->how_often_to_write)
 
292
    {
 
293
      write_row(context->writer);
 
294
    }
 
295
 
 
296
    /* If the timer is set, and the alarm is not active then end */
 
297
    if (timer_alarm == false)
 
298
      break;
 
299
  }
 
300
 
 
301
  pthread_mutex_lock(&counter_mutex);
 
302
  thread_counter--;
 
303
  pthread_cond_signal(&count_threshhold);
 
304
  pthread_mutex_unlock(&counter_mutex);
 
305
  azclose(&reader_handle);
 
306
 
 
307
  return NULL;
 
308
}
 
309
 
 
310
void create_data_file(azio_stream *write_handler, uint64_t rows)
 
311
{
 
312
  int ret;
 
313
  uint64_t x;
 
314
 
 
315
  if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC,
 
316
                    AZ_METHOD_BLOCK)))
 
317
  {
 
318
    printf("Could not create test file\n");
 
319
    exit(1);
 
320
  }
 
321
 
 
322
  for (x= 0; x < rows; x++)
 
323
    write_row(write_handler);
 
324
 
 
325
  azflush(write_handler, Z_SYNC_FLUSH);
 
326
}
 
327
 
 
328
unsigned int write_row(azio_stream *s)
 
329
{
 
330
  size_t length;
 
331
  char buffer[HUGE_STRING_LENGTH];
 
332
 
 
333
  length= random() % HUGE_STRING_LENGTH;
 
334
 
 
335
  /* Avoid zero length strings */
 
336
  length++;
 
337
 
 
338
  get_random_string(buffer, length);
 
339
  pthread_mutex_lock(&row_lock);
 
340
  azwrite_row(s, buffer, length);
 
341
  pthread_mutex_unlock(&row_lock);
 
342
 
 
343
  return 0;
 
344
}