2
* Copyright © 2013 Canonical Ltd.
4
* This program is free software: you can redistribute it and/or modify it
5
* under the terms of the GNU General Public License version 3,
6
* as published by the Free Software Foundation.
8
* This program is distributed in the hope that it will be useful,
9
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
* GNU General Public License for more details.
13
* You should have received a copy of the GNU General Public License
14
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16
* Authored by: Alexandros Frantzis <alexandros.frantzis@canonical.com>
19
#include "mir/asio_main_loop.h"
20
#include "mir/time/clock.h"
22
#include "boost/date_time/posix_time/conversion.hpp"
26
#include <condition_variable>
31
// Runs `action` and returns only once it has been executed.
//
// queue           - action queue used when the action cannot be run directly;
//                   its address is also passed to enqueue() as the owner.
// queue_thread_id - id of the thread servicing the queue, or empty.
// action          - the callable to execute; exceptions it throws are swallowed.
//
// NOTE(review): this extract is missing lines (braces, the `else`, and the
// declaration/assignment of `done` used in the wait predicate) — confirm the
// details below against the complete file.
void synchronous_server_action(
32
mir::ServerActionQueue& queue,
33
boost::optional<std::thread::id> queue_thread_id,
34
mir::ServerAction const& action)
36
// Direct path: queue_thread_id is empty, or the caller is already on that thread.
if (!queue_thread_id || *queue_thread_id == std::this_thread::get_id())
38
// Best effort: anything thrown by the action is deliberately discarded.
try { action(); } catch(...) {}
42
// Queued path: the mutex/condition pair lets the caller wait for the queued lambda.
std::mutex done_mutex;
44
std::condition_variable done_condition;
46
// The lambda captures the locals above by reference; they stay alive because
// the wait on done_condition further down blocks this function.
queue.enqueue(&queue, [&]
48
std::lock_guard<std::mutex> lock(done_mutex);
50
try { action(); } catch(...) {}
53
done_condition.notify_one();
56
std::unique_lock<std::mutex> lock(done_mutex);
57
// Predicate form of wait(): spurious wake-ups are ignored until `done` is true.
done_condition.wait(lock, [&] { return done; });
61
// Time-traits type handed to boost::asio::basic_deadline_timer so that timers
// are driven by a mir::time::Clock (time_type = mir::time::Timestamp,
// duration_type = std::chrono::milliseconds).
// NOTE(review): brace-only lines and the bodies of add()/less_than() are not
// visible in this extract.
struct MirClockTimerTraits
63
// TODO the clock used by the main loop is a global setting, this is a restriction
64
// of boost::asio only allowing static methods inside the traits type.
65
// Mutex-guarded holder of a weak reference to the clock used as time source.
struct TimerServiceClockStorage
68
// Stores `clock` as the time source.
// Throws std::logic_error if a different clock is still alive in the storage.
void set_clock(std::shared_ptr<mir::time::Clock const> const& clock)
70
std::lock_guard<std::mutex> lock(timer_service_mutex);
71
auto stored_clock = timer_service_clock.lock();
72
// Re-registering the same clock, or replacing an expired one, is allowed.
if (stored_clock && stored_clock != clock)
73
BOOST_THROW_EXCEPTION(std::logic_error("A clock is already in use as time source for mir::AsioMainLoop"));
74
timer_service_clock = clock;
76
// Returns a timestamp obtained via the stored clock.
// NOTE(review): the condition guarding the throw below and the return
// statement are not visible here — presumably it throws when the weak
// reference has expired; confirm.
mir::time::Timestamp now()
78
std::lock_guard<std::mutex> lock(timer_service_mutex);
79
auto clock = timer_service_clock.lock();
81
BOOST_THROW_EXCEPTION(std::logic_error("No clock available to create time stamp"));
85
// Guards timer_service_clock.
std::mutex timer_service_mutex;
86
// Weak reference: the storage does not keep the clock alive.
std::weak_ptr<mir::time::Clock const> timer_service_clock;
89
// Single shared storage instance (defined out of class further down the file).
static TimerServiceClockStorage clock_storage;
91
// Forwards to TimerServiceClockStorage::set_clock() on the shared storage.
static void set_clock(std::shared_ptr<mir::time::Clock const> const& clock)
93
clock_storage.set_clock(clock);
96
// time_traits interface required by boost::asio::deadline_timer{_service}
97
typedef mir::time::Timestamp time_type;
98
typedef std::chrono::milliseconds duration_type;
101
// Current time as reported by the shared clock storage.
static time_type now()
103
return clock_storage.now();
106
// Body not visible in this extract.
static time_type add(const time_type& t, const duration_type& d)
111
// Difference t1 - t2, converted to duration_type (milliseconds).
static duration_type subtract(const time_type& t1, const time_type& t2)
113
return std::chrono::duration_cast<duration_type>(t1 - t2);
116
// Body not visible in this extract.
static bool less_than(const time_type& t1, const time_type& t2)
121
// Converts a millisecond duration into the boost::posix_time representation.
static boost::posix_time::time_duration to_posix_duration(
122
const duration_type& d)
124
return boost::posix_time::millisec(d.count());
128
// Out-of-class definition of the static storage declared in MirClockTimerTraits.
MirClockTimerTraits::TimerServiceClockStorage MirClockTimerTraits::clock_storage;
130
// Deadline timer whose time type is mir::time::Timestamp and whose traits are
// MirClockTimerTraits.
typedef boost::asio::basic_deadline_timer<mir::time::Timestamp, MirClockTimerTraits> deadline_timer;
133
// Couples a boost::asio::signal_set with a user callback taking the signal number.
// NOTE(review): several lines of this class are missing from the extract
// (access specifiers, member initialisers, the header of the method containing
// the first async_wait call, and the bodies of the loop and of handle()).
class mir::AsioMainLoop::SignalHandler
136
// io      - io_service used by the signal set (initialiser not visible here).
// signals - signal numbers iterated over below.
// handler - callback taking the signal number.
SignalHandler(boost::asio::io_service& io,
137
std::initializer_list<int> signals,
138
std::function<void(int)> const& handler)
142
// Loop body not visible — presumably registers each signal with signal_set; confirm.
for (auto sig : signals)
148
// Arms the signal set; completion is delivered to handle(error_code, signal).
// `this` is bound raw, so the object must outlive the pending wait.
signal_set.async_wait(
149
std::bind(&SignalHandler::handle, this,
150
std::placeholders::_1, std::placeholders::_2));
154
// Completion handler for the asynchronous wait. The visible part re-arms the
// wait; how `err` and `sig` are used is not visible in this extract.
void handle(boost::system::error_code err, int sig)
159
signal_set.async_wait(
160
std::bind(&SignalHandler::handle, this,
161
std::placeholders::_1, std::placeholders::_2));
165
boost::asio::signal_set signal_set;
166
// User callback, taking the signal number.
std::function<void(int)> handler;
169
// Watches a set of file descriptors through boost::asio stream descriptors and
// invokes a user callback with the native handle of the descriptor.
// NOTE(review): this extract is missing lines (the `owner` constructor
// parameter and member, the loop over `fds`, the destructor header, the
// async call that receives null_buffers(), braces) — confirm details
// against the complete file.
class mir::AsioMainLoop::FDHandler
172
FDHandler(boost::asio::io_service& io,
173
std::initializer_list<int> fds,
175
std::function<void(int)> const& handler)
176
: handler{handler}, owner{owner}
180
// Wrap the fd in a stream_descriptor; the unique_ptr takes ownership of the
// wrapper object immediately after allocation.
auto raw = new boost::asio::posix::stream_descriptor{io, fd};
181
auto s = std::unique_ptr<boost::asio::posix::stream_descriptor>(raw);
182
stream_descriptors.push_back(std::move(s));
188
// Detach every native handle from its wrapper (see the inline comment below).
for (auto & desc : stream_descriptors)
190
desc->release(); // release native handle which is not owned by main loop
194
// True when this handler was registered for `possible_owner`.
bool is_owned_by(void const* possible_owner) const
196
return owner == possible_owner;
199
// Starts watching every descriptor of `fd_handler`; read_some() receives the
// handler as a weak_ptr.
static void async_wait(std::shared_ptr<FDHandler> const& fd_handler, ServerActionQueue& queue)
201
for (auto const& s : fd_handler->stream_descriptors)
202
read_some(s.get(), fd_handler, queue);
206
// Arms one asynchronous wait on `s` using null_buffers(). Only a weak reference
// to the FDHandler is captured by the lambdas below.
static void read_some(boost::asio::posix::stream_descriptor* s, std::weak_ptr<FDHandler> const& possible_fd_handler, ServerActionQueue& queue)
209
boost::asio::null_buffers(),
210
[possible_fd_handler,s,&queue](boost::system::error_code err, size_t /*bytes*/)
215
// The actual execution of the fd handler is moved to the action queue. This allows us to synchronously
216
// unregister FDHandlers even when they are about to be executed. We do this because of the way Asio
217
// copies and executes pending completion handlers.
218
// In worst case during the call to unregister the FDHandler, it may still be executed, but not after
219
// the unregister call returned.
222
[possible_fd_handler, s, &queue]()
224
// Promote the weak reference; the check on the result is not visible in this extract.
auto fd_handler = possible_fd_handler.lock();
228
fd_handler->handler(s->native_handle());
231
// Re-arm the wait only while the handler is still alive.
if (possible_fd_handler.lock())
232
read_some(s, possible_fd_handler, queue);
237
// One wrapper per watched fd; they own the wrapper objects, while the native
// handles are released (see the release() loop above).
std::vector<std::unique_ptr<boost::asio::posix::stream_descriptor>> stream_descriptors;
238
// User callback, called with the native handle.
std::function<void(int)> handler;
243
* We need to define the constructor and destructor in the .cpp file,
244
* so that we can use unique_ptr to hold SignalHandler. Otherwise, users
245
* of AsioMainLoop end up creating constructors and destructors that
246
* don't have complete type information for SignalHandler and fail
249
// Constructs the main loop: initialises `work` from `io`, stores `clock`, and
// registers the clock as the time source of MirClockTimerTraits.
// MirClockTimerTraits::set_clock() throws std::logic_error when a different
// clock is still registered.
mir::AsioMainLoop::AsioMainLoop(std::shared_ptr<time::Clock> const& clock)
250
: work{io}, clock(clock)
252
MirClockTimerTraits::set_clock(clock);
255
// Out-of-line, non-throwing destructor; its body is not visible in this extract.
mir::AsioMainLoop::~AsioMainLoop() noexcept(true)
259
// Records the calling thread as the main-loop thread.
// NOTE(review): the rest of the body is not visible — presumably it runs the
// io_service; confirm.
void mir::AsioMainLoop::run()
261
main_loop_thread = std::this_thread::get_id();
265
// Clears the recorded main-loop thread id.
// NOTE(review): one line of the body is not visible — presumably it stops
// the io_service; confirm.
void mir::AsioMainLoop::stop()
268
main_loop_thread.reset();
271
// Creates a SignalHandler for `signals` that calls `handler`, arms its
// asynchronous wait, and stores it in signal_handlers.
void mir::AsioMainLoop::register_signal_handler(
272
std::initializer_list<int> signals,
273
std::function<void(int)> const& handler)
277
auto sig_handler = std::unique_ptr<SignalHandler>{
278
new SignalHandler{io, signals, handler}};
280
// Arm before storing; the unique_ptr is moved from afterwards.
sig_handler->async_wait();
282
signal_handlers.push_back(std::move(sig_handler));
285
// Creates an FDHandler for `fds`, starts watching the descriptors, and records
// the handler in fd_handlers under fd_handlers_mutex.
// NOTE(review): the `owner` parameter used below is declared on a line that is
// not visible in this extract.
void mir::AsioMainLoop::register_fd_handler(
286
std::initializer_list<int> fds,
288
std::function<void(int)> const& handler)
292
auto fd_handler = std::make_shared<FDHandler>(io, fds, owner, handler);
294
// This main loop is passed as the ServerActionQueue for the handler.
FDHandler::async_wait(fd_handler, *this);
296
std::lock_guard<std::mutex> lock(fd_handlers_mutex);
297
fd_handlers.push_back(std::move(fd_handler));
300
// Removes every FDHandler registered for `owner` from fd_handlers.
// NOTE(review): the remaining arguments of synchronous_server_action() and of
// remove_if() are on lines not visible in this extract.
void mir::AsioMainLoop::unregister_fd_handler(void const* owner)
302
// synchronous_server_action makes sure that with the
303
// completion of the method unregister_fd_handler the
304
// handler will no longer be called.
305
// There is a chance for a fd handler callback to happen before
306
// the completion of this method.
307
synchronous_server_action(
312
std::lock_guard<std::mutex> lock(fd_handlers_mutex);
313
// remove_if + erase: drop all handlers whose owner matches.
auto end_of_valid = remove_if(
316
[owner](std::shared_ptr<FDHandler> const& item)
318
return item->is_owned_by(owner);
320
fd_handlers.erase(end_of_valid, end(fd_handlers));
326
// mir::time::Alarm implementation backed by a ::deadline_timer.
// The mutable state lives in a shared InternalState so that the timer's
// completion handler can refer to it through a weak_ptr (see make_handler()).
// NOTE(review): access specifiers, the InternalState struct header and some of
// its members (e.g. the mutex `m` and `state` used elsewhere in the file), and
// the declarations of stop()/update_timer() are not visible in this extract.
class AlarmImpl : public mir::time::Alarm
329
// Alarm scheduled `delay` from now.
AlarmImpl(boost::asio::io_service& io,
330
std::chrono::milliseconds delay,
331
std::function<void(void)> callback);
333
// Alarm scheduled for the absolute `time_point`.
AlarmImpl(boost::asio::io_service& io,
334
mir::time::Timestamp time_point,
335
std::function<void(void)> callback);
337
// Alarm constructed without a schedule.
AlarmImpl(boost::asio::io_service& io,
338
std::function<void(void)> callback);
340
~AlarmImpl() noexcept override;
342
bool cancel() override;
343
State state() const override;
345
bool reschedule_in(std::chrono::milliseconds delay) override;
346
bool reschedule_for(mir::time::Timestamp time_point) override;
349
// Cancels without taking the lock; cancel() calls it while holding data->m.
bool cancel_unlocked();
353
// New state objects start out `pending` with the given callback.
explicit InternalState(std::function<void(void)> callback)
354
: callback{callback}, state{pending}
359
// Number of callbacks currently executing; stop() waits for it to reach zero.
int callbacks_running = 0;
360
// Notified each time a callback finishes.
std::condition_variable callback_done;
361
std::function<void(void)> const callback;
365
::deadline_timer timer;
366
// Shared with the completion handler through a weak_ptr.
std::shared_ptr<InternalState> const data;
367
// Completion handler passed to timer.async_wait(); built by make_handler().
std::function<void(boost::system::error_code const& ec)> const handler;
369
friend auto make_handler(std::weak_ptr<InternalState> possible_data)
370
-> std::function<void(boost::system::error_code const& ec)>;
373
// Builds the completion handler passed to the timer's async_wait().
// The handler holds only a weak reference to the alarm state and does nothing
// once that state has been destroyed.
// NOTE(review): lines between the counter increment and decrement (presumably
// unlocking, invoking data->callback and re-locking) and any check of `ec` are
// not visible in this extract — confirm.
auto make_handler(std::weak_ptr<AlarmImpl::InternalState> possible_data)
374
-> std::function<void(boost::system::error_code const& ec)>
376
// Awkwardly, we can't stop the async_wait handler from being called
377
// on a destroyed AlarmImpl. This means we need to wedge a weak_ptr
378
// into the async_wait callback.
379
return [possible_data](boost::system::error_code const& ec)
383
if (auto data = possible_data.lock())
385
std::unique_lock<decltype(data->m)> lock(data->m);
386
// Only an alarm that is still pending fires.
if (data->state == mir::time::Alarm::pending)
388
data->state = mir::time::Alarm::triggered;
389
// Track the in-flight callback so AlarmImpl::stop() can wait for it.
++data->callbacks_running;
393
--data->callbacks_running;
394
data->callback_done.notify_all();
401
// Delegates to the (io, callback) constructor, then schedules the alarm
// `delay` from now via reschedule_in().
AlarmImpl::AlarmImpl(boost::asio::io_service& io,
402
std::chrono::milliseconds delay,
403
std::function<void ()> callback)
404
: AlarmImpl(io, callback)
406
reschedule_in(delay);
409
// Delegates to the (io, callback) constructor, then schedules the alarm for
// the absolute `time_point` via reschedule_for().
AlarmImpl::AlarmImpl(boost::asio::io_service& io,
410
mir::time::Timestamp time_point,
411
std::function<void ()> callback)
412
: AlarmImpl(io, callback)
414
reschedule_for(time_point);
417
// Base constructor: creates the shared InternalState for `callback` and the
// completion handler bound to it.
// NOTE(review): the initialiser for `timer` is on a line not visible here.
AlarmImpl::AlarmImpl(boost::asio::io_service& io,
418
std::function<void(void)> callback)
420
data{std::make_shared<InternalState>(callback)},
421
handler{make_handler(data)}
423
// Overrides the `pending` default set by InternalState: nothing is scheduled
// yet, and update_timer() sets the state back to pending when a wait is armed.
data->state = triggered;
426
// Non-throwing destructor; its body is not visible in this extract.
AlarmImpl::~AlarmImpl() noexcept
431
// Blocks until no callback of this alarm is executing any more.
// NOTE(review): one line between taking the lock and the wait loop is not
// visible — presumably a cancellation; confirm.
void AlarmImpl::stop()
433
std::unique_lock<decltype(data->m)> lock(data->m);
435
// callback_done is notified by the completion handler after each callback.
while (data->callbacks_running > 0)
436
data->callback_done.wait(lock);
441
// Thread-safe cancel: takes the state lock and returns the result of
// cancel_unlocked().
bool AlarmImpl::cancel()
443
std::lock_guard<decltype(data->m)> lock(data->m);
444
return cancel_unlocked();
447
// Cancel implementation; cancel() calls this while holding data->m.
// NOTE(review): the statement guarded by the check below and the return
// statements are not visible — presumably an already-triggered alarm is
// reported as not cancellable; confirm.
bool AlarmImpl::cancel_unlocked()
449
if (data->state == triggered)
452
data->state = cancelled;
457
// Takes the state lock; the return statement is not visible in this extract
// (presumably returns data->state — confirm).
mir::time::Alarm::State AlarmImpl::state() const
459
std::lock_guard<decltype(data->m)> lock(data->m);
464
// Moves the timer's expiry to `delay` from now.
// `cancelling` is the timer's return value converted to bool.
// NOTE(review): the remainder of the body (re-arming the wait and the return
// statement) is not visible in this extract.
bool AlarmImpl::reschedule_in(std::chrono::milliseconds delay)
466
bool cancelling = timer.expires_from_now(delay);
471
// Moves the timer's expiry to the absolute `time_point`.
// `cancelling` is the timer's return value converted to bool.
// NOTE(review): the remainder of the body (re-arming the wait and the return
// statement) is not visible in this extract.
bool AlarmImpl::reschedule_for(mir::time::Timestamp time_point)
473
bool cancelling = timer.expires_at(time_point);
478
// Marks the alarm as pending and arms an asynchronous wait on the timer with
// the stored completion handler; both steps happen under the state lock.
void AlarmImpl::update_timer()
480
std::lock_guard<decltype(data->m)> lock(data->m);
481
data->state = pending;
482
timer.async_wait(handler);
486
// Creates an alarm on this loop's io_service that is scheduled `delay` from
// now and invokes `callback`; the caller owns the returned alarm.
std::unique_ptr<mir::time::Alarm> mir::AsioMainLoop::notify_in(std::chrono::milliseconds delay,
487
std::function<void()> callback)
489
return std::unique_ptr<mir::time::Alarm>{new AlarmImpl{io, delay, callback}};
492
// Creates an alarm on this loop's io_service that is scheduled for the
// absolute `time_point` and invokes `callback`; the caller owns the returned alarm.
std::unique_ptr<mir::time::Alarm> mir::AsioMainLoop::notify_at(mir::time::Timestamp time_point,
493
std::function<void()> callback)
495
return std::unique_ptr<mir::time::Alarm>{new AlarmImpl{io, time_point, callback}};
498
// Creates an alarm for `callback` on this loop's io_service without scheduling
// it; the caller owns the returned alarm.
std::unique_ptr<mir::time::Alarm> mir::AsioMainLoop::create_alarm(std::function<void()> callback)
500
return std::unique_ptr<mir::time::Alarm>{new AlarmImpl{io, callback}};
503
// Appends (`owner`, `action`) to server_actions under server_actions_mutex and
// posts a call to process_server_actions() on the io_service.
// NOTE(review): the braces delimiting the lock's scope are not visible in this
// extract — confirm whether the lock is released before io.post().
void mir::AsioMainLoop::enqueue(void const* owner, ServerAction const& action)
506
std::lock_guard<std::mutex> lock{server_actions_mutex};
507
server_actions.push_back({owner, action});
510
// The lambda captures `this`, so the main loop must outlive the posted task.
io.post([this] { process_server_actions(); });
513
// Adds `owner` to do_not_process under server_actions_mutex;
// process_server_actions() consults that set before handling a queued action.
void mir::AsioMainLoop::pause_processing_for(void const* owner)
515
std::lock_guard<std::mutex> lock{server_actions_mutex};
516
do_not_process.insert(owner);
519
// Removes `owner` from do_not_process under server_actions_mutex and posts a
// call to process_server_actions() on the io_service.
// NOTE(review): the braces delimiting the lock's scope are not visible in this
// extract.
void mir::AsioMainLoop::resume_processing_for(void const* owner)
522
std::lock_guard<std::mutex> lock{server_actions_mutex};
523
do_not_process.erase(owner);
526
io.post([this] { process_server_actions(); });
529
void mir::AsioMainLoop::process_server_actions()
531
std::unique_lock<std::mutex> lock{server_actions_mutex};
535
while (i < server_actions.size())
538
* It's safe to use references to elements, since std::deque<>
539
* guarantees that references remain valid after appends, which is
540
* the only operation that can be performed on server_actions outside
541
* this function (in AsioMainLoop::enqueue()).
543
auto const& owner = server_actions[i].first;
544
auto const& action = server_actions[i].second;
546
if (do_not_process.find(owner) == do_not_process.end())
552
* This erase is always ok, since outside this function
553
* we only append to server_actions, i.e., our index i
554
* is guaranteed to remain valid and correct.
556
server_actions.erase(server_actions.begin() + i);