~ubuntu-branches/debian/sid/pion/sid

« back to all changes in this revision

Viewing changes to src/scheduler.cpp

  • Committer: Package Import Robot
  • Author(s): Roberto C. Sanchez
  • Date: 2013-07-06 18:04:35 UTC
  • Revision ID: package-import@ubuntu.com-20130706180435-kejjzc1qpyz3qv6c
Tags: upstream-5.0.1+dfsg
ImportĀ upstreamĀ versionĀ 5.0.1+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// ---------------------------------------------------------------------
 
2
// pion:  a Boost C++ framework for building lightweight HTTP interfaces
 
3
// ---------------------------------------------------------------------
 
4
// Copyright (C) 2007-2012 Cloudmeter, Inc.  (http://www.cloudmeter.com)
 
5
//
 
6
// Distributed under the Boost Software License, Version 1.0.
 
7
// See http://www.boost.org/LICENSE_1_0.txt
 
8
//
 
9
 
 
10
#include <boost/exception/diagnostic_information.hpp>
 
11
#include <boost/date_time/posix_time/posix_time_duration.hpp>
 
12
#include <pion/scheduler.hpp>
 
13
 
 
14
namespace pion {    // begin namespace pion
 
15
 
 
16
 
 
17
// static members of scheduler
 
18
    
 
19
const boost::uint32_t   scheduler::DEFAULT_NUM_THREADS = 8;
 
20
const boost::uint32_t   scheduler::NSEC_IN_SECOND = 1000000000; // (10^9)
 
21
const boost::uint32_t   scheduler::MICROSEC_IN_SECOND = 1000000;    // (10^6)
 
22
const boost::uint32_t   scheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
 
23
 
 
24
 
 
25
// scheduler member functions
 
26
 
 
27
void scheduler::shutdown(void)
 
28
{
 
29
    // lock mutex for thread safety
 
30
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
31
    
 
32
    if (m_is_running) {
 
33
        
 
34
        PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
 
35
        
 
36
        while (m_active_users > 0) {
 
37
            // first, wait for any active users to exit
 
38
            PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
 
39
            m_no_more_active_users.wait(scheduler_lock);
 
40
        }
 
41
 
 
42
        // shut everything down
 
43
        m_is_running = false;
 
44
        stop_services();
 
45
        stop_threads();
 
46
        finish_services();
 
47
        finish_threads();
 
48
        
 
49
        PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
 
50
 
 
51
        // Make sure anyone waiting on shutdown gets notified
 
52
        m_scheduler_has_stopped.notify_all();
 
53
        
 
54
    } else {
 
55
        
 
56
        // stop and finish everything to be certain that no events are pending
 
57
        stop_services();
 
58
        stop_threads();
 
59
        finish_services();
 
60
        finish_threads();
 
61
        
 
62
        // Make sure anyone waiting on shutdown gets notified
 
63
        // even if the scheduler did not startup successfully
 
64
        m_scheduler_has_stopped.notify_all();
 
65
    }
 
66
}
 
67
 
 
68
void scheduler::join(void)
 
69
{
 
70
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
71
    while (m_is_running) {
 
72
        // sleep until scheduler_has_stopped condition is signaled
 
73
        m_scheduler_has_stopped.wait(scheduler_lock);
 
74
    }
 
75
}
 
76
    
 
77
void scheduler::keep_running(boost::asio::io_service& my_service,
 
78
                                boost::asio::deadline_timer& my_timer)
 
79
{
 
80
    if (m_is_running) {
 
81
        // schedule this again to make sure the service doesn't complete
 
82
        my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
 
83
        my_timer.async_wait(boost::bind(&scheduler::keep_running, this,
 
84
                                        boost::ref(my_service), boost::ref(my_timer)));
 
85
    }
 
86
}
 
87
 
 
88
void scheduler::add_active_user(void)
 
89
{
 
90
    if (!m_is_running) startup();
 
91
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
92
    ++m_active_users;
 
93
}
 
94
 
 
95
void scheduler::remove_active_user(void)
 
96
{
 
97
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
98
    if (--m_active_users == 0)
 
99
        m_no_more_active_users.notify_all();
 
100
}
 
101
 
 
102
boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec,
 
103
    boost::uint32_t sleep_nsec)
 
104
{
 
105
    return boost::get_system_time() + boost::posix_time::seconds(sleep_sec) + boost::posix_time::microseconds(sleep_nsec / 1000);
 
106
}
 
107
                     
 
108
void scheduler::process_service_work(boost::asio::io_service& service) {
 
109
    while (m_is_running) {
 
110
        try {
 
111
            service.run();
 
112
        } catch (std::exception& e) {
 
113
            PION_LOG_ERROR(m_logger, boost::diagnostic_information(e));
 
114
        } catch (...) {
 
115
            PION_LOG_ERROR(m_logger, "caught unrecognized exception");
 
116
        }
 
117
    }   
 
118
}
 
119
                     
 
120
 
 
121
// single_service_scheduler member functions
 
122
 
 
123
void single_service_scheduler::startup(void)
 
124
{
 
125
    // lock mutex for thread safety
 
126
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
127
    
 
128
    if (! m_is_running) {
 
129
        PION_LOG_INFO(m_logger, "Starting thread scheduler");
 
130
        m_is_running = true;
 
131
        
 
132
        // schedule a work item to make sure that the service doesn't complete
 
133
        m_service.reset();
 
134
        keep_running(m_service, m_timer);
 
135
        
 
136
        // start multiple threads to handle async tasks
 
137
        for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
 
138
            boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
 
139
                                                                                       this, boost::ref(m_service)) ));
 
140
            m_thread_pool.push_back(new_thread);
 
141
        }
 
142
    }
 
143
}
 
144
 
 
145
    
 
146
// one_to_one_scheduler member functions
 
147
 
 
148
void one_to_one_scheduler::startup(void)
 
149
{
 
150
    // lock mutex for thread safety
 
151
    boost::mutex::scoped_lock scheduler_lock(m_mutex);
 
152
    
 
153
    if (! m_is_running) {
 
154
        PION_LOG_INFO(m_logger, "Starting thread scheduler");
 
155
        m_is_running = true;
 
156
        
 
157
        // make sure there are enough services initialized
 
158
        while (m_service_pool.size() < m_num_threads) {
 
159
            boost::shared_ptr<service_pair_type>  service_ptr(new service_pair_type());
 
160
            m_service_pool.push_back(service_ptr);
 
161
        }
 
162
 
 
163
        // schedule a work item for each service to make sure that it doesn't complete
 
164
        for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
 
165
            keep_running((*i)->first, (*i)->second);
 
166
        }
 
167
        
 
168
        // start multiple threads to handle async tasks
 
169
        for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
 
170
            boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
 
171
                                                                                       this, boost::ref(m_service_pool[n]->first)) ));
 
172
            m_thread_pool.push_back(new_thread);
 
173
        }
 
174
    }
 
175
}
 
176
 
 
177
    
 
178
}   // end namespace pion