169
169
#ifdef ENABLE_IPC_MONITOR
170
void register_monitor_socket (ConnectionPool& pool, zmqpp::context_t const& context)
170
void register_monitor_socket(ConnectionPool& pool, zmqpp::context_t const& context)
172
172
thread_local static bool monitor_initialized = false;
173
173
if (!monitor_initialized) {
175
175
zmqpp::socket monitor_socket(context, zmqpp::socket_type::publish);
176
176
monitor_socket.set(zmqpp::socket_option::linger, 0);
177
177
monitor_socket.connect(MONITOR_ENDPOINT);
178
pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket), RequestMode::Oneway);
178
pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket));
190
190
lock_guard<mutex> lock(shared_mutex);
192
192
assert(mode_ == RequestMode::Oneway);
193
zmqpp::socket& s = pool.find(endpoint_, mode_);
193
zmqpp::socket& s = pool.find(endpoint_);
194
194
ZmqSender sender(s);
195
195
auto segments = in_params.getSegmentsForOutput();
196
sender.send(segments);
196
if (!sender.send(segments, ZmqSender::DontWait))
198
// If there is nothing at the other end, discard the message and trash the socket.
199
pool.remove(endpoint_);
198
203
#ifdef ENABLE_IPC_MONITOR
200
205
register_monitor_socket(pool, *mw_base()->context());
201
zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
206
zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT);
202
207
auto word_arr = capnp::messageToFlatArray(segments);
203
208
monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
264
269
// Get a socket to the endpoint for this proxy and write the request on the wire.
265
270
// Poll for the reply with the given timeout.
266
// Return a socket for the response or throw if the timeout expires.
271
// Return a reader for the response or throw if the timeout expires.
268
273
ZmqObjectProxy::TwowayOutParams ZmqObjectProxy::invoke_twoway__(capnp::MessageBuilder& in_params, int64_t timeout)
270
// Each calling thread gets its own pool because zmq sockets are not thread-safe.
271
thread_local static ConnectionPool pool(*mw_base()->context());
273
275
RequestMode mode;
274
276
std::string endpoint;
281
283
assert(mode == RequestMode::Twoway);
282
zmqpp::socket& s = pool.find(endpoint, mode);
285
zmqpp::socket s(*mw_base()->context(), zmqpp::socket_type::request);
286
// Allow short linger time so we don't hang indefinitely if the other end disappears.
287
s.set(zmqpp::socket_option::linger, 50);
288
// We set a reconnect interval of 20 ms, so we get to the peer quickly, in case
289
// the peer hasn't finished binding to its endpoint yet after being exec'd.
290
// We back off exponentially to half the call timeout. If we haven't connected
291
// by then, the poll below will time out anyway. For inifinite timeout, we try
293
int reconnect_max = timeout == -1 ? 1000 : timeout / 2;
294
s.set(zmqpp::socket_option::reconnect_interval, 20);
295
s.set(zmqpp::socket_option::reconnect_interval_max, reconnect_max);
283
297
ZmqSender sender(s);
284
298
auto segments = in_params.getSegmentsForOutput();
285
299
sender.send(segments);
287
301
#ifdef ENABLE_IPC_MONITOR
302
// Each calling thread gets its own pool because zmq sockets are not thread-safe.
303
thread_local static ConnectionPool pool(*mw_base()->context());
289
306
register_monitor_socket(pool, *mw_base()->context());
290
zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
307
zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT);
291
308
auto word_arr = capnp::messageToFlatArray(segments);
292
309
monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
308
325
if (!p.has_input(s))
310
// If a request times out, we must trash the corresponding socket, otherwise
311
// zmq gets confused: the reply will never be read, so the socket ends up
313
// (Removing a socket from the connection pool deletes it, hence closing the socket.)
314
pool.remove(endpoint);
315
327
throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");
318
// We need to read here rather than in the caller because, otherwise, it is possible for another
319
// twoway request to be sent to this thread before the response has been read from the socket.
320
330
// Because the ZmqReceiver holds the memory for the unmarshaling buffer, we pass both the receiver
321
331
// and the capnp reader in a struct.
322
332
ZmqObjectProxy::TwowayOutParams out_params;