~unity-api-team/unity-scopes-api/child-scopes-option

« back to all changes in this revision

Viewing changes to src/scopes/internal/zmq_middleware/ObjectAdapter.cpp

  • Committer: Marcus Tomlinson
  • Date: 2014-10-08 10:58:57 UTC
  • mfrom: (497.1.3 devel)
  • mto: (497.1.5 devel)
  • mto: This revision was merged to the branch mainline in revision 505.
  • Revision ID: marcus.tomlinson@canonical.com-20141008105857-gf4ilb9ywfeaeroi
Merged devel

Show diffs side-by-side

added added

removed removed

Lines of Context:
51
51
namespace zmq_middleware
52
52
{
53
53
 
 
54
namespace
 
55
{
 
56
 
 
57
char const* pump_suffix = "-pump";
 
58
 
 
59
}  // namespace
 
60
 
54
61
ObjectAdapter::ObjectAdapter(ZmqMiddleware& mw, string const& name, string const& endpoint, RequestMode m,
55
62
                             int pool_size, int64_t idle_timeout) :
56
63
    mw_(mw),
454
461
        }
455
462
    }
456
463
 
457
 
    // Start a broker thread to forward incoming messages to backend workers and
458
 
    // wait for the broker thread to finish its initialization. The broker
459
 
    // signals after it has connected to the stop socket.
460
 
    {
461
 
        lock_guard<mutex> lock(ready_mutex_);
462
 
        ready_ = promise<void>();
463
 
    }
464
 
    broker_ = thread(&ObjectAdapter::broker_thread, this);
465
 
    {
466
 
        auto f = ready_.get_future();
467
 
        try
468
 
        {
469
 
            f.get();
470
 
        }
471
 
        catch (...) // LCOV_EXCL_LINE
472
 
        {
473
 
            throw MiddlewareException("ObjectAdapter::run_workers(): broker thread failure (adapter: " + name_ + ")"); // LCOV_EXCL_LINE
474
 
        }
475
 
    }
476
 
 
477
 
    // Create the appropriate number of worker threads.
478
 
    // The last worker thread to subscribe signals that the workers are ready.
479
 
    // This ensures that we won't send a stop from deactivate() until after
480
 
    // the workers can actually receive it.
481
 
    {
482
 
        lock_guard<mutex> lock(ready_mutex_);
483
 
        ready_ = promise<void>();
484
 
    }
485
 
    {
486
 
        auto f = ready_.get_future();
487
 
        num_workers_.store(pool_size_);
488
 
        for (auto i = 0; i < pool_size_; ++i)
489
 
        {
490
 
            workers_.push_back(thread(&ObjectAdapter::worker_thread, this));
491
 
        }
492
 
        try
493
 
        {
494
 
            f.get();
495
 
        }
496
 
        // LCOV_EXCL_START
497
 
        catch (...) //
498
 
        {
499
 
            lock_guard<mutex> state_lock(state_mutex_);
500
 
            stopper_->stop();
501
 
            throw MiddlewareException("ObjectAdapter::run_workers(): worker thread failure (adapter: " + name_ + ")");
502
 
        }
503
 
        // LCOV_EXCL_STOP
504
 
    }
 
464
    // Start pump.
 
465
    auto ready = promise<void>();
 
466
    auto f = ready.get_future();
 
467
    pump_ = thread(&ObjectAdapter::pump, this, move(ready));
 
468
    f.get();
505
469
}
506
470
 
507
471
shared_ptr<ServantBase> ObjectAdapter::find_servant(string const& id, string const& category)
514
478
    return servant;
515
479
}
516
480
 
517
 
void ObjectAdapter::broker_thread()
 
481
// Load-balancing message pump using router-router sockets (for twoway) or
 
482
// pull-router (for oneway). Loosely follows the "Load Balancing Broker" in the Zmq Guide.
 
483
// (Workers use a REQ socket to pull work from the pump.)
 
484
 
 
485
void ObjectAdapter::pump(std::promise<void> ready)
518
486
{
519
487
    try
520
488
    {
 
489
        zmqpp::poller poller;
 
490
 
 
491
        zmqpp::socket_type stype = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
 
492
 
 
493
        // Front-end receives incoming requests from clients. (Frontend is in the server role.)
 
494
        zmqpp::socket frontend(*mw_.context(), stype);
 
495
        frontend.set(zmqpp::socket_option::linger, 50);
 
496
        // "Safe" bind: prevents two servers from binding to the same endpoint.
 
497
        safe_bind(frontend, endpoint_);
 
498
        // We do not add the frontend to the poller just yet because we poll the frontend
 
499
        // only when workers are ready to process a request.
 
500
 
 
501
        // The backend replies to requests from workers for more work. (Backend is in the server role.)
 
502
        // This means that our *reply* to the request from the worker contains the incoming
 
503
        // request from the client, and the next *request* from the worker contains the result that
 
504
        // is to go back to the client via the frontend (or the ready message for a oneway worker).
 
505
        zmqpp::socket backend(*mw_.context(), zmqpp::socket_type::router);
 
506
        backend.set(zmqpp::socket_option::linger, 50);
 
507
        backend.bind("inproc://" + name_ + pump_suffix);
 
508
        poller.add(backend);
 
509
 
521
510
        // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
522
 
        zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
523
 
        {
524
 
            lock_guard<mutex> state_lock(state_mutex_);
525
 
            stop = stopper_->subscribe();
526
 
        }
527
 
 
528
 
        // Set up message pump. Router-dealer for twoway adapter, pull-push for oneway adapter.
529
 
        auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
530
 
        zmqpp::socket frontend(*mw_.context(), socket_type);
531
 
 
532
 
        socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::dealer : zmqpp::socket_type::push;
533
 
        zmqpp::socket backend(*mw_.context(), socket_type);
534
 
 
535
 
        zmqpp::poller poller;
536
 
 
537
 
        try
538
 
        {
539
 
            poller.add(stop);
540
 
 
541
 
            frontend.set(zmqpp::socket_option::linger, 200);
542
 
            // "Safe" bind: prevents two servers from binding to the same endpoint.
543
 
            safe_bind(frontend, endpoint_);
544
 
            poller.add(frontend);
545
 
 
546
 
            backend.set(zmqpp::socket_option::linger, 200);
547
 
            backend.bind("inproc://" + name_ + "-worker");
548
 
            poller.add(backend);
549
 
 
550
 
            // Tell parent that we are ready
551
 
            {
552
 
                lock_guard<mutex> lock(ready_mutex_);
553
 
                ready_.set_value();
554
 
            }
555
 
        }
556
 
        // LCOV_EXCL_START
557
 
        catch (...) // LCOV_EXCL_LINE
558
 
        {
559
 
            // TODO: log error
560
 
            {
561
 
                lock_guard<mutex> lock(state_mutex_);
562
 
                stopper_->stop();
563
 
            }
564
 
            lock_guard<mutex> lock(ready_mutex_);
565
 
            ready_.set_exception(current_exception());
566
 
            return;
567
 
        }
568
 
        // LCOV_EXCL_STOP
569
 
 
570
 
        int pending_requests = 0;
 
511
        auto stop = stopper_->subscribe();
 
512
        poller.add(stop);
 
513
 
 
514
        // Fire up the worker threads.
 
515
        int num_workers = 0;
 
516
        for (int i = 0; i < pool_size_; ++i)
 
517
        {
 
518
            workers_.push_back(thread(&ObjectAdapter::worker, this));
 
519
            ++num_workers;
 
520
        }
 
521
 
 
522
        // Tell parent that we are ready
 
523
        ready.set_value();
 
524
 
 
525
        // Start the pump.
571
526
        bool shutting_down = false;
 
527
        queue<string> ready_workers;
 
528
 
572
529
        for (;;)
573
530
        {
574
 
            zmqpp::message message;
575
 
            // Poll for incoming messages. If a timeout has been set, and no incoming messages are received for
576
 
            // idle_timeout_ milliseconds, we shutdown the server.
577
531
            if (!poller.poll(idle_timeout_))
578
532
            {
579
 
                // Shutdown this server by stopping the middleware.
 
533
                // Shut down, no activity for the idle timeout period.
580
534
                mw_.stop();
581
535
            }
582
536
            if (!shutting_down && poller.has_input(stop))
583
537
            {
584
538
                // When the stop socket becomes ready, we need to get out of here.
585
539
                // We stop reading more requests from the router, but continue processing
586
 
                // while there are still replies outstanding.
 
540
                // while there are still outstanding replies for twoway requests.
587
541
                poller.remove(stop);
588
542
                stop.close();
 
543
                if (poller.has(frontend))
 
544
                {
 
545
                    poller.remove(frontend);  // Don't accept any more request from clients.
 
546
                }
589
547
                shutting_down = true;
590
548
            }
591
 
            if (!shutting_down && poller.has_input(frontend)) // Once shutting down, we no longer read incoming messages.
592
 
            {
 
549
            if (poller.has_input(backend))
 
550
            {
 
551
                // A worker is asking for more work to do.
 
552
                string worker_id;
 
553
                backend.receive(worker_id);          // First frame: worker ID for LRU routing
 
554
                ready_workers.push(worker_id);       // Thread will be ready again in a sec
 
555
                if (!shutting_down && ready_workers.size() == 1)
 
556
                {
 
557
                    // We poll the front end when there is at least one worker
 
558
                    poller.add(frontend);
 
559
                }
 
560
                string buf;
 
561
                backend.receive(buf);                // Second frame: empty delimiter frame
 
562
                assert(buf.empty());
 
563
                string client_address;
 
564
                backend.receive(client_address);     // Third frame: "ready" or client reply address
 
565
                if (client_address != "ready")
 
566
                {
 
567
                    backend.receive(buf);            // Fourth frame: empty delimiter frame
 
568
                    assert(buf.empty());
 
569
                    if (mode_ == RequestMode::Twoway)
 
570
                    {
 
571
                        frontend.send(client_address, zmqpp::socket::send_more);   // Client address tells router where to send the reply to
 
572
                        frontend.send("", zmqpp::socket::send_more);               // Empty delimiter frame
 
573
                        // Read reply contents and return them to client via frontend
 
574
                        int flag;
 
575
                        do
 
576
                        {
 
577
                            backend.receive(buf);
 
578
                            flag = backend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
 
579
                            frontend.send(buf, flag);
 
580
                        } while (flag == zmqpp::socket::send_more);
 
581
                    }
 
582
                }
 
583
            }
 
584
            if (!shutting_down && poller.has(frontend) && poller.has_input(frontend))
 
585
            {
 
586
                // Incoming request from client.
 
587
                string client_address;
 
588
                if (mode_ == RequestMode::Twoway)
 
589
                {
 
590
                    frontend.receive(client_address);  // First frame: client address
 
591
                    string buf;
 
592
                    frontend.receive(buf);             // Second frame: empty delimiter frame
 
593
                    assert(buf.empty());
 
594
                }
 
595
                string worker_id = ready_workers.front();
 
596
                ready_workers.pop();
 
597
                if (ready_workers.size() == 0)  // Stop reading from frontend once all workers are busy.
 
598
                {
 
599
                    poller.remove(frontend);
 
600
                }
 
601
 
 
602
                // Give incoming request to worker.
 
603
                backend.send(worker_id, zmqpp::socket::send_more);
 
604
                backend.send("", zmqpp::socket::send_more);
 
605
                backend.send(client_address, zmqpp::socket::send_more);
 
606
                backend.send("", zmqpp::socket::send_more);
593
607
                int flag;
594
608
                do
595
609
                {
596
 
                    // This is the actual message pump. We read an incoming request and pass it to one of the workers
597
 
                    // for processing. The dealer socket ensures fair sharing.
598
 
                    frontend.receive(message);
 
610
                    string buf;
 
611
                    frontend.receive(buf);
599
612
                    flag = frontend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
600
 
                    backend.send(message, flag);
601
 
                }
602
 
                while (flag == zmqpp::socket::send_more);
603
 
                if (mode_ == RequestMode::Twoway)
604
 
                {
605
 
                    ++pending_requests; // Only twoway requests require us to wait for replies
606
 
                }
607
 
            }
608
 
            if (pending_requests != 0 && poller.has_input(backend))
609
 
            {
610
 
                // We need to read a reply for an earlier request.
611
 
                int flag;
612
 
                do
613
 
                {
614
 
                    // Message pump in the opposite direction, for replies from server to client.
615
 
                    backend.receive(message);
616
 
                    flag = backend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
617
 
                    frontend.send(message, flag);
618
 
                }
619
 
                while (flag == zmqpp::socket::send_more);
620
 
                assert(pending_requests > 0);
621
 
                --pending_requests;
622
 
            }
623
 
            if (shutting_down && pending_requests == 0)
624
 
            {
625
 
                frontend.close();
626
 
                backend.close();
627
 
                return;
 
613
                    backend.send(buf, flag);
 
614
                } while (flag == zmqpp::socket::send_more);
 
615
            }
 
616
            if (shutting_down)
 
617
            {
 
618
                // Tell each worker that is ready to stop. This automatically
 
619
                // "waits" for executing twoway requests to complete because
 
620
                // a worker that's doing work isn't ready. Once all workers
 
621
                // have been told to stop, we are done.
 
622
                while (ready_workers.size() > 0)
 
623
                {
 
624
                    string worker_id = ready_workers.front();
 
625
                    ready_workers.pop();
 
626
                    backend.send(worker_id, zmqpp::socket::send_more);
 
627
                    backend.send("", zmqpp::socket::send_more);
 
628
                    backend.send("stop");
 
629
                    if (--num_workers == 0)
 
630
                    {
 
631
                        return;
 
632
                    }
 
633
                }
628
634
            }
629
635
        }
630
636
    }
631
637
    catch (...)
632
638
    {
633
 
        {
634
 
            lock_guard<mutex> lock(state_mutex_);
635
 
            stopper_->stop();
636
 
        }
637
 
        MiddlewareException e("ObjectAdapter: broker thread failure (adapter: " + name_ + ")");
 
639
        MiddlewareException e("ObjectAdapter: pump thread failure (adapter: " + name_ + ")");
638
640
        store_exception(e);
639
641
        // We may not have signaled the parent yet, depending on where things went wrong.
640
642
        try
641
643
        {
642
 
            lock_guard<mutex> lock(ready_mutex_);
643
 
            ready_.set_exception(make_exception_ptr(e));
 
644
            stopper_->stop();
 
645
            ready.set_exception(make_exception_ptr(e));
644
646
        }
645
 
        catch (const future_error&)  // LCOV_EXCL_LINE
 
647
        catch (future_error)  // LCOV_EXCL_LINE
646
648
        {
647
649
        }
648
650
    }
649
651
}
650
652
 
651
 
void ObjectAdapter::worker_thread()
 
653
void ObjectAdapter::worker()
652
654
{
653
655
    try
654
656
    {
655
 
        zmqpp::poller poller;
656
 
 
657
 
        // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
658
 
        zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
659
 
        {
660
 
            lock_guard<mutex> lock(state_mutex_);
661
 
            stop = stopper_->subscribe();
662
 
        }
663
 
        poller.add(stop);
664
 
 
665
 
        auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::reply : zmqpp::socket_type::pull;
666
 
        zmqpp::socket s(*mw_.context(), socket_type);
667
 
        s.set(zmqpp::socket_option::linger, 200);
668
 
        s.connect("inproc://" + name_ + "-worker");
669
 
        poller.add(s);
670
 
 
671
 
        ZmqReceiver receiver(s);
672
 
        ZmqSender sender(s);        // Sender is used only by twoway adapters (zero-cost, the class just remembers the passed socket).
673
 
 
674
 
        if (--num_workers_ == 0)    // Last worker to reach this point notifies the parent that all workers are ready.
675
 
        {
676
 
            lock_guard<mutex> lock(ready_mutex_);
677
 
            ready_.set_value();
678
 
        }
679
 
 
680
 
        Current current;
681
 
        current.adapter = this;     // Stays the same for all invocations, so we set this once only.
682
 
 
683
 
        bool finish = false;
 
657
        zmqpp::socket pump(*mw_.context(), zmqpp::socket_type::req);
 
658
        pump.set(zmqpp::socket_option::linger, 50);
 
659
        pump.connect("inproc://" + name_ + pump_suffix);
 
660
        pump.send("ready");                             // First message tells pump that we are ready.
 
661
 
684
662
        for (;;)
685
663
        {
686
 
            if (finish)
 
664
            string client_address;
 
665
            pump.receive(client_address);
 
666
            if (client_address == "stop")  // pump has decided to stop
687
667
            {
688
 
                s.close();
689
668
                return;
690
669
            }
691
 
 
692
 
            poller.poll();
693
 
            if (!finish && poller.has_input(stop)) // Parent sent a stop message, so we are supposed to go away.
694
 
            {
695
 
                poller.remove(stop);
696
 
                stop.close();
697
 
                finish = true;
698
 
            }
699
 
 
700
 
            if (!finish && poller.has_input(s)) // We stop reading new incoming messages once told to finish.
701
 
            {
702
 
                // Unmarshal the type-independent part of the message (id, category, operation name, mode).
703
 
                capnproto::Request::Reader req;
704
 
                unique_ptr<capnp::SegmentArrayMessageReader> message;
705
 
                try
706
 
                {
707
 
                    // Unmarshal generic part of the message.
708
 
                    auto segments = receiver.receive();
709
 
                    message.reset(new capnp::SegmentArrayMessageReader(segments));
710
 
                    req = message->getRoot<capnproto::Request>();
711
 
                    current.id = req.getId().cStr();
712
 
                    current.category = req.getCat().cStr();
713
 
                    current.op_name = req.getOpName().cStr();
714
 
                    auto mode = req.getMode();
715
 
                    if (current.id.empty() || current.op_name.empty() ||
716
 
                        (mode != capnproto::RequestMode::TWOWAY && mode != capnproto::RequestMode::ONEWAY))
717
 
                    {
718
 
                        if (mode_ == RequestMode::Twoway)
719
 
                        {
720
 
                            capnp::MallocMessageBuilder b;
721
 
                            auto exr = create_unknown_response(b, "Invalid message header");
722
 
                            sender.send(exr);
723
 
                        }
724
 
                        else
725
 
                        {
726
 
                            // TODO: log error
727
 
                            cerr << "ObjectAdapter: invalid oneway message header "
728
 
                                 << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
729
 
                        }
730
 
                        continue;
731
 
                    }
732
 
                    auto expected_mode = mode_ == RequestMode::Twoway ? capnproto::RequestMode::TWOWAY : capnproto::RequestMode::ONEWAY;
733
 
                    if (mode != expected_mode) // Can't do oneway on a twoway adapter and vice-versa.
734
 
                    {
735
 
                        if (mode_ == RequestMode::Twoway)
736
 
                        {
737
 
                            ostringstream s;
738
 
                            s << "ObjectAdapter: oneway invocation sent to twoway adapter "
739
 
                              << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")";
740
 
                            capnp::MallocMessageBuilder b;
741
 
                            auto exr = create_unknown_response(b, s.str());
742
 
                            sender.send(exr);
743
 
                        }
744
 
                        else
745
 
                        {
746
 
                            // TODO: log error
747
 
                            cerr << "ObjectAdapter: twoway invocation sent to oneway adapter "
748
 
                                 << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
749
 
                        }
750
 
                        continue;
751
 
                    }
752
 
                }
753
 
                catch (std::exception const& e)
754
 
                {
755
 
                    // We get here if header unmarshaling failed.
756
 
                    ostringstream s;
757
 
                    s << "ObjectAdapter: error unmarshaling request header "
758
 
                      << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << "): " << e.what();
759
 
                    if (mode_ == RequestMode::Twoway)
760
 
                    {
761
 
                        capnp::MallocMessageBuilder b;
762
 
                        auto exr = create_unknown_response(b, s.str());
763
 
                        sender.send(exr);
764
 
                    }
765
 
                    else
766
 
                    {
767
 
                        // TODO: log error
768
 
                        cerr << s.str() << endl;
769
 
                    }
770
 
                    continue;
771
 
                }
772
 
 
773
 
                // Look for a servant with matching id.
774
 
                shared_ptr<ServantBase> servant;
775
 
                try
776
 
                {
777
 
                    servant = find_servant(current.id, current.category);
778
 
                }
779
 
                catch (std::exception const&)
780
 
                {
781
 
                    // Ignore failure to find servant during destruction phase.
782
 
                }
783
 
 
784
 
                if (!servant)
785
 
                {
786
 
                    if (mode_ == RequestMode::Twoway)
787
 
                    {
788
 
                        capnp::MallocMessageBuilder b;
789
 
                        auto exr = create_object_not_exist_response(b, current);
790
 
                        sender.send(exr);
791
 
                    }
792
 
                    continue;
793
 
                }
794
 
 
795
 
                // We have a target object, so we can ask it to unmarshal the in-params, forward
796
 
                // the invocation to the application-provided method, and to marshal the results.
797
 
                auto in_params = req.getInParams();
798
 
                capnp::MallocMessageBuilder b;
799
 
                auto r = b.initRoot<capnproto::Response>();
800
 
                servant->safe_dispatch_(current, in_params, r); // noexcept
801
 
                if (mode_ == RequestMode::Twoway)
802
 
                {
803
 
                    sender.send(b.getSegmentsForOutput());
804
 
                }
 
670
            string buf;
 
671
            pump.receive(buf);
 
672
            assert(buf.empty());
 
673
 
 
674
            // Any bytes remaining in the input are the marshaled request payload.
 
675
            dispatch(pump, client_address);
 
676
 
 
677
            if (mode_ == RequestMode::Oneway)
 
678
            {
 
679
                // Oneway requests don't have a reply, so we send a dummy "reply" to tell
 
680
                // the pump that we are ready for another request.
 
681
                pump.send("ready");
805
682
            }
806
683
        }
807
684
    }
808
685
    // LCOV_EXCL_START
809
686
    catch (...)
810
687
    {
811
 
        {
812
 
            lock_guard<mutex> lock(state_mutex_);
813
 
            stopper_->stop();  // Fatal error, we need to stop all other workers and the broker.
814
 
        }
 
688
        stopper_->stop();  // Fatal error, we need to stop all other workers and the pump.
815
689
        MiddlewareException e("ObjectAdapter: worker thread failure (adapter: " + name_ + ")");
816
690
        store_exception(e);
817
 
        // We may not have signaled the parent yet, depending on where things went wrong.
818
 
        try
819
 
        {
820
 
            lock_guard<mutex> lock(ready_mutex_);
821
 
            ready_.set_exception(make_exception_ptr(e));
822
 
        }
823
 
        catch (const future_error&)
824
 
        {
825
 
        }
826
691
    }
827
692
    // LCOV_EXCL_STOP
828
693
}
829
694
 
 
695
// Unmarshal input parameters, dispatch to servant and, if this is a twoway request,
 
696
// marshal the results (or exception).
 
697
 
 
698
void ObjectAdapter::dispatch(zmqpp::socket& pump, string const& client_address)
 
699
{
 
700
    ZmqSender sender(pump);    // Unused for oneway requests
 
701
    capnproto::Request::Reader req;
 
702
    Current current;
 
703
    ZmqReceiver receiver(pump);
 
704
    unique_ptr<capnp::SegmentArrayMessageReader> message;
 
705
 
 
706
    try
 
707
    {
 
708
        // Unmarshal the type-independent part of the message (id, category, operation name, mode).
 
709
        auto segments = receiver.receive();
 
710
        message.reset(new capnp::SegmentArrayMessageReader(segments));
 
711
        req = message->getRoot<capnproto::Request>();
 
712
 
 
713
        current.adapter = this;
 
714
        current.id = req.getId().cStr();
 
715
        current.category = req.getCat().cStr();
 
716
        current.op_name = req.getOpName().cStr();
 
717
        auto mode = req.getMode();
 
718
        if (current.id.empty() || current.op_name.empty() ||
 
719
            (mode != capnproto::RequestMode::TWOWAY && mode != capnproto::RequestMode::ONEWAY))
 
720
        {
 
721
            // Something is wrong with the request header.
 
722
            if (mode_ == RequestMode::Twoway)
 
723
            {
 
724
                pump.send(client_address, zmqpp::socket::send_more);
 
725
                pump.send("", zmqpp::socket::send_more);
 
726
                capnp::MallocMessageBuilder b;
 
727
                auto exr = create_unknown_response(b, "Invalid message header");
 
728
                sender.send(exr);
 
729
            }
 
730
            else
 
731
            {
 
732
                // TODO: log error
 
733
                cerr << "ObjectAdapter: invalid oneway message header "
 
734
                     << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
 
735
            }
 
736
            return;
 
737
        }
 
738
        auto expected_mode = mode_ == RequestMode::Twoway ? capnproto::RequestMode::TWOWAY : capnproto::RequestMode::ONEWAY;
 
739
        if (mode != expected_mode) // Can't do oneway on a twoway adapter and vice-versa.
 
740
        {
 
741
            if (mode_ == RequestMode::Twoway)
 
742
            {
 
743
                pump.send(client_address, zmqpp::socket::send_more);
 
744
                pump.send("", zmqpp::socket::send_more);
 
745
                ostringstream s;
 
746
                s << "ObjectAdapter: oneway invocation sent to twoway adapter "
 
747
                  << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")";
 
748
                capnp::MallocMessageBuilder b;
 
749
                auto exr = create_unknown_response(b, s.str());
 
750
                sender.send(exr);
 
751
            }
 
752
            else
 
753
            {
 
754
                // TODO: log error
 
755
                cerr << "ObjectAdapter: twoway invocation sent to oneway adapter "
 
756
                     << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
 
757
            }
 
758
            return;
 
759
        }
 
760
    }
 
761
    catch (std::exception const& e)
 
762
    {
 
763
        // We get here if header unmarshaling failed.
 
764
        ostringstream s;
 
765
        s << "ObjectAdapter: error unmarshaling request header "
 
766
          << "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << "): " << e.what();
 
767
        if (mode_ == RequestMode::Twoway)
 
768
        {
 
769
            pump.send(client_address, zmqpp::socket::send_more);
 
770
            pump.send("", zmqpp::socket::send_more);
 
771
            capnp::MallocMessageBuilder b;
 
772
            auto exr = create_unknown_response(b, s.str());
 
773
            sender.send(exr);
 
774
        }
 
775
        else
 
776
        {
 
777
            // TODO: log error
 
778
            cerr << s.str() << endl;
 
779
        }
 
780
        return;
 
781
    }
 
782
 
 
783
    // Look for a servant with matching id.
 
784
    shared_ptr<ServantBase> servant;
 
785
    try
 
786
    {
 
787
        servant = find_servant(current.id, current.category);
 
788
    }
 
789
    catch (std::exception const&)
 
790
    {
 
791
        // Ignore failure to find servant during destruction phase.
 
792
    }
 
793
 
 
794
    if (!servant)
 
795
    {
 
796
        if (mode_ == RequestMode::Twoway)
 
797
        {
 
798
            pump.send(client_address, zmqpp::socket::send_more);
 
799
            pump.send("", zmqpp::socket::send_more);
 
800
            capnp::MallocMessageBuilder b;
 
801
            auto exr = create_object_not_exist_response(b, current);
 
802
            sender.send(exr);
 
803
        }
 
804
        return;
 
805
    }
 
806
 
 
807
    // We have a target object, so we can ask it to unmarshal the in-params, forward
 
808
    // the invocation to the application-provided method, and to marshal the results.
 
809
    auto in_params = req.getInParams();
 
810
    capnp::MallocMessageBuilder b;
 
811
    auto r = b.initRoot<capnproto::Response>();
 
812
    servant->safe_dispatch_(current, in_params, r); // noexcept
 
813
    if (mode_ == RequestMode::Twoway)
 
814
    {
 
815
        pump.send(client_address, zmqpp::socket::send_more);
 
816
        pump.send("", zmqpp::socket::send_more);
 
817
        sender.send(b.getSegmentsForOutput());
 
818
    }
 
819
}
 
820
 
830
821
void ObjectAdapter::cleanup()
831
822
{
832
823
    join_with_all_threads();
849
840
            w.join();
850
841
        }
851
842
    }
852
 
    if (broker_.joinable())
 
843
    if (pump_.joinable())
853
844
    {
854
 
        broker_.join();
 
845
        pump_.join();
855
846
    }
856
847
}
857
848