~mordred/libmemcached/fix-weird-link

« back to all changes in this revision

Viewing changes to src/memslap.c

  • Committer: brian@gir-2.local
  • Date: 2008-03-10 15:04:41 UTC
  • mto: (317.6.1)
  • mto: This revision was merged to the branch mainline in revision 321.
  • Revision ID: brian@gir-2.local-20080310150441-jyhbjx6bwo46f6tg
Huge refactoring of directory structure.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#include <stdio.h>
2
 
#include <stdlib.h>
3
 
#include <string.h>
4
 
#include <sys/types.h>
5
 
#include <sys/stat.h>
6
 
#include <sys/types.h>
7
 
#include <sys/mman.h>
8
 
#include <fcntl.h>
9
 
#include <assert.h>
10
 
#include <sys/time.h>
11
 
#include <getopt.h>
12
 
#include <pthread.h>
13
 
 
14
 
#include <memcached.h>
15
 
 
16
 
#include "client_options.h"
17
 
#include "utilities.h"
18
 
#include "generator.h"
19
 
#include "execute.h"
20
 
 
21
 
#define DEFAULT_INITIAL_LOAD 10000
22
 
#define DEFAULT_EXECUTE_NUMBER 10000
23
 
#define DEFAULT_CONCURRENCY 1
24
 
 
25
 
#define PROGRAM_NAME "memslap"
26
 
#define PROGRAM_DESCRIPTION "Generates a load against a memcached custer of servers."
27
 
 
28
 
/* Global Thread counter */
29
 
volatile unsigned int thread_counter;
30
 
pthread_mutex_t counter_mutex;
31
 
pthread_cond_t count_threshhold;
32
 
volatile unsigned int master_wakeup;
33
 
pthread_mutex_t sleeper_mutex;
34
 
pthread_cond_t sleep_threshhold;
35
 
 
36
 
void *run_task(void *p);
37
 
 
38
 
/* Types */
39
 
typedef struct conclusions_st conclusions_st;
40
 
typedef struct thread_context_st thread_context_st;
41
 
typedef enum {
42
 
  SET_TEST,
43
 
  GET_TEST,
44
 
} test_type;
45
 
 
46
 
struct thread_context_st {
47
 
  unsigned int key_count;
48
 
  pairs_st *initial_pairs;
49
 
  unsigned int initial_number;
50
 
  pairs_st *execute_pairs;
51
 
  unsigned int execute_number;
52
 
  test_type test;
53
 
  memcached_st *memc;
54
 
};
55
 
 
56
 
struct conclusions_st {
57
 
  long int load_time;
58
 
  long int read_time;
59
 
  unsigned int rows_loaded;
60
 
  unsigned int rows_read;
61
 
};
62
 
 
63
 
/* Prototypes */
64
 
void options_parse(int argc, char *argv[]);
65
 
void conclusions_print(conclusions_st *conclusion);
66
 
void scheduler(memcached_server_st *servers, conclusions_st *conclusion);
67
 
pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, 
68
 
                           unsigned int *actual_loaded);
69
 
void flush_all(memcached_st *memc);
70
 
 
71
 
static int opt_verbose= 0;
72
 
static int opt_flush= 0;
73
 
static int opt_non_blocking_io= 0;
74
 
static int opt_tcp_nodelay= 0;
75
 
static unsigned int opt_execute_number= 0;
76
 
static unsigned int opt_createial_load= 0;
77
 
static unsigned int opt_concurrency= 0;
78
 
static int opt_displayflag= 0;
79
 
static char *opt_servers= NULL;
80
 
test_type opt_test= SET_TEST;
81
 
 
82
 
int main(int argc, char *argv[])
83
 
{
84
 
  conclusions_st conclusion;
85
 
  memcached_server_st *servers;
86
 
 
87
 
  memset(&conclusion, 0, sizeof(conclusions_st));
88
 
 
89
 
  srandom(time(NULL));
90
 
  options_parse(argc, argv);
91
 
 
92
 
  if (!opt_servers)
93
 
  {
94
 
    char *temp;
95
 
 
96
 
    if ((temp= getenv("MEMCACHED_SERVERS")))
97
 
      opt_servers= strdup(temp);
98
 
    else
99
 
    {
100
 
      fprintf(stderr, "No Servers provided\n");
101
 
      exit(1);
102
 
    }
103
 
  }
104
 
 
105
 
  servers= memcached_servers_parse(opt_servers);
106
 
 
107
 
  pthread_mutex_init(&counter_mutex, NULL);
108
 
  pthread_cond_init(&count_threshhold, NULL);
109
 
  pthread_mutex_init(&sleeper_mutex, NULL);
110
 
  pthread_cond_init(&sleep_threshhold, NULL);
111
 
 
112
 
  scheduler(servers, &conclusion);
113
 
 
114
 
  free(opt_servers);
115
 
 
116
 
  (void)pthread_mutex_destroy(&counter_mutex);
117
 
  (void)pthread_cond_destroy(&count_threshhold);
118
 
  (void)pthread_mutex_destroy(&sleeper_mutex);
119
 
  (void)pthread_cond_destroy(&sleep_threshhold);
120
 
  conclusions_print(&conclusion);
121
 
  memcached_server_list_free(servers);
122
 
 
123
 
  return 0;
124
 
}
125
 
 
126
 
void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
127
 
{
128
 
  unsigned int x;
129
 
  unsigned int actual_loaded= 0; /* Fix warning */
130
 
  memcached_st *memc;
131
 
 
132
 
  struct timeval start_time, end_time;
133
 
  pthread_t mainthread;            /* Thread descriptor */
134
 
  pthread_attr_t attr;          /* Thread attributes */
135
 
  pairs_st *pairs= NULL;
136
 
 
137
 
  pthread_attr_init(&attr);
138
 
  pthread_attr_setdetachstate(&attr,
139
 
                              PTHREAD_CREATE_DETACHED);
140
 
 
141
 
  memc= memcached_create(NULL);
142
 
  memcached_server_push(memc, servers);
143
 
 
144
 
  if (opt_flush)
145
 
    flush_all(memc);
146
 
  if (opt_createial_load)
147
 
    pairs= load_create_data(memc, opt_createial_load, &actual_loaded);
148
 
 
149
 
  /* We set this after we have loaded */
150
 
  {
151
 
    unsigned int value= 1;
152
 
    if (opt_non_blocking_io)
153
 
      memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &value);
154
 
    if (opt_tcp_nodelay)
155
 
      memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &value);
156
 
  }
157
 
 
158
 
 
159
 
  pthread_mutex_lock(&counter_mutex);
160
 
  thread_counter= 0;
161
 
 
162
 
  pthread_mutex_lock(&sleeper_mutex);
163
 
  master_wakeup= 1;
164
 
  pthread_mutex_unlock(&sleeper_mutex);
165
 
 
166
 
  for (x= 0; x < opt_concurrency; x++)
167
 
  {
168
 
    thread_context_st *context;
169
 
    context= (thread_context_st *)malloc(sizeof(thread_context_st));
170
 
    memset(context, 0, sizeof(thread_context_st));
171
 
 
172
 
    context->memc= memcached_clone(NULL, memc);
173
 
    context->test= opt_test;
174
 
 
175
 
    context->initial_pairs= pairs;
176
 
    context->initial_number= actual_loaded;
177
 
 
178
 
    if (opt_test == SET_TEST)
179
 
    {
180
 
      context->execute_pairs= pairs_generate(opt_execute_number, 400);
181
 
      context->execute_number= opt_execute_number;
182
 
    }
183
 
 
184
 
    /* now you create the thread */
185
 
    if (pthread_create(&mainthread, &attr, run_task,
186
 
                       (void *)context) != 0)
187
 
    {
188
 
      fprintf(stderr,"Could not create thread\n");
189
 
      exit(1);
190
 
    }
191
 
    thread_counter++;
192
 
  }
193
 
 
194
 
  pthread_mutex_unlock(&counter_mutex);
195
 
  pthread_attr_destroy(&attr);
196
 
 
197
 
  pthread_mutex_lock(&sleeper_mutex);
198
 
  master_wakeup= 0;
199
 
  pthread_mutex_unlock(&sleeper_mutex);
200
 
  pthread_cond_broadcast(&sleep_threshhold);
201
 
 
202
 
  gettimeofday(&start_time, NULL);
203
 
  /*
204
 
    We loop until we know that all children have cleaned up.
205
 
  */
206
 
  pthread_mutex_lock(&counter_mutex);
207
 
  while (thread_counter)
208
 
  {
209
 
    struct timespec abstime;
210
 
 
211
 
    memset(&abstime, 0, sizeof(struct timespec));
212
 
    abstime.tv_sec= 1;
213
 
 
214
 
    pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
215
 
  }
216
 
  pthread_mutex_unlock(&counter_mutex);
217
 
 
218
 
  gettimeofday(&end_time, NULL);
219
 
 
220
 
  conclusion->load_time= timedif(end_time, start_time);
221
 
  conclusion->read_time= timedif(end_time, start_time);
222
 
  pairs_free(pairs);
223
 
}
224
 
 
225
 
void options_parse(int argc, char *argv[])
226
 
{
227
 
  memcached_programs_help_st help_options[]=
228
 
  {
229
 
    {0},
230
 
  };
231
 
 
232
 
  static struct option long_options[]=
233
 
    {
234
 
      {"concurrency", required_argument, NULL, OPT_SLAP_CONCURRENCY},
235
 
      {"debug", no_argument, &opt_verbose, OPT_DEBUG},
236
 
      {"execute-number", required_argument, NULL, OPT_SLAP_EXECUTE_NUMBER},
237
 
      {"flag", no_argument, &opt_displayflag, OPT_FLAG},
238
 
      {"flush", no_argument, &opt_flush, OPT_FLUSH},
239
 
      {"help", no_argument, NULL, OPT_HELP},
240
 
      {"initial-load", required_argument, NULL, OPT_SLAP_INITIAL_LOAD}, /* Number to load initially */
241
 
      {"non-blocking", no_argument, &opt_non_blocking_io, OPT_SLAP_NON_BLOCK},
242
 
      {"servers", required_argument, NULL, OPT_SERVERS},
243
 
      {"tcp-nodelay", no_argument, &opt_tcp_nodelay, OPT_SLAP_TCP_NODELAY},
244
 
      {"test", required_argument, NULL, OPT_SLAP_TEST},
245
 
      {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
246
 
      {"version", no_argument, NULL, OPT_VERSION},
247
 
      {0, 0, 0, 0},
248
 
    };
249
 
 
250
 
  int option_index= 0;
251
 
  int option_rv;
252
 
 
253
 
  while (1) 
254
 
  {
255
 
    option_rv= getopt_long(argc, argv, "Vhvds:", long_options, &option_index);
256
 
    if (option_rv == -1) break;
257
 
    switch (option_rv)
258
 
    {
259
 
    case 0:
260
 
      break;
261
 
    case OPT_VERBOSE: /* --verbose or -v */
262
 
      opt_verbose = OPT_VERBOSE;
263
 
      break;
264
 
    case OPT_DEBUG: /* --debug or -d */
265
 
      opt_verbose = OPT_DEBUG;
266
 
      break;
267
 
    case OPT_VERSION: /* --version or -V */
268
 
      version_command(PROGRAM_NAME);
269
 
      break;
270
 
    case OPT_HELP: /* --help or -h */
271
 
      help_command(PROGRAM_NAME, PROGRAM_DESCRIPTION, long_options, help_options);
272
 
      break;
273
 
    case OPT_SERVERS: /* --servers or -s */
274
 
      opt_servers= strdup(optarg);
275
 
      break;
276
 
    case OPT_SLAP_TEST:
277
 
      if (!strcmp(optarg, "get"))
278
 
        opt_test= GET_TEST ;
279
 
      else if (!strcmp(optarg, "set"))
280
 
        opt_test= SET_TEST;
281
 
      else 
282
 
      {
283
 
        fprintf(stderr, "Your test, %s, is not a known test\n", optarg);
284
 
        exit(1);
285
 
      }
286
 
      break;
287
 
    case OPT_SLAP_CONCURRENCY:
288
 
      opt_concurrency= strtol(optarg, (char **)NULL, 10);
289
 
    case OPT_SLAP_EXECUTE_NUMBER:
290
 
      opt_execute_number= strtol(optarg, (char **)NULL, 10);
291
 
      break;
292
 
    case OPT_SLAP_INITIAL_LOAD:
293
 
      opt_createial_load= strtol(optarg, (char **)NULL, 10);
294
 
      break;
295
 
    case '?':
296
 
      /* getopt_long already printed an error message. */
297
 
      exit(1);
298
 
    default:
299
 
      abort();
300
 
    }
301
 
  }
302
 
 
303
 
  if (opt_test == GET_TEST && opt_createial_load == 0)
304
 
    opt_createial_load= DEFAULT_INITIAL_LOAD;
305
 
 
306
 
  if (opt_execute_number == 0)
307
 
    opt_execute_number= DEFAULT_EXECUTE_NUMBER;
308
 
 
309
 
  if (opt_concurrency == 0)
310
 
    opt_concurrency= DEFAULT_CONCURRENCY;
311
 
}
312
 
 
313
 
void conclusions_print(conclusions_st *conclusion)
314
 
{
315
 
  printf("\tThreads connecting to servers %u\n", opt_concurrency);
316
 
#ifdef NOT_FINISHED
317
 
  printf("\tLoaded %u rows\n", conclusion->rows_loaded);
318
 
  printf("\tRead %u rows\n", conclusion->rows_read);
319
 
#endif
320
 
  if (opt_test == SET_TEST)
321
 
    printf("\tTook %ld.%03ld seconds to load data\n", conclusion->load_time / 1000, 
322
 
           conclusion->load_time % 1000);
323
 
  else
324
 
    printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000, 
325
 
           conclusion->read_time % 1000);
326
 
}
327
 
 
328
 
void *run_task(void *p)
329
 
{
330
 
  thread_context_st *context= (thread_context_st *)p;
331
 
  memcached_st *memc;
332
 
 
333
 
  memc= context->memc;
334
 
 
335
 
  pthread_mutex_lock(&sleeper_mutex);
336
 
  while (master_wakeup)
337
 
  {
338
 
    pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
339
 
  } 
340
 
  pthread_mutex_unlock(&sleeper_mutex);
341
 
 
342
 
  /* Do Stuff */
343
 
  switch (context->test)
344
 
  {
345
 
  case SET_TEST:
346
 
    execute_set(memc, context->execute_pairs, context->execute_number);
347
 
    break;
348
 
  case GET_TEST:
349
 
    execute_get(memc, context->initial_pairs, context->initial_number);
350
 
    break;
351
 
  }
352
 
 
353
 
  memcached_free(memc);
354
 
 
355
 
  if (context->execute_pairs)
356
 
    pairs_free(context->execute_pairs);
357
 
  free(context);
358
 
 
359
 
  pthread_mutex_lock(&counter_mutex);
360
 
  thread_counter--;
361
 
  pthread_cond_signal(&count_threshhold);
362
 
  pthread_mutex_unlock(&counter_mutex);
363
 
 
364
 
  return NULL;
365
 
}
366
 
 
367
 
void flush_all(memcached_st *memc)
368
 
{
369
 
  memcached_flush(memc, 0);
370
 
}
371
 
 
372
 
pairs_st *load_create_data(memcached_st *memc, unsigned int number_of, 
373
 
                           unsigned int *actual_loaded)
374
 
{
375
 
  memcached_st *clone;
376
 
  pairs_st *pairs;
377
 
 
378
 
  clone= memcached_clone(NULL, memc);
379
 
  /* We always used non-blocking IO for load since it is faster */
380
 
  memcached_behavior_set(clone, MEMCACHED_BEHAVIOR_NO_BLOCK, NULL );
381
 
 
382
 
  pairs= pairs_generate(number_of, 400);
383
 
  *actual_loaded= execute_set(clone, pairs, number_of);
384
 
 
385
 
  memcached_free(clone);
386
 
 
387
 
  return pairs;
388
 
}