~mhr3/unity-scopes-api/merge-trunk

« back to all changes in this revision

Viewing changes to src/scopes/internal/zmq_middleware/ObjectAdapter.cpp

  • Committer: Tarmac
  • Author(s): Michi Henning
  • Date: 2014-07-29 14:34:50 UTC
  • mfrom: (420.3.9 fix-various-races)
  • Revision ID: tarmac-20140729143450-9icxsr7ypfqqgn28
Fixed a bunch of issues:

- AnnotationImpl was relying on an uninitialized value when instantiated with a VariantMap without an annotation type.

- Excluded header tests for unity::scopes::testing namespace from valgrind (as is the case for all the other public header tests).

- Excluded the testing benchmark from valgrind target. The test fails under valgrind because things are too slow, and valgrind crashes on one of the test cases.

- Added suppression to valgrind for leaky u1db_open() implementation.

- Fixed race conditions in ObjectAdapter that could cause the destructor to operate on stale memory.

- Similar fix for destructor race in StopPublisher, which caused a thread leak.

- Fixed race condition in ZmqMiddleware that could cause adapters to not be destroyed fully by the time wait_for_shutdown() returned. This was an issue only in the pathological case where a scope's search() method took longer to complete than the server time-out.

- The RegistryI test passes with valgrind with gcc 4.8, but valgrind crashes on it with gcc 4.9. I've left the test enabled for the time being until we can have a closer look.

- Added linger timeout to outgoing sockets for oneway invocations. This almost certainly was responsible for the annoying test failures we we were getting on Jenkins under heavy load.

Approved by PS Jenkins bot, Marcus Tomlinson.

Show diffs side-by-side

added added

removed removed

Lines of Context:
389
389
    // We join with threads and call servant destructors outside synchronization.
390
390
    call_once(once_, [this](){ this->cleanup(); });
391
391
 
 
392
    {
 
393
        unique_lock<mutex> lock(state_mutex_);
 
394
        stopper_ = nullptr;
 
395
    }
 
396
 
392
397
    if (state == Failed)
393
398
    {
394
399
        throw_bad_state("wait_for_shutdown()", state);
487
492
        // LCOV_EXCL_START
488
493
        catch (...) //
489
494
        {
 
495
            lock_guard<mutex> state_lock(state_mutex_);
490
496
            stopper_->stop();
491
497
            throw MiddlewareException("ObjectAdapter::run_workers(): worker thread failure (adapter: " + name_ + ")");
492
498
        }
509
515
    try
510
516
    {
511
517
        // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
512
 
        auto stop = stopper_->subscribe();
 
518
        zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
 
519
        {
 
520
            lock_guard<mutex> state_lock(state_mutex_);
 
521
            stop = stopper_->subscribe();
 
522
        }
513
523
 
514
524
        // Set up message pump. Router-dealer for twoway adapter, pull-push for oneway adapter.
515
525
        auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
633
643
        zmqpp::poller poller;
634
644
 
635
645
        // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
636
 
        auto stop = stopper_->subscribe();
 
646
        zmqpp::socket stop(*mw_.context(), zmqpp::socket_type::subscribe);
 
647
        {
 
648
            lock_guard<mutex> lock(state_mutex_);
 
649
            stop = stopper_->subscribe();
 
650
        }
637
651
        poller.add(stop);
638
652
 
639
653
        auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::reply : zmqpp::socket_type::pull;
782
796
    // LCOV_EXCL_START
783
797
    catch (...)
784
798
    {
785
 
        stopper_->stop();  // Fatal error, we need to stop all other workers and the broker.
 
799
        {
 
800
            lock_guard<mutex> lock(state_mutex_);
 
801
            stopper_->stop();  // Fatal error, we need to stop all other workers and the broker.
 
802
        }
786
803
        MiddlewareException e("ObjectAdapter: worker thread failure (adapter: " + name_ + ")");
787
804
        store_exception(e);
788
805
        // We may not have signaled the parent yet, depending on where things went wrong.
801
818
void ObjectAdapter::cleanup()
802
819
{
803
820
    join_with_all_threads();
 
821
    {
 
822
        // Need a full fence here to make sure this thread sees up-to-date
 
823
        // memory for servants_ and dflt_servants_.
 
824
        lock_guard<mutex> lock(map_mutex_);
 
825
    }
 
826
    // Don't hold a lock while the servant destructors run.
804
827
    servants_.clear();
805
828
    dflt_servants_.clear();
806
829
}