847
844
EXPECT_EQ(num_threads, o->max_concurrent());
847
// Show that a slow twoway invocation does not delay processing of other twoway invocations if
848
// the number of outstanding invocations exceeds the number of worker threads.
850
TEST(ObjectAdapter, load_balancing_twoway)
852
auto rt = RuntimeImpl::create("testscope", runtime_ini);
853
ZmqMiddleware mw("testscope", rt.get(), zmq_ini);
855
// Twoway adapter with 3 threads.
856
ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 3);
859
// Add servants that take 50 ms (fast) and 1000 ms (slow)
860
shared_ptr<CountingServant> slow_servant(new CountingServant(1000));
861
shared_ptr<CountingServant> fast_servant(new CountingServant(50));
862
a.add("slow", slow_servant);
863
a.add("fast", fast_servant);
865
// Send a single request to the slow servant, and 30 requests to the fast servant.
866
// The slow servant ties up a thread for a second, so the other two threads
867
// should be processing the fast invocations during that time, meaning that the
868
// 31 requests should complete in about a second.
870
auto start_time = chrono::system_clock::now();
872
vector<thread> invokers;
873
invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway, "slow"));
874
for (auto i = 0; i < 30; ++i)
876
invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway, "fast"));
878
for (auto& i : invokers)
883
// We set a generous limit of two seconds, even though the entire thing will normally
884
// finish in about 1.1 seconds, in case we are slow on Jenkins.
885
auto end_time = chrono::system_clock::now();
886
EXPECT_LT(chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(), 2000);
888
// We must have had 1 request on the slow servant, and 30 on the fast servant, with the
889
// fast servant getting at least 2 invocations at the same time. Depending on scheduling
890
// order, it's possible for a fast request to be sent before the single slow request,
891
// we may have max concurrency of 3 in the fast servant occasionally.)
892
EXPECT_EQ(1, slow_servant->num_invocations());
893
EXPECT_EQ(30, fast_servant->num_invocations());
894
EXPECT_GE(fast_servant->max_concurrent(), 2);
897
// Show that a slow oneway invocation does not delay processing of other oneway invocations if
898
// the number of outstanding invocations exceeds the number of worker threads.
900
TEST(ObjectAdapter, load_balancing_oneway)
902
auto rt = RuntimeImpl::create("testscope", runtime_ini);
903
ZmqMiddleware mw("testscope", rt.get(), zmq_ini);
905
// Oneway adapter with 3 threads.
906
ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Oneway, 3);
909
// Add servants that take 10 ms (fast) and 1000 ms (slow)
910
shared_ptr<CountingServant> slow_servant(new CountingServant(1000));
911
shared_ptr<CountingServant> fast_servant(new CountingServant(10));
912
a.add("slow", slow_servant);
913
a.add("fast", fast_servant);
915
// Socket to invoke on adapter
916
zmqpp::socket s(*mw.context(), zmqpp::socket_type::push);
917
s.set(zmqpp::socket_option::linger, 200);
918
s.connect("ipc://testscope");
921
// Request for invoking slow servant.
922
capnp::MallocMessageBuilder slow_b;
923
auto slow_req = slow_b.initRoot<capnproto::Request>();
924
slow_req.setMode(capnproto::RequestMode::ONEWAY);
925
slow_req.setId("slow");
926
slow_req.setCat("some_cat");
927
slow_req.setOpName("count_op");
929
// Request for invoking fast servant.
930
capnp::MallocMessageBuilder fast_b;
931
auto fast_req = fast_b.initRoot<capnproto::Request>();
932
fast_req.setMode(capnproto::RequestMode::ONEWAY);
933
fast_req.setId("fast");
934
fast_req.setCat("some_cat");
935
fast_req.setOpName("count_op");
937
// Send a single request to the slow servant, and 140 requests to the fast servant.
938
// The slow servant ties up a thread for a second, so the other two threads
939
// are processing the fast invocations during that time, meaning that the
940
// 141 requests should complete in about a second.
941
auto slow_segments = slow_b.getSegmentsForOutput();
942
sender.send(slow_segments);
944
auto fast_segments = fast_b.getSegmentsForOutput();
945
for (int i = 0; i < 140; ++i)
947
sender.send(fast_segments);
950
// Oneway invocations, so we need to give them a chance to finish.
951
// We set a generous limit of two seconds, even though the entire thing will normally
952
// finish in about 1.1 seconds, in case we are slow on Jenkins.
953
this_thread::sleep_for(chrono::seconds(2));
955
// We must have had 1 request on the slow servant, and 140 on the fast servant, with the
956
// fast servant getting 2 invocations concurrently.
957
EXPECT_EQ(1, slow_servant->num_invocations());
958
EXPECT_EQ(140, fast_servant->num_invocations());
959
EXPECT_EQ(2, fast_servant->max_concurrent());
850
962
using namespace std::placeholders;
852
964
// Servant that updates the servant map in various ways from its destructor, to verify