~ubuntu-branches/ubuntu/wily/grpc/wily

« back to all changes in this revision

Viewing changes to test/cpp/qps/driver.cc

  • Committer: Package Import Robot
  • Author(s): Andrew Pollock
  • Date: 2015-05-07 13:28:11 UTC
  • Revision ID: package-import@ubuntu.com-20150507132811-ybm4hfq73tnvvd2e
Tags: upstream-0.10.0
ImportĀ upstreamĀ versionĀ 0.10.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 *
 
3
 * Copyright 2015, Google Inc.
 
4
 * All rights reserved.
 
5
 *
 
6
 * Redistribution and use in source and binary forms, with or without
 
7
 * modification, are permitted provided that the following conditions are
 
8
 * met:
 
9
 *
 
10
 *     * Redistributions of source code must retain the above copyright
 
11
 * notice, this list of conditions and the following disclaimer.
 
12
 *     * Redistributions in binary form must reproduce the above
 
13
 * copyright notice, this list of conditions and the following disclaimer
 
14
 * in the documentation and/or other materials provided with the
 
15
 * distribution.
 
16
 *     * Neither the name of Google Inc. nor the names of its
 
17
 * contributors may be used to endorse or promote products derived from
 
18
 * this software without specific prior written permission.
 
19
 *
 
20
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
21
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
22
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
23
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
24
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
25
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
26
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
27
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
28
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
29
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
30
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
31
 *
 
32
 */
 
33
 
 
34
#include "test/cpp/qps/driver.h"
 
35
#include "src/core/support/env.h"
 
36
#include <grpc/support/alloc.h>
 
37
#include <grpc/support/log.h>
 
38
#include <grpc/support/host_port.h>
 
39
#include <grpc++/channel_arguments.h>
 
40
#include <grpc++/client_context.h>
 
41
#include <grpc++/create_channel.h>
 
42
#include <grpc++/stream.h>
 
43
#include <list>
 
44
#include <thread>
 
45
#include <deque>
 
46
#include <vector>
 
47
#include <unistd.h>
 
48
#include "test/cpp/qps/histogram.h"
 
49
#include "test/cpp/qps/qps_worker.h"
 
50
#include "test/core/util/port.h"
 
51
#include "test/core/util/test_config.h"
 
52
 
 
53
using std::list;
 
54
using std::thread;
 
55
using std::unique_ptr;
 
56
using std::deque;
 
57
using std::vector;
 
58
 
 
59
namespace grpc {
 
60
namespace testing {
 
61
static deque<string> get_hosts(const string& name) {
 
62
  char* env = gpr_getenv(name.c_str());
 
63
  if (!env) return deque<string>();
 
64
 
 
65
  deque<string> out;
 
66
  char* p = env;
 
67
  for (;;) {
 
68
    char* comma = strchr(p, ',');
 
69
    if (comma) {
 
70
      out.emplace_back(p, comma);
 
71
      p = comma + 1;
 
72
    } else {
 
73
      out.emplace_back(p);
 
74
      gpr_free(env);
 
75
      return out;
 
76
    }
 
77
  }
 
78
}
 
79
 
 
80
std::unique_ptr<ScenarioResult> RunScenario(
 
81
    const ClientConfig& initial_client_config, size_t num_clients,
 
82
    const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
 
83
    int benchmark_seconds, int spawn_local_worker_count) {
 
84
  // ClientContext allocator (all are destroyed at scope exit)
 
85
  list<ClientContext> contexts;
 
86
  auto alloc_context = [&contexts]() {
 
87
    contexts.emplace_back();
 
88
    return &contexts.back();
 
89
  };
 
90
 
 
91
  // To be added to the result, containing the final configuration used for
 
92
  // client and config (incluiding host, etc.)
 
93
  ClientConfig result_client_config;
 
94
  ServerConfig result_server_config;
 
95
 
 
96
  // Get client, server lists
 
97
  auto workers = get_hosts("QPS_WORKERS");
 
98
  ClientConfig client_config = initial_client_config;
 
99
 
 
100
  // Spawn some local workers if desired
 
101
  vector<unique_ptr<QpsWorker>> local_workers;
 
102
  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
 
103
    // act as if we're a new test -- gets a good rng seed
 
104
    static bool called_init = false;
 
105
    if (!called_init) {
 
106
      char args_buf[100];
 
107
      strcpy(args_buf, "some-benchmark");
 
108
      char* args[] = {args_buf};
 
109
      grpc_test_init(1, args);
 
110
      called_init = true;
 
111
    }
 
112
 
 
113
    int driver_port = grpc_pick_unused_port_or_die();
 
114
    int benchmark_port = grpc_pick_unused_port_or_die();
 
115
    local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
 
116
    char addr[256];
 
117
    sprintf(addr, "localhost:%d", driver_port);
 
118
    if (spawn_local_worker_count < 0) {
 
119
      workers.push_front(addr);
 
120
    } else {
 
121
      workers.push_back(addr);
 
122
    }
 
123
  }
 
124
 
 
125
  // TODO(ctiller): support running multiple configurations, and binpack
 
126
  // client/server pairs
 
127
  // to available workers
 
128
  GPR_ASSERT(workers.size() >= num_clients + num_servers);
 
129
 
 
130
  // Trim to just what we need
 
131
  workers.resize(num_clients + num_servers);
 
132
 
 
133
  // Start servers
 
134
  struct ServerData {
 
135
    unique_ptr<Worker::Stub> stub;
 
136
    unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
 
137
  };
 
138
  vector<ServerData> servers;
 
139
  for (size_t i = 0; i < num_servers; i++) {
 
140
    ServerData sd;
 
141
    sd.stub = std::move(Worker::NewStub(
 
142
        CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
 
143
    ServerArgs args;
 
144
    result_server_config = server_config;
 
145
    result_server_config.set_host(workers[i]);
 
146
    *args.mutable_setup() = server_config;
 
147
    sd.stream = std::move(sd.stub->RunServer(alloc_context()));
 
148
    GPR_ASSERT(sd.stream->Write(args));
 
149
    ServerStatus init_status;
 
150
    GPR_ASSERT(sd.stream->Read(&init_status));
 
151
    char* host;
 
152
    char* driver_port;
 
153
    char* cli_target;
 
154
    gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
 
155
    gpr_join_host_port(&cli_target, host, init_status.port());
 
156
    client_config.add_server_targets(cli_target);
 
157
    gpr_free(host);
 
158
    gpr_free(driver_port);
 
159
    gpr_free(cli_target);
 
160
 
 
161
    servers.push_back(std::move(sd));
 
162
  }
 
163
 
 
164
  // Start clients
 
165
  struct ClientData {
 
166
    unique_ptr<Worker::Stub> stub;
 
167
    unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
 
168
  };
 
169
  vector<ClientData> clients;
 
170
  for (size_t i = 0; i < num_clients; i++) {
 
171
    ClientData cd;
 
172
    cd.stub = std::move(Worker::NewStub(CreateChannel(
 
173
        workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
 
174
    ClientArgs args;
 
175
    result_client_config = client_config;
 
176
    result_client_config.set_host(workers[i + num_servers]);
 
177
    *args.mutable_setup() = client_config;
 
178
    cd.stream = std::move(cd.stub->RunTest(alloc_context()));
 
179
    GPR_ASSERT(cd.stream->Write(args));
 
180
    ClientStatus init_status;
 
181
    GPR_ASSERT(cd.stream->Read(&init_status));
 
182
 
 
183
    clients.push_back(std::move(cd));
 
184
  }
 
185
 
 
186
  // Let everything warmup
 
187
  gpr_log(GPR_INFO, "Warming up");
 
188
  gpr_timespec start = gpr_now();
 
189
  gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(warmup_seconds)));
 
190
 
 
191
  // Start a run
 
192
  gpr_log(GPR_INFO, "Starting");
 
193
  ServerArgs server_mark;
 
194
  server_mark.mutable_mark();
 
195
  ClientArgs client_mark;
 
196
  client_mark.mutable_mark();
 
197
  for (auto server = servers.begin(); server != servers.end(); server++) {
 
198
    GPR_ASSERT(server->stream->Write(server_mark));
 
199
  }
 
200
  for (auto client = clients.begin(); client != clients.end(); client++) {
 
201
    GPR_ASSERT(client->stream->Write(client_mark));
 
202
  }
 
203
  ServerStatus server_status;
 
204
  ClientStatus client_status;
 
205
  for (auto server = servers.begin(); server != servers.end(); server++) {
 
206
    GPR_ASSERT(server->stream->Read(&server_status));
 
207
  }
 
208
  for (auto client = clients.begin(); client != clients.end(); client++) {
 
209
    GPR_ASSERT(client->stream->Read(&client_status));
 
210
  }
 
211
 
 
212
  // Wait some time
 
213
  gpr_log(GPR_INFO, "Running");
 
214
  gpr_sleep_until(
 
215
      gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
 
216
 
 
217
  // Finish a run
 
218
  std::unique_ptr<ScenarioResult> result(new ScenarioResult);
 
219
  result->client_config = result_client_config;
 
220
  result->server_config = result_server_config;
 
221
  gpr_log(GPR_INFO, "Finishing");
 
222
  for (auto server = servers.begin(); server != servers.end(); server++) {
 
223
    GPR_ASSERT(server->stream->Write(server_mark));
 
224
  }
 
225
  for (auto client = clients.begin(); client != clients.end(); client++) {
 
226
    GPR_ASSERT(client->stream->Write(client_mark));
 
227
  }
 
228
  for (auto server = servers.begin(); server != servers.end(); server++) {
 
229
    GPR_ASSERT(server->stream->Read(&server_status));
 
230
    const auto& stats = server_status.stats();
 
231
    result->server_resources.push_back(ResourceUsage{
 
232
        stats.time_elapsed(), stats.time_user(), stats.time_system()});
 
233
  }
 
234
  for (auto client = clients.begin(); client != clients.end(); client++) {
 
235
    GPR_ASSERT(client->stream->Read(&client_status));
 
236
    const auto& stats = client_status.stats();
 
237
    result->latencies.MergeProto(stats.latencies());
 
238
    result->client_resources.push_back(ResourceUsage{
 
239
        stats.time_elapsed(), stats.time_user(), stats.time_system()});
 
240
  }
 
241
 
 
242
  for (auto client = clients.begin(); client != clients.end(); client++) {
 
243
    GPR_ASSERT(client->stream->WritesDone());
 
244
    GPR_ASSERT(client->stream->Finish().ok());
 
245
  }
 
246
  for (auto server = servers.begin(); server != servers.end(); server++) {
 
247
    GPR_ASSERT(server->stream->WritesDone());
 
248
    GPR_ASSERT(server->stream->Finish().ok());
 
249
  }
 
250
  return result;
 
251
}
 
252
}  // namespace testing
 
253
}  // namespace grpc