~percona-dev/percona-server/release-5.5.11-20.2-fix-bug-764138

« back to all changes in this revision

Viewing changes to HandlerSocket-Plugin-for-MySQL/handlersocket/hstcpsvr.cpp

  • Committer: Ignacio Nin
  • Date: 2011-03-13 17:18:23 UTC
  • mfrom: (33.3.17 release-5.5.8-20)
  • Revision ID: ignacio.nin@percona.com-20110313171823-m06xs104nekulywb
Merge changes from release-5.5.8-20 to 5.5.9

Merge changes from the release branch of 5.5.8 to 5.5.9. These include
the HandlerSocket and UDF directories and the building scripts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// vim:sw=2:ai
 
3
 
 
4
/*
 
5
 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
 
6
 * See COPYRIGHT.txt for details.
 
7
 */
 
8
 
 
9
#include <stdlib.h>
 
10
#include <vector>
 
11
#include <sys/types.h>
 
12
#include <sys/socket.h>
 
13
#include <netinet/in.h>
 
14
#include <unistd.h>
 
15
#include <fcntl.h>
 
16
#include <sys/resource.h>
 
17
 
 
18
#include "hstcpsvr.hpp"
 
19
#include "hstcpsvr_worker.hpp"
 
20
#include "thread.hpp"
 
21
#include "fatal.hpp"
 
22
#include "auto_ptrcontainer.hpp"
 
23
 
 
24
#define DBG(x)
 
25
 
 
26
namespace dena {
 
27
 
 
28
struct worker_throbj {
 
29
  worker_throbj(const hstcpsvr_worker_arg& arg)
 
30
    : worker(hstcpsvr_worker_i::create(arg)) { }
 
31
  void operator ()() {
 
32
    worker->run();
 
33
  }
 
34
  hstcpsvr_worker_ptr worker;
 
35
};
 
36
 
 
37
struct hstcpsvr : public hstcpsvr_i, private noncopyable {
 
38
  hstcpsvr(const config& c);
 
39
  ~hstcpsvr();
 
40
  virtual std::string start_listen();
 
41
 private:
 
42
  hstcpsvr_shared_c cshared;
 
43
  volatile hstcpsvr_shared_v vshared;
 
44
  typedef thread<worker_throbj> worker_thread_type;
 
45
  typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;
 
46
  threads_type threads;
 
47
  std::vector<unsigned int> thread_num_conns_vec;
 
48
 private:
 
49
  void stop_workers();
 
50
};
 
51
 
 
52
namespace {
 
53
 
 
54
void
 
55
check_nfile(size_t nfile)
 
56
{
 
57
  struct rlimit rl = { };
 
58
  const int r = getrlimit(RLIMIT_NOFILE, &rl);
 
59
  if (r != 0) {
 
60
    fatal_abort("check_nfile: getrlimit failed");
 
61
  }
 
62
  if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) {
 
63
    fprintf(stderr,
 
64
      "[Warning] handlersocket: open_files_limit is too small.\n");
 
65
  }
 
66
}
 
67
 
 
68
};
 
69
 
 
70
hstcpsvr::hstcpsvr(const config& c)
 
71
  : cshared(), vshared()
 
72
{
 
73
  vshared.shutdown = 0;
 
74
  cshared.conf = c; /* copy */
 
75
  if (cshared.conf["port"] == "") {
 
76
    cshared.conf["port"] = "9999";
 
77
  }
 
78
  cshared.num_threads = cshared.conf.get_int("num_threads", 32);
 
79
  cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1);
 
80
  cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1);
 
81
  if (cshared.sockargs.use_epoll) {
 
82
    cshared.sockargs.nonblocking = 1;
 
83
  }
 
84
  cshared.readsize = cshared.conf.get_int("readsize", 1);
 
85
  cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024);
 
86
  cshared.for_write_flag = cshared.conf.get_int("for_write", 0);
 
87
  cshared.plain_secret = cshared.conf.get_str("plain_secret", "");
 
88
  cshared.require_auth = !cshared.plain_secret.empty();
 
89
  cshared.sockargs.set(cshared.conf);
 
90
  cshared.dbptr = database_i::create(c);
 
91
  check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);
 
92
  thread_num_conns_vec.resize(cshared.num_threads);
 
93
  cshared.thread_num_conns = thread_num_conns_vec.empty()
 
94
    ? 0 : &thread_num_conns_vec[0];
 
95
}
 
96
 
 
97
hstcpsvr::~hstcpsvr()
 
98
{
 
99
  stop_workers();
 
100
}
 
101
 
 
102
std::string
 
103
hstcpsvr::start_listen()
 
104
{
 
105
  std::string err;
 
106
  if (threads.size() != 0) {
 
107
    return "start_listen: already running";
 
108
  }
 
109
  if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
 
110
    return "bind: " + err;
 
111
  }
 
112
  DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
 
113
  const size_t stack_size = std::max(
 
114
    cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
 
115
  for (long i = 0; i < cshared.num_threads; ++i) {
 
116
    hstcpsvr_worker_arg arg;
 
117
    arg.cshared = &cshared;
 
118
    arg.vshared = &vshared;
 
119
    arg.worker_id = i;
 
120
    std::auto_ptr< thread<worker_throbj> > thr(
 
121
      new thread<worker_throbj>(arg, stack_size));
 
122
    threads.push_back_ptr(thr);
 
123
  }
 
124
  DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
 
125
  for (size_t i = 0; i < threads.size(); ++i) {
 
126
    threads[i]->start();
 
127
  }
 
128
  DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
 
129
  return std::string();
 
130
}
 
131
 
 
132
void
 
133
hstcpsvr::stop_workers()
 
134
{
 
135
  vshared.shutdown = 1;
 
136
  for (size_t i = 0; i < threads.size(); ++i) {
 
137
    threads[i]->join();
 
138
  }
 
139
  threads.clear();
 
140
}
 
141
 
 
142
hstcpsvr_ptr
 
143
hstcpsvr_i::create(const config& conf)
 
144
{
 
145
  return hstcpsvr_ptr(new hstcpsvr(conf));
 
146
}
 
147
 
 
148
};
 
149