517
void ObjectAdapter::broker_thread()
481
// Load-balancing message pump using router-router sockets (for twoway) or
482
// pull-router (for oneway). Loosely follows the "Load Balancing Broker" in the Zmq Guide.
483
// (Workers use a REQ socket to pull work from the pump.)
485
void ObjectAdapter::pump(std::promise<void> ready)
489
zmqpp::poller poller;
491
zmqpp::socket_type stype = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
493
// Front-end receives incoming requests from clients. (Frontend is in the server role.)
494
zmqpp::socket frontend(*mw_.context(), stype);
495
frontend.set(zmqpp::socket_option::linger, 50);
496
// "Safe" bind: prevents two servers from binding to the same endpoint.
497
safe_bind(frontend, endpoint_);
498
// We do not add the frontend to the poller just yet because we poll the frontend
499
// only when workers are ready to process a request.
501
// The backend replies to requests from workers for more work. (Backend is in the server role.)
502
// This means that our *reply* to the request from the worker contains the incoming
503
// request from the client, and the next *request* from the worker contains the result that
504
// is to go back to the client via the frontend (or the ready message for a oneway worker).
505
zmqpp::socket backend(*mw_.context(), zmqpp::socket_type::router);
506
backend.set(zmqpp::socket_option::linger, 50);
507
backend.bind("inproc://" + name_ + pump_suffix);
521
510
// Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
522
zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
524
lock_guard<mutex> state_lock(state_mutex_);
525
stop = stopper_->subscribe();
528
// Set up message pump. Router-dealer for twoway adapter, pull-push for oneway adapter.
529
auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
530
zmqpp::socket frontend(*mw_.context(), socket_type);
532
socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::dealer : zmqpp::socket_type::push;
533
zmqpp::socket backend(*mw_.context(), socket_type);
535
zmqpp::poller poller;
541
frontend.set(zmqpp::socket_option::linger, 200);
542
// "Safe" bind: prevents two servers from binding to the same endpoint.
543
safe_bind(frontend, endpoint_);
544
poller.add(frontend);
546
backend.set(zmqpp::socket_option::linger, 200);
547
backend.bind("inproc://" + name_ + "-worker");
550
// Tell parent that we are ready
552
lock_guard<mutex> lock(ready_mutex_);
557
catch (...) // LCOV_EXCL_LINE
561
lock_guard<mutex> lock(state_mutex_);
564
lock_guard<mutex> lock(ready_mutex_);
565
ready_.set_exception(current_exception());
570
int pending_requests = 0;
511
auto stop = stopper_->subscribe();
514
// Fire up the worker threads.
516
for (int i = 0; i < pool_size_; ++i)
518
workers_.push_back(thread(&ObjectAdapter::worker, this));
522
// Tell parent that we are ready
571
526
bool shutting_down = false;
527
queue<string> ready_workers;
574
zmqpp::message message;
575
// Poll for incoming messages. If a timeout has been set, and no incoming messages are received for
576
// idle_timeout_ milliseconds, we shutdown the server.
577
531
if (!poller.poll(idle_timeout_))
579
// Shutdown this server by stopping the middleware.
533
// Shut down, no activity for the idle timeout period.
582
536
if (!shutting_down && poller.has_input(stop))
584
538
// When the stop socket becomes ready, we need to get out of here.
585
539
// We stop reading more requests from the router, but continue processing
586
// while there are still replies outstanding.
540
// while there are still outstanding replies for twoway requests.
587
541
poller.remove(stop);
543
if (poller.has(frontend))
545
poller.remove(frontend); // Don't accept any more request from clients.
589
547
shutting_down = true;
591
if (!shutting_down && poller.has_input(frontend)) // Once shutting down, we no longer read incoming messages.
549
if (poller.has_input(backend))
551
// A worker is asking for more work to do.
553
backend.receive(worker_id); // First frame: worker ID for LRU routing
554
ready_workers.push(worker_id); // Thread will be ready again in a sec
555
if (!shutting_down && ready_workers.size() == 1)
557
// We poll the front end when there is at least one worker
558
poller.add(frontend);
561
backend.receive(buf); // Second frame: empty delimiter frame
563
string client_address;
564
backend.receive(client_address); // Third frame: "ready" or client reply address
565
if (client_address != "ready")
567
backend.receive(buf); // Fourth frame: empty delimiter frame
569
if (mode_ == RequestMode::Twoway)
571
frontend.send(client_address, zmqpp::socket::send_more); // Client address tells router where to send the reply to
572
frontend.send("", zmqpp::socket::send_more); // Empty delimiter frame
573
// Read reply contents and return them to client via frontend
577
backend.receive(buf);
578
flag = backend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
579
frontend.send(buf, flag);
580
} while (flag == zmqpp::socket::send_more);
584
if (!shutting_down && poller.has(frontend) && poller.has_input(frontend))
586
// Incoming request from client.
587
string client_address;
588
if (mode_ == RequestMode::Twoway)
590
frontend.receive(client_address); // First frame: client address
592
frontend.receive(buf); // Second frame: empty delimiter frame
595
string worker_id = ready_workers.front();
597
if (ready_workers.size() == 0) // Stop reading from frontend once all workers are busy.
599
poller.remove(frontend);
602
// Give incoming request to worker.
603
backend.send(worker_id, zmqpp::socket::send_more);
604
backend.send("", zmqpp::socket::send_more);
605
backend.send(client_address, zmqpp::socket::send_more);
606
backend.send("", zmqpp::socket::send_more);
596
// This is the actual message pump. We read an incoming request and pass it to one of the workers
597
// for processing. The dealer socket ensures fair sharing.
598
frontend.receive(message);
611
frontend.receive(buf);
599
612
flag = frontend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
600
backend.send(message, flag);
602
while (flag == zmqpp::socket::send_more);
603
if (mode_ == RequestMode::Twoway)
605
++pending_requests; // Only twoway requests require us to wait for replies
608
if (pending_requests != 0 && poller.has_input(backend))
610
// We need to read a reply for an earlier request.
614
// Message pump in the opposite direction, for replies from server to client.
615
backend.receive(message);
616
flag = backend.has_more_parts() ? zmqpp::socket::send_more : zmqpp::socket::normal;
617
frontend.send(message, flag);
619
while (flag == zmqpp::socket::send_more);
620
assert(pending_requests > 0);
623
if (shutting_down && pending_requests == 0)
613
backend.send(buf, flag);
614
} while (flag == zmqpp::socket::send_more);
618
// Tell each worker that is ready to stop. This automatically
619
// "waits" for executing twoway requests to complete because
620
// a worker that's doing work isn't ready. Once all workers
621
// have been told to stop, we are done.
622
while (ready_workers.size() > 0)
624
string worker_id = ready_workers.front();
626
backend.send(worker_id, zmqpp::socket::send_more);
627
backend.send("", zmqpp::socket::send_more);
628
backend.send("stop");
629
if (--num_workers == 0)
634
lock_guard<mutex> lock(state_mutex_);
637
MiddlewareException e("ObjectAdapter: broker thread failure (adapter: " + name_ + ")");
639
MiddlewareException e("ObjectAdapter: pump thread failure (adapter: " + name_ + ")");
638
640
store_exception(e);
639
641
// We may not have signaled the parent yet, depending on where things went wrong.
642
lock_guard<mutex> lock(ready_mutex_);
643
ready_.set_exception(make_exception_ptr(e));
645
ready.set_exception(make_exception_ptr(e));
645
catch (const future_error&) // LCOV_EXCL_LINE
647
catch (future_error) // LCOV_EXCL_LINE
651
void ObjectAdapter::worker_thread()
653
void ObjectAdapter::worker()
655
zmqpp::poller poller;
657
// Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
658
zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
660
lock_guard<mutex> lock(state_mutex_);
661
stop = stopper_->subscribe();
665
auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::reply : zmqpp::socket_type::pull;
666
zmqpp::socket s(*mw_.context(), socket_type);
667
s.set(zmqpp::socket_option::linger, 200);
668
s.connect("inproc://" + name_ + "-worker");
671
ZmqReceiver receiver(s);
672
ZmqSender sender(s); // Sender is used only by twoway adapters (zero-cost, the class just remembers the passed socket).
674
if (--num_workers_ == 0) // Last worker to reach this point notifies the parent that all workers are ready.
676
lock_guard<mutex> lock(ready_mutex_);
681
current.adapter = this; // Stays the same for all invocations, so we set this once only.
657
zmqpp::socket pump(*mw_.context(), zmqpp::socket_type::req);
658
pump.set(zmqpp::socket_option::linger, 50);
659
pump.connect("inproc://" + name_ + pump_suffix);
660
pump.send("ready"); // First message tells pump that we are ready.
664
string client_address;
665
pump.receive(client_address);
666
if (client_address == "stop") // pump has decided to stop
693
if (!finish && poller.has_input(stop)) // Parent sent a stop message, so we are supposed to go away.
700
if (!finish && poller.has_input(s)) // We stop reading new incoming messages once told to finish.
702
// Unmarshal the type-independent part of the message (id, category, operation name, mode).
703
capnproto::Request::Reader req;
704
unique_ptr<capnp::SegmentArrayMessageReader> message;
707
// Unmarshal generic part of the message.
708
auto segments = receiver.receive();
709
message.reset(new capnp::SegmentArrayMessageReader(segments));
710
req = message->getRoot<capnproto::Request>();
711
current.id = req.getId().cStr();
712
current.category = req.getCat().cStr();
713
current.op_name = req.getOpName().cStr();
714
auto mode = req.getMode();
715
if (current.id.empty() || current.op_name.empty() ||
716
(mode != capnproto::RequestMode::TWOWAY && mode != capnproto::RequestMode::ONEWAY))
718
if (mode_ == RequestMode::Twoway)
720
capnp::MallocMessageBuilder b;
721
auto exr = create_unknown_response(b, "Invalid message header");
727
cerr << "ObjectAdapter: invalid oneway message header "
728
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
732
auto expected_mode = mode_ == RequestMode::Twoway ? capnproto::RequestMode::TWOWAY : capnproto::RequestMode::ONEWAY;
733
if (mode != expected_mode) // Can't do oneway on a twoway adapter and vice-versa.
735
if (mode_ == RequestMode::Twoway)
738
s << "ObjectAdapter: oneway invocation sent to twoway adapter "
739
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")";
740
capnp::MallocMessageBuilder b;
741
auto exr = create_unknown_response(b, s.str());
747
cerr << "ObjectAdapter: twoway invocation sent to oneway adapter "
748
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
753
catch (std::exception const& e)
755
// We get here if header unmarshaling failed.
757
s << "ObjectAdapter: error unmarshaling request header "
758
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << "): " << e.what();
759
if (mode_ == RequestMode::Twoway)
761
capnp::MallocMessageBuilder b;
762
auto exr = create_unknown_response(b, s.str());
768
cerr << s.str() << endl;
773
// Look for a servant with matching id.
774
shared_ptr<ServantBase> servant;
777
servant = find_servant(current.id, current.category);
779
catch (std::exception const&)
781
// Ignore failure to find servant during destruction phase.
786
if (mode_ == RequestMode::Twoway)
788
capnp::MallocMessageBuilder b;
789
auto exr = create_object_not_exist_response(b, current);
795
// We have a target object, so we can ask it to unmarshal the in-params, forward
796
// the invocation to the application-provided method, and to marshal the results.
797
auto in_params = req.getInParams();
798
capnp::MallocMessageBuilder b;
799
auto r = b.initRoot<capnproto::Response>();
800
servant->safe_dispatch_(current, in_params, r); // noexcept
801
if (mode_ == RequestMode::Twoway)
803
sender.send(b.getSegmentsForOutput());
674
// Any bytes remaining in the input are the marshaled request payload.
675
dispatch(pump, client_address);
677
if (mode_ == RequestMode::Oneway)
679
// Oneway requests don't have a reply, so we send a dummy "reply" to tell
680
// the pump that we are ready for another request.
808
685
// LCOV_EXCL_START
812
lock_guard<mutex> lock(state_mutex_);
813
stopper_->stop(); // Fatal error, we need to stop all other workers and the broker.
688
stopper_->stop(); // Fatal error, we need to stop all other workers and the pump.
815
689
MiddlewareException e("ObjectAdapter: worker thread failure (adapter: " + name_ + ")");
816
690
store_exception(e);
817
// We may not have signaled the parent yet, depending on where things went wrong.
820
lock_guard<mutex> lock(ready_mutex_);
821
ready_.set_exception(make_exception_ptr(e));
823
catch (const future_error&)
827
692
// LCOV_EXCL_STOP
695
// Unmarshal input parameters, dispatch to servant and, if this is a twoway request,
696
// marshal the results (or exception).
698
void ObjectAdapter::dispatch(zmqpp::socket& pump, string const& client_address)
700
ZmqSender sender(pump); // Unused for oneway requests
701
capnproto::Request::Reader req;
703
ZmqReceiver receiver(pump);
704
unique_ptr<capnp::SegmentArrayMessageReader> message;
708
// Unmarshal the type-independent part of the message (id, category, operation name, mode).
709
auto segments = receiver.receive();
710
message.reset(new capnp::SegmentArrayMessageReader(segments));
711
req = message->getRoot<capnproto::Request>();
713
current.adapter = this;
714
current.id = req.getId().cStr();
715
current.category = req.getCat().cStr();
716
current.op_name = req.getOpName().cStr();
717
auto mode = req.getMode();
718
if (current.id.empty() || current.op_name.empty() ||
719
(mode != capnproto::RequestMode::TWOWAY && mode != capnproto::RequestMode::ONEWAY))
721
// Something is wrong with the request header.
722
if (mode_ == RequestMode::Twoway)
724
pump.send(client_address, zmqpp::socket::send_more);
725
pump.send("", zmqpp::socket::send_more);
726
capnp::MallocMessageBuilder b;
727
auto exr = create_unknown_response(b, "Invalid message header");
733
cerr << "ObjectAdapter: invalid oneway message header "
734
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
738
auto expected_mode = mode_ == RequestMode::Twoway ? capnproto::RequestMode::TWOWAY : capnproto::RequestMode::ONEWAY;
739
if (mode != expected_mode) // Can't do oneway on a twoway adapter and vice-versa.
741
if (mode_ == RequestMode::Twoway)
743
pump.send(client_address, zmqpp::socket::send_more);
744
pump.send("", zmqpp::socket::send_more);
746
s << "ObjectAdapter: oneway invocation sent to twoway adapter "
747
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")";
748
capnp::MallocMessageBuilder b;
749
auto exr = create_unknown_response(b, s.str());
755
cerr << "ObjectAdapter: twoway invocation sent to oneway adapter "
756
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << ")" << endl;
761
catch (std::exception const& e)
763
// We get here if header unmarshaling failed.
765
s << "ObjectAdapter: error unmarshaling request header "
766
<< "(id: " << current.id << ", adapter: " << name_ << ", op: " << current.op_name << "): " << e.what();
767
if (mode_ == RequestMode::Twoway)
769
pump.send(client_address, zmqpp::socket::send_more);
770
pump.send("", zmqpp::socket::send_more);
771
capnp::MallocMessageBuilder b;
772
auto exr = create_unknown_response(b, s.str());
778
cerr << s.str() << endl;
783
// Look for a servant with matching id.
784
shared_ptr<ServantBase> servant;
787
servant = find_servant(current.id, current.category);
789
catch (std::exception const&)
791
// Ignore failure to find servant during destruction phase.
796
if (mode_ == RequestMode::Twoway)
798
pump.send(client_address, zmqpp::socket::send_more);
799
pump.send("", zmqpp::socket::send_more);
800
capnp::MallocMessageBuilder b;
801
auto exr = create_object_not_exist_response(b, current);
807
// We have a target object, so we can ask it to unmarshal the in-params, forward
808
// the invocation to the application-provided method, and to marshal the results.
809
auto in_params = req.getInParams();
810
capnp::MallocMessageBuilder b;
811
auto r = b.initRoot<capnproto::Response>();
812
servant->safe_dispatch_(current, in_params, r); // noexcept
813
if (mode_ == RequestMode::Twoway)
815
pump.send(client_address, zmqpp::socket::send_more);
816
pump.send("", zmqpp::socket::send_more);
817
sender.send(b.getSegmentsForOutput());
830
821
void ObjectAdapter::cleanup()
832
823
join_with_all_threads();