~stolowski/+junk/symbols-test

« back to all changes in this revision

Viewing changes to src/scopes/internal/zmq_middleware/ZmqObject.cpp

  • Committer: Tarmac
  • Author(s): Michi Henning
  • Date: 2014-09-25 12:56:03 UTC
  • mfrom: (490.1.7 reconnect-fix)
  • Revision ID: tarmac-20140925125603-v5wj5ub44fhv6ljc
Interim fix for bug #1364464 to reduce Zmq reconnection attempts once the target endpoint for a oneway socket is removed. This limits the reconnection attempts but doesn't completely eliminate them. (I'm working on a better fix that will stop the reconnections completely, but that's non-trivial.) For now, this will do.

Added non-blocking option to ZmqSender::send(). Changed oneway_invoke_() to check whether a send succeeded and trash the corresponding socket if not. Changed reconnect interval for sockets to exponential back-off with 1 sec maximum. Fixes: https://bugs.launchpad.net/bugs/1364464.

Approved by Pawel Stolowski, PS Jenkins bot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
167
167
}
168
168
 
169
169
#ifdef ENABLE_IPC_MONITOR
170
 
void register_monitor_socket (ConnectionPool& pool, zmqpp::context_t const& context)
 
170
void register_monitor_socket(ConnectionPool& pool, zmqpp::context_t const& context)
171
171
{
172
172
    thread_local static bool monitor_initialized = false;
173
173
    if (!monitor_initialized) {
175
175
        zmqpp::socket monitor_socket(context, zmqpp::socket_type::publish);
176
176
        monitor_socket.set(zmqpp::socket_option::linger, 0);
177
177
        monitor_socket.connect(MONITOR_ENDPOINT);
178
 
        pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket), RequestMode::Oneway);
 
178
        pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket));
179
179
    }
180
180
}
181
181
#endif
190
190
    lock_guard<mutex> lock(shared_mutex);
191
191
 
192
192
    assert(mode_ == RequestMode::Oneway);
193
 
    zmqpp::socket& s = pool.find(endpoint_, mode_);
 
193
    zmqpp::socket& s = pool.find(endpoint_);
194
194
    ZmqSender sender(s);
195
195
    auto segments = in_params.getSegmentsForOutput();
196
 
    sender.send(segments);
 
196
    if (!sender.send(segments, ZmqSender::DontWait))
 
197
    {
 
198
        // If there is nothing at the other end, discard the message and trash the socket.
 
199
        pool.remove(endpoint_);
 
200
        return;
 
201
    }
197
202
 
198
203
#ifdef ENABLE_IPC_MONITOR
199
204
    if (true) {
200
205
        register_monitor_socket(pool, *mw_base()->context());
201
 
        zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
 
206
        zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT);
202
207
        auto word_arr = capnp::messageToFlatArray(segments);
203
208
        monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
204
209
    }
263
268
 
264
269
// Get a socket to the endpoint for this proxy and write the request on the wire.
265
270
// Poll for the reply with the given timeout.
266
 
// Return a socket for the response or throw if the timeout expires.
 
271
// Return a reader for the response or throw if the timeout expires.
267
272
 
268
273
ZmqObjectProxy::TwowayOutParams ZmqObjectProxy::invoke_twoway__(capnp::MessageBuilder& in_params, int64_t timeout)
269
274
{
270
 
    // Each calling thread gets its own pool because zmq sockets are not thread-safe.
271
 
    thread_local static ConnectionPool pool(*mw_base()->context());
272
 
 
273
275
    RequestMode mode;
274
276
    std::string endpoint;
275
277
    {
279
281
    }
280
282
 
281
283
    assert(mode == RequestMode::Twoway);
282
 
    zmqpp::socket& s = pool.find(endpoint, mode);
 
284
 
 
285
    zmqpp::socket s(*mw_base()->context(), zmqpp::socket_type::request);
 
286
    // Allow short linger time so we don't hang indefinitely if the other end disappears.
 
287
    s.set(zmqpp::socket_option::linger, 50);
 
288
    // We set a reconnect interval of 20 ms, so we get to the peer quickly, in case
 
289
    // the peer hasn't finished binding to its endpoint yet after being exec'd.
 
290
    // We back off exponentially to half the call timeout. If we haven't connected
 
291
    // by then, the poll below will time out anyway. For inifinite timeout, we try
 
292
    // a second.
 
293
    int reconnect_max = timeout == -1 ? 1000 : timeout / 2;
 
294
    s.set(zmqpp::socket_option::reconnect_interval, 20);
 
295
    s.set(zmqpp::socket_option::reconnect_interval_max, reconnect_max);
 
296
    s.connect(endpoint);
283
297
    ZmqSender sender(s);
284
298
    auto segments = in_params.getSegmentsForOutput();
285
299
    sender.send(segments);
286
300
 
287
301
#ifdef ENABLE_IPC_MONITOR
 
302
    // Each calling thread gets its own pool because zmq sockets are not thread-safe.
 
303
    thread_local static ConnectionPool pool(*mw_base()->context());
 
304
 
288
305
    if (true) {
289
306
        register_monitor_socket(pool, *mw_base()->context());
290
 
        zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
 
307
        zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT);
291
308
        auto word_arr = capnp::messageToFlatArray(segments);
292
309
        monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
293
310
    }
307
324
 
308
325
    if (!p.has_input(s))
309
326
    {
310
 
        // If a request times out, we must trash the corresponding socket, otherwise
311
 
        // zmq gets confused: the reply will never be read, so the socket ends up
312
 
        // in a bad state.
313
 
        // (Removing a socket from the connection pool deletes it, hence closing the socket.)
314
 
        pool.remove(endpoint);
315
327
        throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");
316
328
    }
317
329
 
318
 
    // We need to read here rather than in the caller because, otherwise, it is possible for another
319
 
    // twoway request to be sent to this thread before the response has been read from the socket.
320
330
    // Because the ZmqReceiver holds the memory for the unmarshaling buffer, we pass both the receiver
321
331
    // and the capnp reader in a struct.
322
332
    ZmqObjectProxy::TwowayOutParams out_params;
324
334
    auto params = out_params.receiver->receive();
325
335
    out_params.reader.reset(new capnp::SegmentArrayMessageReader(params));
326
336
    return out_params;
 
337
    // Outgoing twoway socket closed here.
327
338
}
328
339
 
329
340
} // namespace zmq_middleware