54
54
auto const& it = pool_.find(endpoint);
55
55
if (it != pool_.end())
57
if (it->second.mode != m)
59
string msg("ConnectionPool::find(): cannot send " + to_string(m) +
60
" request via " + to_string(it->second.mode) + " connection (endpoint: " + endpoint + ")");
61
throw MiddlewareException(msg);
63
return it->second.socket;
66
60
// No existing connection yet, establish one.
67
auto entry = create_connection(endpoint, m);
68
return pool_.emplace(move(entry)).first->second.socket;
61
auto s = create_connection(endpoint);
62
return pool_.emplace(make_pair(endpoint, move(s))).first->second;
71
// Creates a zmqpp socket on context_, sets its linger option, connects it to
// `endpoint`, and hands the result back to the caller.
// NOTE(review): this span appears to interleave two revisions of the function
// (two signatures, two socket constructions) together with bare line-number
// residue, and the brace-only lines are missing -- confirm against the real file
// which lines are live.
//
// Mode-aware revision: takes the RequestMode `m` and is declared to return a
// CPool::value_type.
ConnectionPool::CPool::value_type ConnectionPool::create_connection(std::string const& endpoint, RequestMode m)
65
// Mode-less revision: declared to return the zmqpp::socket itself.
zmqpp::socket ConnectionPool::create_connection(std::string const& endpoint)
73
// RequestMode::Twoway selects a request socket; every other mode selects a push socket.
zmqpp::socket_type stype = m == RequestMode::Twoway ? zmqpp::socket_type::request : zmqpp::socket_type::push;
74
zmqpp::socket s(context_, stype);
67
// The mode-less revision always creates a push socket.
zmqpp::socket s(context_, zmqpp::socket_type::push);
75
68
// Allow short linger time so messages written just before we shut down
76
69
// have some chance of being sent, and we don't block indefinitely if the
77
70
// peer has gone away.
78
71
// NOTE(review): the value 50 is presumably milliseconds (ZMQ_LINGER) -- confirm.
s.set(zmqpp::socket_option::linger, 50);
72
// We set a reconnect interval of 20 ms, so we get to the peer quickly, in case
73
// the peer hasn't finished binding to its endpoint yet after the first query
74
// is sent. We back off exponentially to one second.
75
// TODO: This still doesn't entirely stop the reconnection attempts that are
76
// made by Zmq behind the scenes. We'll have to add a garbage collection
77
// thread that closes outgoing connections after some idle time.
78
// Initial reconnect interval (20 ms, per the comment above).
s.set(zmqpp::socket_option::reconnect_interval, 20);
79
// Upper bound for the reconnect back-off (one second, per the comment above).
s.set(zmqpp::socket_option::reconnect_interval_max, 1000);
79
80
// Connect the configured socket to the given endpoint.
s.connect(endpoint);
80
// Mode-aware revision: pair the endpoint (as key) with the socket and its mode.
// The socket is moved into SocketData, so `s` must not be used after this line.
return CPool::value_type{ endpoint, SocketData{ move(s), m } };
83
84
void ConnectionPool::remove(std::string const& endpoint)
94
// Adds an already-created socket to pool_ under the key `endpoint`.
// The socket is taken by value and moved into the pool entry.
// NOTE(review): two revisions of this function appear interleaved here (with
// and without the RequestMode parameter), together with bare line-number
// residue; the brace-only lines are missing -- confirm which lines are live.
//
// Mode-aware revision: also records the RequestMode `m` for the socket.
void ConnectionPool::register_socket(std::string const& endpoint, zmqpp::socket socket, RequestMode m)
95
// Mode-less revision.
void ConnectionPool::register_socket(std::string const& endpoint, zmqpp::socket socket)
96
97
// Precondition: the endpoint must be non-empty. This is checked only via
// assert, so it is not enforced when assertions are compiled out.
assert(!endpoint.empty());
98
// Mode-aware revision: store the socket together with its mode.
// NOTE(review): if pool_ is a standard map type, emplace leaves an existing
// entry for the same endpoint untouched -- confirm that this is intended.
pool_.emplace(endpoint, SocketData{ move(socket), m });
99
// Mode-less revision: store the socket alone.
pool_.emplace(endpoint, move(socket));
101
102
} // namespace zmq_middleware