5
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6
* See COPYRIGHT.txt for details.
11
#include <sys/types.h>
12
#include <sys/socket.h>
13
#include <netinet/in.h>
16
#include <sys/resource.h>
18
#include "hstcpsvr.hpp"
19
#include "hstcpsvr_worker.hpp"
22
#include "auto_ptrcontainer.hpp"
28
struct worker_throbj {
29
worker_throbj(const hstcpsvr_worker_arg& arg)
30
: worker(hstcpsvr_worker_i::create(arg)) { }
34
hstcpsvr_worker_ptr worker;
37
struct hstcpsvr : public hstcpsvr_i, private noncopyable {
38
hstcpsvr(const config& c);
40
virtual std::string start_listen();
42
hstcpsvr_shared_c cshared;
43
volatile hstcpsvr_shared_v vshared;
44
typedef thread<worker_throbj> worker_thread_type;
45
typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;
47
std::vector<unsigned int> thread_num_conns_vec;
55
check_nfile(size_t nfile)
57
struct rlimit rl = { };
58
const int r = getrlimit(RLIMIT_NOFILE, &rl);
60
fatal_abort("check_nfile: getrlimit failed");
62
if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) {
64
"[Warning] handlersocket: open_files_limit is too small.\n");
70
hstcpsvr::hstcpsvr(const config& c)
71
: cshared(), vshared()
74
cshared.conf = c; /* copy */
75
if (cshared.conf["port"] == "") {
76
cshared.conf["port"] = "9999";
78
cshared.num_threads = cshared.conf.get_int("num_threads", 32);
79
cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1);
80
cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1);
81
if (cshared.sockargs.use_epoll) {
82
cshared.sockargs.nonblocking = 1;
84
cshared.readsize = cshared.conf.get_int("readsize", 1);
85
cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024);
86
cshared.for_write_flag = cshared.conf.get_int("for_write", 0);
87
cshared.plain_secret = cshared.conf.get_str("plain_secret", "");
88
cshared.require_auth = !cshared.plain_secret.empty();
89
cshared.sockargs.set(cshared.conf);
90
cshared.dbptr = database_i::create(c);
91
check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);
92
thread_num_conns_vec.resize(cshared.num_threads);
93
cshared.thread_num_conns = thread_num_conns_vec.empty()
94
? 0 : &thread_num_conns_vec[0];
103
hstcpsvr::start_listen()
106
if (threads.size() != 0) {
107
return "start_listen: already running";
109
if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
110
return "bind: " + err;
112
DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
113
const size_t stack_size = std::max(
114
cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
115
for (long i = 0; i < cshared.num_threads; ++i) {
116
hstcpsvr_worker_arg arg;
117
arg.cshared = &cshared;
118
arg.vshared = &vshared;
120
std::auto_ptr< thread<worker_throbj> > thr(
121
new thread<worker_throbj>(arg, stack_size));
122
threads.push_back_ptr(thr);
124
DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
125
for (size_t i = 0; i < threads.size(); ++i) {
128
DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
129
return std::string();
133
hstcpsvr::stop_workers()
135
vshared.shutdown = 1;
136
for (size_t i = 0; i < threads.size(); ++i) {
143
hstcpsvr_i::create(const config& conf)
145
return hstcpsvr_ptr(new hstcpsvr(conf));