2
* Copyright © 2015 Canonical Ltd.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License version 3 as
6
* published by the Free Software Foundation.
8
* This program is distributed in the hope that it will be useful,
9
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
* GNU General Public License for more details.
13
* You should have received a copy of the GNU General Public License
14
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16
* Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
19
#include "mir/dispatch/multiplexing_dispatchable.h"
20
#include "mir/dispatch/simple_dispatch_thread.h"
22
#include "mir_test/pipe.h"
23
#include "mir_test/signal.h"
24
#include "mir_test/fd_utils.h"
25
#include "mir_test/test_dispatchable.h"
26
#include "mir_test/auto_unblock_thread.h"
33
#include <gtest/gtest.h>
34
#include <gmock/gmock.h>
36
namespace md = mir::dispatch;
37
namespace mt = mir::test;
39
// Basic happy path: a dispatcher constructed around a single dispatchee
// reports its watch_fd() as readable once the dispatchee is triggered, and a
// dispatch() call is then expected to run the dispatchee's callback.
TEST(MultiplexingDispatchableTest, dispatches_dispatchee_when_appropriate)
41
bool dispatched{false};
42
auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
43
md::MultiplexingDispatchable dispatcher{dispatchee};
45
dispatchee->trigger();
47
// ASSERT rather than EXPECT: dispatching is only attempted once the fd is known to be readable.
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
48
dispatcher.dispatch(md::FdEvent::readable);
50
EXPECT_TRUE(dispatched);
53
// With two dispatchees watched by one dispatcher, only the dispatchee that was
// triggered is expected to have its callback run. The check is done twice:
// first triggering dispatchee_a, then triggering dispatchee_b.
// NOTE(review): the second phase expects a_dispatched to be false, so the
// flags are presumably reset between the phases; no reset is visible in this
// listing - confirm against the full file.
TEST(MultiplexingDispatchableTest, calls_correct_dispatchee_when_fd_becomes_readable)
55
bool a_dispatched{false};
56
auto dispatchee_a = std::make_shared<mt::TestDispatchable>([&a_dispatched]() { a_dispatched = true; });
58
bool b_dispatched{false};
59
auto dispatchee_b = std::make_shared<mt::TestDispatchable>([&b_dispatched]() { b_dispatched = true; });
61
md::MultiplexingDispatchable dispatcher{dispatchee_a, dispatchee_b};
63
dispatchee_a->trigger();
65
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
66
dispatcher.dispatch(md::FdEvent::readable);
68
EXPECT_TRUE(a_dispatched);
69
EXPECT_FALSE(b_dispatched);
71
// The single pending event has been consumed, so nothing should be left to dispatch.
ASSERT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
74
dispatchee_b->trigger();
76
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
77
dispatcher.dispatch(md::FdEvent::readable);
79
EXPECT_FALSE(a_dispatched);
80
EXPECT_TRUE(b_dispatched);
83
// Triggers the dispatchee trigger_count times, then dispatches trigger_count
// times, asserting before every dispatch that the dispatcher's fd is still
// readable. Afterwards the fd is expected to be no longer readable.
TEST(MultiplexingDispatchableTest, keeps_dispatching_until_fd_is_unreadable)
85
bool dispatched{false};
86
auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
87
md::MultiplexingDispatchable dispatcher{dispatchee};
89
int const trigger_count{10};
91
// Queue up all the events first...
for (int i = 0; i < trigger_count; ++i)
93
dispatchee->trigger();
96
// ...then consume them one dispatch() call at a time.
for (int i = 0; i < trigger_count; ++i)
98
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
99
dispatcher.dispatch(md::FdEvent::readable);
101
EXPECT_TRUE(dispatched);
105
EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
108
// Calls dispatch() although the dispatchee was never triggered and expects
// that the dispatchee's callback is not run.
TEST(MultiplexingDispatchableTest, dispatching_without_pending_event_is_harmless)
110
bool dispatched{false};
111
auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
112
md::MultiplexingDispatchable dispatcher{dispatchee};
114
dispatcher.dispatch(md::FdEvent::readable);
116
EXPECT_FALSE(dispatched);
119
// Triggers a dispatchee before handing it to the dispatcher via add_watch(),
// then expects the dispatcher's fd to be readable and dispatch() to run the
// callback.
// NOTE(review): the test name suggests the local shared_ptr is released before
// dispatching so that only the dispatcher keeps the dispatchee alive; that
// statement is not visible in this listing - confirm against the full file.
TEST(MultiplexingDispatchableTest, keeps_dispatchees_alive)
121
bool dispatched{false};
122
auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
123
dispatchee->trigger();
125
md::MultiplexingDispatchable dispatcher;
126
dispatcher.add_watch(dispatchee);
129
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
130
dispatcher.dispatch(md::FdEvent::readable);
132
EXPECT_TRUE(dispatched);
135
// Adds a dispatchable and removes it again, then triggers it. The dispatcher's
// fd is expected to stay unreadable and a dispatch() call is expected not to
// run the removed dispatchable's callback.
TEST(MultiplexingDispatchableTest, removed_dispatchables_are_no_longer_dispatched)
137
using namespace testing;
141
bool dispatched{false};
142
auto dispatchable = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
144
md::MultiplexingDispatchable dispatcher;
145
dispatcher.add_watch(dispatchable);
146
dispatcher.remove_watch(dispatchable);
148
// Drain whatever is still pending on the dispatcher so the readability check
// below only reflects the trigger that follows.
while (mt::fd_is_readable(dispatcher.watch_fd()))
150
dispatcher.dispatch(md::FdEvent::readable);
153
dispatchable->trigger();
155
EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
156
dispatcher.dispatch(md::FdEvent::readable);
158
EXPECT_FALSE(dispatched);
161
// Registers the same dispatchable twice and expects the second add_watch()
// call to throw.
// NOTE(review): the EXPECT_THROW statement is cut off in this listing, so the
// expected exception type is not visible here - confirm against the full file.
TEST(MultiplexingDispatchableTest, adding_same_fd_twice_is_an_error)
163
using namespace testing;
165
auto dispatchable = std::make_shared<mt::TestDispatchable>([](){});
167
md::MultiplexingDispatchable dispatcher;
168
dispatcher.add_watch(dispatchable);
170
EXPECT_THROW(dispatcher.add_watch(dispatchable),
174
// Records the dispatchable's shared_ptr use_count() after a successful
// add_watch(), attempts to add it a second time (expected to throw), and then
// expects use_count() to be unchanged, i.e. the failed call kept no reference.
// NOTE(review): the EXPECT_THROW statement is cut off in this listing, so the
// expected exception type is not visible here - confirm against the full file.
TEST(MultiplexingDispatchableTest, dispatcher_does_not_hold_reference_after_failing_to_add_dispatchee)
176
using namespace testing;
178
auto dispatchable = std::make_shared<mt::TestDispatchable>([](){});
180
md::MultiplexingDispatchable dispatcher;
181
dispatcher.add_watch(dispatchable);
183
// Baseline taken after the first (successful) add_watch().
auto const dispatchable_refcount = dispatchable.use_count();
185
// This should not increase refcount
186
EXPECT_THROW(dispatcher.add_watch(dispatchable),
188
EXPECT_THAT(dispatchable.use_count(), Eq(dispatchable_refcount));
191
// Triggers one dispatchee twice and services the dispatcher from two
// SimpleDispatchThreads. The callback checks that `canary` equals 1 while it
// runs, raises `second_dispatch` on its second invocation and sleeps for a
// second; the test then waits up to five seconds for that signal.
// NOTE(review): the statements that modify `canary` are not visible in this
// listing; presumably it is incremented on entry and decremented on exit so
// that overlapping invocations would be detected - confirm against the full file.
TEST(MultiplexingDispatchableTest, individual_dispatchee_is_not_concurrent)
193
using namespace testing;
195
auto second_dispatch = std::make_shared<mt::Signal>();
196
auto dispatchee = std::make_shared<mt::TestDispatchable>([second_dispatch]()
198
static std::atomic<int> canary{0};
199
static std::atomic<int> total_count{0};
202
EXPECT_THAT(canary, Eq(1));
203
if (++total_count == 2)
205
second_dispatch->raise();
209
// Hold the callback open for a while to give a second thread the chance to
// (incorrectly) enter it.
std::this_thread::sleep_for(std::chrono::seconds{1});
214
dispatchee->trigger();
215
dispatchee->trigger();
217
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
218
dispatcher->add_watch(dispatchee);
220
md::SimpleDispatchThread first_loop{dispatcher};
221
md::SimpleDispatchThread second_loop{dispatcher};
223
EXPECT_TRUE(second_dispatch->wait_for(std::chrono::seconds{5}));
226
// Registers a dispatchee with DispatchReentrancy::reentrant, triggers it twice
// and calls dispatch() from two std::threads. After sleeping for a second the
// callback expects `count` to be greater than 1.
// NOTE(review): the statement that increments `count` and the joins of
// `first`/`second` are not visible in this listing - confirm against the full
// file.
TEST(MultiplexingDispatchableTest, reentrant_dispatchee_is_dispatched_concurrently)
228
using namespace testing;
230
std::atomic<int> count{0};
232
auto dispatchee = std::make_shared<mt::TestDispatchable>([&count]()
235
std::this_thread::sleep_for(std::chrono::seconds{1});
236
EXPECT_THAT(count, Gt(1));
239
dispatchee->trigger();
240
dispatchee->trigger();
242
md::MultiplexingDispatchable dispatcher;
243
dispatcher.add_watch(dispatchee, md::DispatchReentrancy::reentrant);
245
std::thread first{[&dispatcher]() { dispatcher.dispatch(md::FdEvent::readable); }};
246
std::thread second{[&dispatcher]() { dispatcher.dispatch(md::FdEvent::readable); }};
252
// Uses the add_watch(fd, callback) overload: watches fd_source.read_fd() with
// a plain lambda, writes to fd_source.write_fd(), and expects the dispatcher's
// fd to become readable and dispatch() to run the lambda.
// NOTE(review): the declarations of `fd_source` and `buffer` are not visible
// in this listing - confirm against the full file.
TEST(MultiplexingDispatchableTest, raw_callback_is_dispatched)
254
using namespace testing;
256
bool dispatched{false};
257
auto dispatchee = [&dispatched]() { dispatched = true; };
260
md::MultiplexingDispatchable dispatcher;
261
dispatcher.add_watch(fd_source.read_fd(), dispatchee);
264
// ASSERT: the rest of the test is meaningless unless the whole buffer was written.
ASSERT_THAT(::write(fd_source.write_fd(), &buffer, sizeof(buffer)), Eq(sizeof(buffer)));
266
EXPECT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
267
dispatcher.dispatch(md::FdEvent::readable);
269
EXPECT_TRUE(dispatched);
272
// Watches fd_source.read_fd() with a plain lambda and removes the watch again
// via remove_watch(fd). After writing to fd_source.write_fd() the dispatcher's
// fd is expected to stay unreadable and dispatch() is expected not to run the
// lambda.
// NOTE(review): the declarations of `fd_source` and `buffer` are not visible
// in this listing - confirm against the full file.
TEST(MultiplexingDispatchableTest, raw_callback_can_be_removed)
274
using namespace testing;
276
bool dispatched{false};
277
auto dispatchee = [&dispatched]() { dispatched = true; };
280
md::MultiplexingDispatchable dispatcher;
281
dispatcher.add_watch(fd_source.read_fd(), dispatchee);
282
dispatcher.remove_watch(fd_source.read_fd());
284
// Drain whatever is still pending on the dispatcher before the write below.
while (mt::fd_is_readable(dispatcher.watch_fd()))
286
dispatcher.dispatch(md::FdEvent::readable);
290
ASSERT_THAT(::write(fd_source.write_fd(), &buffer, sizeof(buffer)), Eq(sizeof(buffer)));
292
EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
293
dispatcher.dispatch(md::FdEvent::readable);
295
EXPECT_FALSE(dispatched);
298
// Removes a dispatchee while its callback is running on a SimpleDispatchThread.
// `canary` is a shared_ptr whose deleter raises `canary_killed`; the callback
// captures a copy of it, raises `in_dispatch`, sleeps for a second and then
// expects canary.use_count() to be greater than 0. The test waits for
// `in_dispatch`, calls remove_watch(), and finally expects `canary_killed` to
// be raised within two seconds.
// NOTE(review): the statements that release the test's own references (to
// `dispatchee` and `canary`) are not visible in this listing - confirm against
// the full file.
TEST(MultiplexingDispatchableTest, removal_is_threadsafe)
300
using namespace testing;
302
auto canary_killed = std::make_shared<mt::Signal>();
303
auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
304
auto in_dispatch = std::make_shared<mt::Signal>();
306
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
308
auto dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch]()
310
in_dispatch->raise();
311
std::this_thread::sleep_for(std::chrono::seconds{1});
312
EXPECT_THAT(canary.use_count(), Gt(0));
314
dispatcher->add_watch(dispatchee);
316
dispatchee->trigger();
318
md::SimpleDispatchThread eventloop{dispatcher};
320
// Wait until the callback is actually running before removing the watch.
EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
322
dispatcher->remove_watch(dispatchee);
326
EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
329
// Same canary arrangement as a removal test, but the dispatcher is driven by a
// single dispatch() call on an mt::AutoJoinThread that holds its own
// shared_ptr to the dispatcher. The test waits for `in_dispatch`, calls
// remove_watch(), and expects `canary_killed` to be raised within two seconds.
// NOTE(review): the test name refers to destruction, but no statement that
// destroys or resets the dispatcher (or releases `dispatchee`/`canary`) is
// visible in this listing - confirm against the full file.
TEST(MultiplexingDispatchableTest, destruction_is_threadsafe)
331
using namespace testing;
333
auto canary_killed = std::make_shared<mt::Signal>();
334
auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
335
auto in_dispatch = std::make_shared<mt::Signal>();
337
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
339
auto dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch]()
341
in_dispatch->raise();
342
std::this_thread::sleep_for(std::chrono::seconds{1});
343
EXPECT_THAT(canary.use_count(), Gt(0));
345
dispatcher->add_watch(dispatchee);
347
dispatchee->trigger();
349
mt::AutoJoinThread dispatch_thread{[dispatcher]() { dispatcher->dispatch(md::FdEvent::readable); }};
351
// Wait until the callback is actually running before touching the dispatcher.
EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
353
dispatcher->remove_watch(dispatchee);
358
EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
361
// Stress test: starts dispatchee_count + 5 SimpleDispatchThreads on one
// dispatcher and registers dispatchee_count reentrant dispatchees. Each
// dispatchee's callback captures its own canary shared_ptr whose deleter
// raises a Signal stored in `canary_tomb`. Each dispatchee is triggered after
// being added; a later loop triggers and removes every entry of `dispatchees`.
// Finally every Signal in `canary_tomb` is expected to be raised within two
// seconds.
// NOTE(review): no statement that inserts into `dispatchees` is visible in
// this listing, nor are the statements that release the remaining references -
// confirm against the full file.
TEST(MultiplexingDispatchableTest, stress_test_threading)
363
using namespace testing;
365
int const dispatchee_count{20};
367
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
369
std::vector<std::shared_ptr<md::SimpleDispatchThread>> eventloops;
370
// More event loops than dispatchees.
for (int i = 0 ; i < dispatchee_count + 5 ; ++i)
372
eventloops.push_back(std::make_shared<md::SimpleDispatchThread>(dispatcher));
375
std::vector<std::shared_ptr<mt::Signal>> canary_tomb;
376
std::vector<std::shared_ptr<mt::TestDispatchable>> dispatchees;
377
for (int i = 0 ; i < dispatchee_count ; ++i)
379
canary_tomb.push_back(std::make_shared<mt::Signal>());
380
auto current_canary = canary_tomb.back();
381
// The deleter raises this iteration's Signal when the canary is destroyed.
auto canary = std::shared_ptr<int>(new int, [current_canary](int* victim) { delete victim; current_canary->raise(); });
382
auto dispatchee = std::make_shared<mt::TestDispatchable>([canary]()
384
std::this_thread::sleep_for(std::chrono::seconds{1});
385
EXPECT_THAT(canary.use_count(), Gt(0));
387
dispatcher->add_watch(dispatchee, md::DispatchReentrancy::reentrant);
389
dispatchee->trigger();
392
for (auto& dispatchee : dispatchees)
394
dispatchee->trigger();
395
dispatcher->remove_watch(dispatchee);
402
for (auto headstone : canary_tomb)
404
// Use assert so as to not block for *ages* on failure
405
ASSERT_TRUE(headstone->wait_for(std::chrono::seconds{2}));
409
// Triggers a dispatchee twice, dispatches once and expects the callback to
// have run. The dispatcher is then drained and `dispatched` is expected to be
// false, i.e. the second pending event must not reach the callback.
// NOTE(review): the body of the callback (which takes md::FdEvents) and the
// reset of `dispatched` between the two phases are not visible in this
// listing; going by the test name the callback presumably returns false -
// confirm against the full file.
TEST(MultiplexingDispatchableTest, removes_dispatchable_that_returns_false_from_dispatch)
411
bool dispatched{false};
412
auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched](md::FdEvents)
417
md::MultiplexingDispatchable dispatcher{dispatchee};
419
dispatchee->trigger();
420
dispatchee->trigger();
422
ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
423
dispatcher.dispatch(md::FdEvent::readable);
425
EXPECT_TRUE(dispatched);
428
// Drain anything still pending on the dispatcher.
while (mt::fd_is_readable(dispatcher.watch_fd()))
430
dispatcher.dispatch(md::FdEvent::readable);
433
EXPECT_FALSE(dispatched);
436
// Removes two dispatchees while the callback of one of them is blocked inside
// a dispatch. first_dispatchee's callback captures `canary` (whose deleter
// raises `canary_killed`), raises `in_dispatch`, and then waits up to five
// seconds for `unblock_dispatchee`. Once the callback is known to be running,
// the test removes both watches, drops its own references to both dispatchees,
// unblocks the callback and expects `canary_killed` to be raised within two
// seconds.
TEST(MultiplexingDispatchableTest, multiple_removals_are_threadsafe)
438
using namespace testing;
440
auto canary_killed = std::make_shared<mt::Signal>();
441
auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
442
auto in_dispatch = std::make_shared<mt::Signal>();
443
auto unblock_dispatchee = std::make_shared<mt::Signal>();
445
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
447
auto first_dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch, unblock_dispatchee]()
449
in_dispatch->raise();
450
// Block here until the test body has finished removing the watches.
EXPECT_TRUE(unblock_dispatchee->wait_for(std::chrono::seconds{5}));
451
EXPECT_THAT(canary.use_count(), Gt(0));
453
auto dummy_dispatchee = std::make_shared<mt::TestDispatchable>([](){});
454
dispatcher->add_watch(first_dispatchee);
455
dispatcher->add_watch(dummy_dispatchee);
457
first_dispatchee->trigger();
459
md::SimpleDispatchThread eventloop_one{dispatcher};
460
md::SimpleDispatchThread eventloop_two{dispatcher};
462
EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
464
dispatcher->remove_watch(dummy_dispatchee);
465
dispatcher->remove_watch(first_dispatchee);
467
// Drop the test's own references to the dispatchees.
first_dispatchee.reset();
468
dummy_dispatchee.reset();
471
unblock_dispatchee->raise();
473
EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
476
// Registers, as reentrant, a dispatchee whose callback returns false, starts
// four SimpleDispatchThreads on the same dispatcher and triggers the
// dispatchee once. No assertion is visible in this listing.
// NOTE(review): presumably the test passes simply by not crashing or
// deadlocking when several threads race on the automatic removal - confirm
// against the full file.
TEST(MultiplexingDispatchableTest, automatic_removals_are_threadsafe)
478
auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
480
auto dispatchee = std::make_shared<mt::TestDispatchable>([](md::FdEvents) { return false; });
482
dispatcher->add_watch(dispatchee, md::DispatchReentrancy::reentrant);
484
md::SimpleDispatchThread one{dispatcher}, two{dispatcher}, three{dispatcher}, four{dispatcher};
486
dispatchee->trigger();