1
// ---------------------------------------------------------------------
2
// pion: a Boost C++ framework for building lightweight HTTP interfaces
3
// ---------------------------------------------------------------------
4
// Copyright (C) 2007-2012 Cloudmeter, Inc. (http://www.cloudmeter.com)
6
// Distributed under the Boost Software License, Version 1.0.
7
// See http://www.boost.org/LICENSE_1_0.txt
10
#include <boost/exception/diagnostic_information.hpp>
11
#include <boost/date_time/posix_time/posix_time_duration.hpp>
12
#include <pion/scheduler.hpp>
14
namespace pion { // begin namespace pion
17
// static members of scheduler
19
const boost::uint32_t scheduler::DEFAULT_NUM_THREADS(8);

// unit-conversion factors
const boost::uint32_t scheduler::NSEC_IN_SECOND(1000000000);    // 10^9 nanoseconds per second
const boost::uint32_t scheduler::MICROSEC_IN_SECOND(1000000);   // 10^6 microseconds per second

// delay, in seconds, that keep_running() uses each time it re-arms its timer
const boost::uint32_t scheduler::KEEP_RUNNING_TIMER_SECONDS(5);
25
// scheduler member functions
27
// Stops the scheduler.  Visible steps: acquire m_mutex, block on
// m_no_more_active_users for as long as m_active_users is positive, then
// wake every thread waiting on m_scheduler_has_stopped (join() waits on
// that condition).
// NOTE(review): the embedded original line numbers skip several ranges
// (e.g. 31-33, 43-48, 57-61).  The braces, whatever branch separates the
// two notify_all() paths, and the statements announced by the "shut
// everything down" / "stop and finish everything" comments are therefore
// not visible in this extract -- confirm against the complete file.
void scheduler::shutdown(void)
29
// lock mutex for thread safety
30
boost::mutex::scoped_lock scheduler_lock(m_mutex);
34
PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
36
// re-test the counter after every wake-up; remove_active_user() signals
// this condition when its decrement brings the counter to zero
while (m_active_users > 0) {
37
// first, wait for any active users to exit
38
PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
39
// wait() is handed scheduler_lock so the mutex is released while blocked
m_no_more_active_users.wait(scheduler_lock);
42
// shut everything down
// NOTE(review): the statements that perform the shutdown (original lines
// 43-48) are missing from this extract
49
PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
51
// Make sure anyone waiting on shutdown gets notified
52
m_scheduler_has_stopped.notify_all();
56
// stop and finish everything to be certain that no events are pending
// NOTE(review): presumably this begins the "scheduler was not running"
// path; its statements (original lines 57-61) are missing -- TODO confirm
62
// Make sure anyone waiting on shutdown gets notified
63
// even if the scheduler did not start up successfully
64
m_scheduler_has_stopped.notify_all();
68
void scheduler::join(void)
70
boost::mutex::scoped_lock scheduler_lock(m_mutex);
71
while (m_is_running) {
72
// sleep until scheduler_has_stopped condition is signaled
73
m_scheduler_has_stopped.wait(scheduler_lock);
77
// Re-arms my_timer to expire KEEP_RUNNING_TIMER_SECONDS from now and
// registers this same function as the timer's completion handler, so each
// expiry schedules the next one.  In the visible lines my_service is only
// forwarded to the next invocation.
// NOTE(review): original lines 79-80 and 85-87 are missing from this
// extract; line 80 may hold a guard (e.g. on the running flag) around the
// re-arm -- confirm against the complete file.
void scheduler::keep_running(boost::asio::io_service& my_service,
78
boost::asio::deadline_timer& my_timer)
81
// schedule this again to make sure the service doesn't complete
82
my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
83
// no placeholder is bound, so the handler's error_code argument is not inspected
my_timer.async_wait(boost::bind(&scheduler::keep_running, this,
84
boost::ref(my_service), boost::ref(my_timer)));
88
// Registers a user of the scheduler, starting the scheduler first if it
// is not running.
// NOTE(review): original lines 92-94 are missing from this extract.
// Presumably one of them increments m_active_users (remove_active_user()
// decrements it) -- confirm against the complete file.
void scheduler::add_active_user(void)
90
// startup() is called before the lock below is taken: both startup()
// implementations in this file acquire m_mutex themselves and re-check
// m_is_running under it
if (!m_is_running) startup();
91
boost::mutex::scoped_lock scheduler_lock(m_mutex);
95
void scheduler::remove_active_user(void)
97
boost::mutex::scoped_lock scheduler_lock(m_mutex);
98
if (--m_active_users == 0)
99
m_no_more_active_users.notify_all();
102
// Returns an absolute wake-up time: the current system time advanced by
// sleep_sec seconds plus sleep_nsec nanoseconds.  The nanosecond part is
// applied at microsecond resolution, so the integer division drops any
// remainder smaller than one microsecond.
boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec,
                                              boost::uint32_t sleep_nsec)
{
    boost::system_time wakeup_time = boost::get_system_time();
    wakeup_time += boost::posix_time::seconds(sleep_sec);
    wakeup_time += boost::posix_time::microseconds(sleep_nsec / 1000);
    return wakeup_time;
}
108
// Thread body: both startup() implementations in this file bind this
// function into each boost::thread they create.  It loops while
// m_is_running is set and logs exceptions instead of letting them escape.
// NOTE(review): original lines 110-111 (presumably the opening `try` and
// the call made on `service`), 114 (presumably a catch-all clause) and the
// closing braces are missing from this extract -- confirm against the
// complete file.
void scheduler::process_service_work(boost::asio::io_service& service) {
109
while (m_is_running) {
112
} catch (std::exception& e) {
113
// diagnostic_information() yields the text that is logged for the exception
PION_LOG_ERROR(m_logger, boost::diagnostic_information(e));
115
PION_LOG_ERROR(m_logger, "caught unrecognized exception");
121
// single_service_scheduler member functions
123
// Starts the scheduler if it is not already running: arms the keep-alive
// timer on the single shared m_service, then creates m_num_threads worker
// threads that all execute process_service_work() on that same service.
// NOTE(review): original lines 130-131, 133 and 135 are missing from this
// extract.  Presumably they include setting the running flag -- confirm
// against the complete file.
void single_service_scheduler::startup(void)
125
// lock mutex for thread safety
126
boost::mutex::scoped_lock scheduler_lock(m_mutex);
128
// checked under the lock, so a caller that saw the flag clear before
// acquiring m_mutex does not start the scheduler a second time
if (! m_is_running) {
129
PION_LOG_INFO(m_logger, "Starting thread scheduler");
132
// schedule a work item to make sure that the service doesn't complete
134
keep_running(m_service, m_timer);
136
// start multiple threads to handle async tasks
137
for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
138
// every thread is bound to the same m_service; the shared_ptr stored in
// m_thread_pool keeps the thread object alive
boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
139
this, boost::ref(m_service)) ));
140
m_thread_pool.push_back(new_thread);
146
// one_to_one_scheduler member functions
148
// Starts the scheduler if it is not already running: grows m_service_pool
// to at least m_num_threads entries, arms a keep-alive timer for every
// pooled service, then creates m_num_threads worker threads, with thread n
// executing process_service_work() on the service in pool slot n.
// NOTE(review): original lines 155-156, 161-162 and 166-167 are missing
// from this extract.  Presumably they include setting the running flag and
// closing braces -- confirm against the complete file.
void one_to_one_scheduler::startup(void)
150
// lock mutex for thread safety
151
boost::mutex::scoped_lock scheduler_lock(m_mutex);
153
// checked under the lock, so a caller that saw the flag clear before
// acquiring m_mutex does not start the scheduler a second time
if (! m_is_running) {
154
PION_LOG_INFO(m_logger, "Starting thread scheduler");
157
// make sure there are enough services initialized
158
// only the shortfall is created; entries already in the pool are kept
while (m_service_pool.size() < m_num_threads) {
159
boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type());
160
m_service_pool.push_back(service_ptr);
163
// schedule a work item for each service to make sure that it doesn't complete
164
for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
165
// pair layout as used here: first = the io_service, second = its timer
keep_running((*i)->first, (*i)->second);
168
// start multiple threads to handle async tasks
169
for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
170
// indexing by n is in range because the pool was grown to at least
// m_num_threads entries above
boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
171
this, boost::ref(m_service_pool[n]->first)) ));
172
m_thread_pool.push_back(new_thread);
178
} // end namespace pion