~ubuntu-branches/ubuntu/wily/grpc/wily

« back to all changes in this revision

Viewing changes to test/core/surface/completion_queue_test.c

  • Committer: Package Import Robot
  • Author(s): Andrew Pollock
  • Date: 2015-05-07 13:28:11 UTC
  • Revision ID: package-import@ubuntu.com-20150507132811-ybm4hfq73tnvvd2e
Tags: upstream-0.10.0
ImportĀ upstreamĀ versionĀ 0.10.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *
 
3
 * Copyright 2015, Google Inc.
 
4
 * All rights reserved.
 
5
 *
 
6
 * Redistribution and use in source and binary forms, with or without
 
7
 * modification, are permitted provided that the following conditions are
 
8
 * met:
 
9
 *
 
10
 *     * Redistributions of source code must retain the above copyright
 
11
 * notice, this list of conditions and the following disclaimer.
 
12
 *     * Redistributions in binary form must reproduce the above
 
13
 * copyright notice, this list of conditions and the following disclaimer
 
14
 * in the documentation and/or other materials provided with the
 
15
 * distribution.
 
16
 *     * Neither the name of Google Inc. nor the names of its
 
17
 * contributors may be used to endorse or promote products derived from
 
18
 * this software without specific prior written permission.
 
19
 *
 
20
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
21
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
22
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
23
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
24
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
25
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
26
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
27
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
28
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
29
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
30
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
31
 *
 
32
 */
 
33
 
 
34
#include "src/core/surface/completion_queue.h"
 
35
 
 
36
#include "src/core/iomgr/iomgr.h"
 
37
#include <grpc/support/alloc.h>
 
38
#include <grpc/support/log.h>
 
39
#include <grpc/support/thd.h>
 
40
#include <grpc/support/time.h>
 
41
#include <grpc/support/useful.h>
 
42
#include "test/core/util/test_config.h"
 
43
 
 
44
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
 
45
 
 
46
static void *create_test_tag(void) {
 
47
  static gpr_intptr i = 0;
 
48
  return (void *)(++i);
 
49
}
 
50
 
 
51
/* helper for tests to shutdown correctly and tersely */
 
52
static void shutdown_and_destroy(grpc_completion_queue *cc) {
 
53
  grpc_event ev;
 
54
  grpc_completion_queue_shutdown(cc);
 
55
  ev = grpc_completion_queue_next(cc, gpr_inf_past);
 
56
  GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
 
57
  grpc_completion_queue_destroy(cc);
 
58
}
 
59
 
 
60
/* ensure we can create and destroy a completion channel */
 
61
static void test_no_op(void) {
 
62
  LOG_TEST("test_no_op");
 
63
  shutdown_and_destroy(grpc_completion_queue_create());
 
64
}
 
65
 
 
66
static void test_wait_empty(void) {
 
67
  grpc_completion_queue *cc;
 
68
 
 
69
  LOG_TEST("test_wait_empty");
 
70
 
 
71
  cc = grpc_completion_queue_create();
 
72
  GPR_ASSERT(grpc_completion_queue_next(cc, gpr_now()).type ==
 
73
             GRPC_QUEUE_TIMEOUT);
 
74
  shutdown_and_destroy(cc);
 
75
}
 
76
 
 
77
static void test_cq_end_op(void) {
 
78
  grpc_event ev;
 
79
  grpc_completion_queue *cc;
 
80
  void *tag = create_test_tag();
 
81
 
 
82
  LOG_TEST("test_cq_end_op");
 
83
 
 
84
  cc = grpc_completion_queue_create();
 
85
 
 
86
  grpc_cq_begin_op(cc, NULL);
 
87
  grpc_cq_end_op(cc, tag, NULL, 1);
 
88
 
 
89
  ev = grpc_completion_queue_next(cc, gpr_inf_past);
 
90
  GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
 
91
  GPR_ASSERT(ev.tag == tag);
 
92
  GPR_ASSERT(ev.success);
 
93
 
 
94
  shutdown_and_destroy(cc);
 
95
}
 
96
 
 
97
static void test_shutdown_then_next_polling(void) {
 
98
  grpc_completion_queue *cc;
 
99
  LOG_TEST("test_shutdown_then_next_polling");
 
100
 
 
101
  cc = grpc_completion_queue_create();
 
102
  grpc_completion_queue_shutdown(cc);
 
103
  GPR_ASSERT(grpc_completion_queue_next(cc, gpr_inf_past).type ==
 
104
             GRPC_QUEUE_SHUTDOWN);
 
105
  grpc_completion_queue_destroy(cc);
 
106
}
 
107
 
 
108
static void test_shutdown_then_next_with_timeout(void) {
 
109
  grpc_completion_queue *cc;
 
110
  LOG_TEST("test_shutdown_then_next_with_timeout");
 
111
 
 
112
  cc = grpc_completion_queue_create();
 
113
  grpc_completion_queue_shutdown(cc);
 
114
  GPR_ASSERT(grpc_completion_queue_next(cc, gpr_inf_future).type ==
 
115
             GRPC_QUEUE_SHUTDOWN);
 
116
  grpc_completion_queue_destroy(cc);
 
117
}
 
118
 
 
119
static void test_pluck(void) {
 
120
  grpc_event ev;
 
121
  grpc_completion_queue *cc;
 
122
  void *tags[128];
 
123
  unsigned i, j;
 
124
 
 
125
  LOG_TEST("test_pluck");
 
126
 
 
127
  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
 
128
    tags[i] = create_test_tag();
 
129
    for (j = 0; j < i; j++) {
 
130
      GPR_ASSERT(tags[i] != tags[j]);
 
131
    }
 
132
  }
 
133
 
 
134
  cc = grpc_completion_queue_create();
 
135
 
 
136
  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
 
137
    grpc_cq_begin_op(cc, NULL);
 
138
    grpc_cq_end_op(cc, tags[i], NULL, 1);
 
139
  }
 
140
 
 
141
  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
 
142
    ev = grpc_completion_queue_pluck(cc, tags[i], gpr_inf_past);
 
143
    GPR_ASSERT(ev.tag == tags[i]);
 
144
  }
 
145
 
 
146
  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
 
147
    grpc_cq_begin_op(cc, NULL);
 
148
    grpc_cq_end_op(cc, tags[i], NULL, 1);
 
149
  }
 
150
 
 
151
  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
 
152
    ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
 
153
                                     gpr_inf_past);
 
154
    GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
 
155
  }
 
156
 
 
157
  shutdown_and_destroy(cc);
 
158
}
 
159
 
 
160
#define TEST_THREAD_EVENTS 10000
 
161
 
 
162
typedef struct test_thread_options {
 
163
  gpr_event on_started;
 
164
  gpr_event *phase1;
 
165
  gpr_event on_phase1_done;
 
166
  gpr_event *phase2;
 
167
  gpr_event on_finished;
 
168
  int events_triggered;
 
169
  int id;
 
170
  grpc_completion_queue *cc;
 
171
} test_thread_options;
 
172
 
 
173
gpr_timespec ten_seconds_time(void) {
 
174
  return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
 
175
}
 
176
 
 
177
static void producer_thread(void *arg) {
 
178
  test_thread_options *opt = arg;
 
179
  int i;
 
180
 
 
181
  gpr_log(GPR_INFO, "producer %d started", opt->id);
 
182
  gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
 
183
  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
 
184
 
 
185
  gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
 
186
  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
 
187
    grpc_cq_begin_op(opt->cc, NULL);
 
188
  }
 
189
 
 
190
  gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
 
191
  gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
 
192
  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
 
193
 
 
194
  gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
 
195
  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
 
196
    grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1);
 
197
    opt->events_triggered++;
 
198
  }
 
199
 
 
200
  gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
 
201
  gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
 
202
}
 
203
 
 
204
static void consumer_thread(void *arg) {
 
205
  test_thread_options *opt = arg;
 
206
  grpc_event ev;
 
207
 
 
208
  gpr_log(GPR_INFO, "consumer %d started", opt->id);
 
209
  gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
 
210
  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
 
211
 
 
212
  gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
 
213
 
 
214
  gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
 
215
  gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
 
216
  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
 
217
 
 
218
  gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
 
219
  for (;;) {
 
220
    ev = grpc_completion_queue_next(opt->cc, ten_seconds_time());
 
221
    switch (ev.type) {
 
222
      case GRPC_OP_COMPLETE:
 
223
        GPR_ASSERT(ev.success);
 
224
        opt->events_triggered++;
 
225
        break;
 
226
      case GRPC_QUEUE_SHUTDOWN:
 
227
        gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
 
228
        gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
 
229
        return;
 
230
      case GRPC_QUEUE_TIMEOUT:
 
231
        gpr_log(GPR_ERROR, "Invalid timeout received");
 
232
        abort();
 
233
    }
 
234
  }
 
235
}
 
236
 
 
237
static void test_threading(int producers, int consumers) {
 
238
  test_thread_options *options =
 
239
      gpr_malloc((producers + consumers) * sizeof(test_thread_options));
 
240
  gpr_event phase1 = GPR_EVENT_INIT;
 
241
  gpr_event phase2 = GPR_EVENT_INIT;
 
242
  grpc_completion_queue *cc = grpc_completion_queue_create();
 
243
  int i;
 
244
  int total_consumed = 0;
 
245
  static int optid = 101;
 
246
 
 
247
  gpr_log(GPR_INFO, "%s: %d producers, %d consumers", "test_threading",
 
248
          producers, consumers);
 
249
 
 
250
  /* start all threads: they will wait for phase1 */
 
251
  for (i = 0; i < producers + consumers; i++) {
 
252
    gpr_thd_id id;
 
253
    gpr_event_init(&options[i].on_started);
 
254
    gpr_event_init(&options[i].on_phase1_done);
 
255
    gpr_event_init(&options[i].on_finished);
 
256
    options[i].phase1 = &phase1;
 
257
    options[i].phase2 = &phase2;
 
258
    options[i].events_triggered = 0;
 
259
    options[i].cc = cc;
 
260
    options[i].id = optid++;
 
261
    GPR_ASSERT(gpr_thd_new(&id,
 
262
                           i < producers ? producer_thread : consumer_thread,
 
263
                           options + i, NULL));
 
264
    gpr_event_wait(&options[i].on_started, ten_seconds_time());
 
265
  }
 
266
 
 
267
  /* start phase1: producers will pre-declare all operations they will
 
268
     complete */
 
269
  gpr_log(GPR_INFO, "start phase 1");
 
270
  gpr_event_set(&phase1, (void *)(gpr_intptr)1);
 
271
 
 
272
  gpr_log(GPR_INFO, "wait phase 1");
 
273
  for (i = 0; i < producers + consumers; i++) {
 
274
    GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
 
275
  }
 
276
  gpr_log(GPR_INFO, "done phase 1");
 
277
 
 
278
  /* start phase2: operations will complete, and consumers will consume them */
 
279
  gpr_log(GPR_INFO, "start phase 2");
 
280
  gpr_event_set(&phase2, (void *)(gpr_intptr)1);
 
281
 
 
282
  /* in parallel, we shutdown the completion channel - all events should still
 
283
     be consumed */
 
284
  grpc_completion_queue_shutdown(cc);
 
285
 
 
286
  /* join all threads */
 
287
  gpr_log(GPR_INFO, "wait phase 2");
 
288
  for (i = 0; i < producers + consumers; i++) {
 
289
    GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
 
290
  }
 
291
  gpr_log(GPR_INFO, "done phase 2");
 
292
 
 
293
  /* destroy the completion channel */
 
294
  grpc_completion_queue_destroy(cc);
 
295
 
 
296
  /* verify that everything was produced and consumed */
 
297
  for (i = 0; i < producers + consumers; i++) {
 
298
    if (i < producers) {
 
299
      GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
 
300
    } else {
 
301
      total_consumed += options[i].events_triggered;
 
302
    }
 
303
  }
 
304
  GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
 
305
 
 
306
  gpr_free(options);
 
307
}
 
308
 
 
309
int main(int argc, char **argv) {
 
310
  grpc_test_init(argc, argv);
 
311
  grpc_iomgr_init();
 
312
  test_no_op();
 
313
  test_wait_empty();
 
314
  test_shutdown_then_next_polling();
 
315
  test_shutdown_then_next_with_timeout();
 
316
  test_cq_end_op();
 
317
  test_pluck();
 
318
  test_threading(1, 1);
 
319
  test_threading(1, 10);
 
320
  test_threading(10, 1);
 
321
  test_threading(10, 10);
 
322
  grpc_iomgr_shutdown();
 
323
  return 0;
 
324
}