~stolowski/+junk/revert-zmq-reconnect-fix

« back to all changes in this revision

Viewing changes to test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp

  • Committer: CI bot
  • Author(s): Pawel Stolowski
  • Date: 2014-10-06 12:03:05 UTC
  • mfrom: (163.300.21 staging)
  • Revision ID: ps-jenkins@lists.canonical.com-20141006120305-evhuxbb2f6pgaie0
Merge devel. Fixes: 1376054
Approved by: PS Jenkins bot, Marcus Tomlinson

Show diffs side-by-side

added added

removed removed

Lines of Context:
742
742
    int delay_;
743
743
};
744
744
 
745
 
void invoke_thread(ZmqMiddleware* mw, RequestMode t)
 
745
void invoke_thread(ZmqMiddleware* mw, RequestMode t, string const& object_id)
746
746
{
747
747
    zmqpp::socket s(*mw->context(), t == RequestMode::Twoway ? zmqpp::socket_type::request : zmqpp::socket_type::push);
 
748
    s.set(zmqpp::socket_option::linger, 200);
748
749
    s.connect("ipc://testscope");
749
750
    ZmqSender sender(s);
750
751
    ZmqReceiver receiver(s);
752
753
    capnp::MallocMessageBuilder b;
753
754
    auto request = b.initRoot<capnproto::Request>();
754
755
    request.setMode(t == RequestMode::Twoway ? capnproto::RequestMode::TWOWAY : capnproto::RequestMode::ONEWAY);
755
 
    request.setId("some_id");
 
756
    request.setId(object_id);
756
757
    request.setCat("some_cat");
757
758
    request.setOpName("count_op");
758
759
 
765
766
        auto response = reader.getRoot<capnproto::Response>();
766
767
        EXPECT_EQ(response.getStatus(), capnproto::ResponseStatus::SUCCESS);
767
768
    }
768
 
    else
769
 
    {
770
 
        wait(50);  // Allow some time for oneway requests to actually make it on the wire.
771
 
    }
772
769
}
773
770
 
774
771
TEST(ObjectAdapter, twoway_threading)
791
788
        vector<thread> invokers;
792
789
        for (auto i = 0; i < num_requests; ++i)
793
790
        {
794
 
            invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway));
 
791
            invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway, "some_id"));
795
792
        }
796
793
        for (auto& i : invokers)
797
794
        {
825
822
        vector<thread> invokers;
826
823
        for (auto i = 0; i < num_requests; ++i)
827
824
        {
828
 
            invokers.push_back(thread(invoke_thread, &mw, RequestMode::Oneway));
 
825
            invokers.push_back(thread(invoke_thread, &mw, RequestMode::Oneway, "some_id"));
829
826
        }
830
827
        for (auto& i : invokers)
831
828
        {
847
844
    EXPECT_EQ(num_threads, o->max_concurrent());
848
845
}
849
846
 
 
847
// Show that a slow twoway invocation does not delay processing of other twoway invocations if
 
848
// the number of outstanding invocations exceeds the number of worker threads.
 
849
 
 
850
TEST(ObjectAdapter, load_balancing_twoway)
 
851
{
 
852
    auto rt = RuntimeImpl::create("testscope", runtime_ini);
 
853
    ZmqMiddleware mw("testscope", rt.get(), zmq_ini);
 
854
 
 
855
    // Twoway adapter with 3 threads.
 
856
    ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 3);
 
857
    a.activate();
 
858
 
 
859
    // Add servants that take 50 ms (fast) and 1000 ms (slow)
 
860
    shared_ptr<CountingServant> slow_servant(new CountingServant(1000));
 
861
    shared_ptr<CountingServant> fast_servant(new CountingServant(50));
 
862
    a.add("slow", slow_servant);
 
863
    a.add("fast", fast_servant);
 
864
 
 
865
    // Send a single request to the slow servant, and 30 requests to the fast servant.
 
866
    // The slow servant ties up a thread for a second, so the other two threads
 
867
    // should be processing the fast invocations during that time, meaning that the
 
868
    // 31 requests should complete in about a second.
 
869
 
 
870
    auto start_time = chrono::system_clock::now();
 
871
 
 
872
    vector<thread> invokers;
 
873
    invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway, "slow"));
 
874
    for (auto i = 0; i < 30; ++i)
 
875
    {
 
876
        invokers.push_back(thread(invoke_thread, &mw, RequestMode::Twoway, "fast"));
 
877
    }
 
878
    for (auto& i : invokers)
 
879
    {
 
880
        i.join();
 
881
    }
 
882
 
 
883
    // We set a generous limit of two seconds, even though the entire thing will normally
 
884
    // finish in about 1.1 seconds, in case we are slow on Jenkins.
 
885
    auto end_time = chrono::system_clock::now();
 
886
    EXPECT_LT(chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(), 2000);
 
887
 
 
888
    // We must have had 1 request on the slow servant, and 30 on the fast servant, with the
 
889
    // fast servant getting at least 2 invocations at the same time. Depending on scheduling
 
890
    // order, it's possible for a fast request to be sent before the single slow request,
 
891
    // so we may have max concurrency of 3 in the fast servant occasionally.
 
892
    EXPECT_EQ(1, slow_servant->num_invocations());
 
893
    EXPECT_EQ(30, fast_servant->num_invocations());
 
894
    EXPECT_GE(fast_servant->max_concurrent(), 2);
 
895
}
 
896
 
 
897
// Show that a slow oneway invocation does not delay processing of other oneway invocations if
 
898
// the number of outstanding invocations exceeds the number of worker threads.
 
899
 
 
900
TEST(ObjectAdapter, load_balancing_oneway)
 
901
{
 
902
    auto rt = RuntimeImpl::create("testscope", runtime_ini);
 
903
    ZmqMiddleware mw("testscope", rt.get(), zmq_ini);
 
904
 
 
905
    // Oneway adapter with 3 threads.
 
906
    ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Oneway, 3);
 
907
    a.activate();
 
908
 
 
909
    // Add servants that take 10 ms (fast) and 1000 ms (slow)
 
910
    shared_ptr<CountingServant> slow_servant(new CountingServant(1000));
 
911
    shared_ptr<CountingServant> fast_servant(new CountingServant(10));
 
912
    a.add("slow", slow_servant);
 
913
    a.add("fast", fast_servant);
 
914
 
 
915
    // Socket to invoke on adapter
 
916
    zmqpp::socket s(*mw.context(), zmqpp::socket_type::push);
 
917
    s.set(zmqpp::socket_option::linger, 200);
 
918
    s.connect("ipc://testscope");
 
919
    ZmqSender sender(s);
 
920
 
 
921
    // Request for invoking slow servant.
 
922
    capnp::MallocMessageBuilder slow_b;
 
923
    auto slow_req = slow_b.initRoot<capnproto::Request>();
 
924
    slow_req.setMode(capnproto::RequestMode::ONEWAY);
 
925
    slow_req.setId("slow");
 
926
    slow_req.setCat("some_cat");
 
927
    slow_req.setOpName("count_op");
 
928
 
 
929
    // Request for invoking fast servant.
 
930
    capnp::MallocMessageBuilder fast_b;
 
931
    auto fast_req = fast_b.initRoot<capnproto::Request>();
 
932
    fast_req.setMode(capnproto::RequestMode::ONEWAY);
 
933
    fast_req.setId("fast");
 
934
    fast_req.setCat("some_cat");
 
935
    fast_req.setOpName("count_op");
 
936
 
 
937
    // Send a single request to the slow servant, and 140 requests to the fast servant.
 
938
    // The slow servant ties up a thread for a second, so the other two threads
 
939
    // are processing the fast invocations during that time, meaning that the
 
940
    // 141 requests should complete in about a second.
 
941
    auto slow_segments = slow_b.getSegmentsForOutput();
 
942
    sender.send(slow_segments);
 
943
 
 
944
    auto fast_segments = fast_b.getSegmentsForOutput();
 
945
    for (int i = 0; i < 140; ++i)
 
946
    {
 
947
        sender.send(fast_segments);
 
948
    }
 
949
 
 
950
    // Oneway invocations, so we need to give them a chance to finish.
 
951
    // We set a generous limit of two seconds, even though the entire thing will normally
 
952
    // finish in about 1.1 seconds, in case we are slow on Jenkins.
 
953
    this_thread::sleep_for(chrono::seconds(2));
 
954
 
 
955
    // We must have had 1 request on the slow servant, and 140 on the fast servant, with the
 
956
    // fast servant getting 2 invocations concurrently.
 
957
    EXPECT_EQ(1, slow_servant->num_invocations());
 
958
    EXPECT_EQ(140, fast_servant->num_invocations());
 
959
    EXPECT_EQ(2, fast_servant->max_concurrent());
 
960
}
 
961
 
850
962
using namespace std::placeholders;
851
963
 
852
964
// Servant that updates the servant map in various ways from its destructor, to verify
1062
1174
        }
1063
1175
        catch (MiddlewareException const& e)
1064
1176
        {
1065
 
            EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): broker thread failure "
 
1177
            EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter: pump thread failure "
1066
1178
                         "(adapter: testscope):\n"
1067
 
                         "    unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope", e.what());
 
1179
                         "    unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope",
 
1180
                         e.what());
1068
1181
        }
1069
1182
 
1070
1183
        {
1097
1210
        }
1098
1211
        catch (MiddlewareException const& e)
1099
1212
        {
1100
 
            EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): broker thread failure "
 
1213
            EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter: pump thread failure "
1101
1214
                         "(adapter: testscope):\n"
1102
 
                         "    unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope", e.what());
 
1215
                         "    unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope",
 
1216
                         e.what());
1103
1217
        }
1104
1218
 
1105
1219
        {
1196
1310
    }
1197
1311
    catch (MiddlewareException const& e)
1198
1312
    {
1199
 
        EXPECT_STREQ("unity::scopes::MiddlewareException: add_dflt_servant(): "
1200
 
                     "Object adapter in Failed state (adapter: testscope2)",
 
1313
        EXPECT_STREQ("unity::scopes::MiddlewareException: add_dflt_servant(): Object adapter in"
 
1314
                     " Failed state (adapter: testscope2)\n"
 
1315
                     "    Exception history:\n"
 
1316
                     "        Exception #1:\n"
 
1317
                     "            unity::scopes::MiddlewareException: ObjectAdapter: pump thread failure"
 
1318
                     " (adapter: testscope2):\n"
 
1319
                     "                unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope",
1201
1320
                     e.what());
1202
1321
    }
1203
1322
 
1209
1328
    }
1210
1329
    catch (MiddlewareException const& e)
1211
1330
    {
1212
 
        EXPECT_STREQ("unity::scopes::MiddlewareException: remove_dflt_servant(): "
1213
 
                     "Object adapter in Failed state (adapter: testscope2)",
 
1331
        EXPECT_STREQ("unity::scopes::MiddlewareException: remove_dflt_servant(): Object adapter in"
 
1332
                     " Failed state (adapter: testscope2)\n"
 
1333
                     "    Exception history:\n"
 
1334
                     "        Exception #1:\n"
 
1335
                     "            unity::scopes::MiddlewareException: ObjectAdapter: pump thread failure"
 
1336
                     " (adapter: testscope2):\n"
 
1337
                     "                unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope",
1214
1338
                     e.what());
1215
1339
    }
1216
1340
 
1222
1346
    }
1223
1347
    catch (MiddlewareException const& e)
1224
1348
    {
1225
 
        EXPECT_STREQ("unity::scopes::MiddlewareException: find_dflt_servant(): "
1226
 
                     "Object adapter in Failed state (adapter: testscope2)",
 
1349
        EXPECT_STREQ("unity::scopes::MiddlewareException: find_dflt_servant(): Object adapter in"
 
1350
                     " Failed state (adapter: testscope2)\n"
 
1351
                     "    Exception history:\n"
 
1352
                     "        Exception #1:\n"
 
1353
                     "            unity::scopes::MiddlewareException: ObjectAdapter: pump thread failure"
 
1354
                     " (adapter: testscope2):\n"
 
1355
                     "                unity::scopes::MiddlewareException: safe_bind(): address in use: ipc://testscope",
1227
1356
                     e.what());
1228
1357
    }
1229
1358
}