~unity-api-team/unity-scopes-api/child-scopes-option

« back to all changes in this revision

Viewing changes to src/scopes/internal/zmq_middleware/ConnectionPool.cpp

  • Committer: Tarmac
  • Author(s): Michi Henning
  • Date: 2014-09-25 12:56:03 UTC
  • mfrom: (490.1.7 reconnect-fix)
  • Revision ID: tarmac-20140925125603-v5wj5ub44fhv6ljc
Interim fix for bug #1364464 to reduce Zmq reconnection attempts once the target endpoint for a oneway socket is removed. This limits the reconnection attempts but doesn't completely eliminate them. (I'm working on a better fix that will stop the reconnections completely, but that's non-trivial.) For now, this will do.

Added non-blocking option to ZmqSender::send(). Changed oneway_invoke_() to check whether a send succeeded and trash the corresponding socket if not. Changed reconnect interval for sockets to exponential back-off with 1 sec maximum. Fixes: https://bugs.launchpad.net/bugs/1364464.

Approved by Pawel Stolowski, PS Jenkins bot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
46
46
    pool_.clear();
47
47
}
48
48
 
49
 
zmqpp::socket& ConnectionPool::find(std::string const& endpoint, RequestMode m)
 
49
zmqpp::socket& ConnectionPool::find(std::string const& endpoint)
50
50
{
51
51
    assert(!endpoint.empty());
52
52
 
54
54
    auto const& it = pool_.find(endpoint);
55
55
    if (it != pool_.end())
56
56
    {
57
 
        if (it->second.mode != m)
58
 
        {
59
 
            string msg("ConnectionPool::find(): cannot send " + to_string(m) +
60
 
                       " request via " + to_string(it->second.mode) + " connection (endpoint: " + endpoint + ")");
61
 
            throw MiddlewareException(msg);
62
 
        }
63
 
        return it->second.socket;
 
57
        return it->second;
64
58
    }
65
59
 
66
60
    // No existing connection yet, establish one.
67
 
    auto entry = create_connection(endpoint, m);
68
 
    return pool_.emplace(move(entry)).first->second.socket;
 
61
    auto s = create_connection(endpoint);
 
62
    return pool_.emplace(make_pair(endpoint, move(s))).first->second;
69
63
}
70
64
 
71
 
ConnectionPool::CPool::value_type ConnectionPool::create_connection(std::string const& endpoint, RequestMode m)
 
65
zmqpp::socket ConnectionPool::create_connection(std::string const& endpoint)
72
66
{
73
 
    zmqpp::socket_type stype = m == RequestMode::Twoway ? zmqpp::socket_type::request : zmqpp::socket_type::push;
74
 
    zmqpp::socket s(context_, stype);
 
67
    zmqpp::socket s(context_, zmqpp::socket_type::push);
75
68
    // Allow short linger time so messages written just before we shut down
76
69
    // have some chance of being sent, and we don't block indefinitely if the
77
70
    // peer has gone away.
78
71
    s.set(zmqpp::socket_option::linger, 50);
 
72
    // We set a reconnect interval of 20 ms, so we get to the peer quickly, in case
 
73
    // the peer hasn't finished binding to its endpoint yet after the first query
 
74
    // is sent. We back off exponentially to one second.
 
75
    // TODO: This still doesn't entirely stop the reconnection attempts that are
 
76
    //       made by Zmq behind the scenes. We'll have to add a garbage collection
 
77
    //       thread that closes outgoing connections after some idle time.
 
78
    s.set(zmqpp::socket_option::reconnect_interval, 20);
 
79
    s.set(zmqpp::socket_option::reconnect_interval_max, 1000);
79
80
    s.connect(endpoint);
80
 
    return CPool::value_type{ endpoint, SocketData{ move(s), m } };
 
81
    return move(s);
81
82
}
82
83
 
83
84
void ConnectionPool::remove(std::string const& endpoint)
91
92
    }
92
93
}
93
94
 
94
 
void ConnectionPool::register_socket(std::string const& endpoint, zmqpp::socket socket, RequestMode m)
 
95
void ConnectionPool::register_socket(std::string const& endpoint, zmqpp::socket socket)
95
96
{
96
97
    assert(!endpoint.empty());
97
98
 
98
 
    pool_.emplace(endpoint, SocketData{ move(socket), m });
 
99
    pool_.emplace(endpoint, move(socket));
99
100
}
100
101
 
101
102
} // namespace zmq_middleware