~percona-dev/percona-server/release-5.5.11-20.2-fix-bug-764138

« back to all changes in this revision

Viewing changes to HandlerSocket-Plugin-for-MySQL/handlersocket/hstcpsvr_worker.cpp

  • Committer: Ignacio Nin
  • Date: 2011-03-13 17:18:23 UTC
  • mfrom: (33.3.17 release-5.5.8-20)
  • Revision ID: ignacio.nin@percona.com-20110313171823-m06xs104nekulywb
Merge changes from release-5.5.8-20 to 5.5.9

Merge changes from the release branch of 5.5.8 to 5.5.9. These include
the HandlerSocket and UDF directories and the building scripts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// vim:sw=2:ai
 
3
 
 
4
/*
 
5
 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
 
6
 * See COPYRIGHT.txt for details.
 
7
 */
 
8
 
 
9
#include <netinet/in.h>
 
10
#include <errno.h>
 
11
#include <poll.h>
 
12
#include <unistd.h>
 
13
#include <fcntl.h>
 
14
#include <stdexcept>
 
15
#include <signal.h>
 
16
#include <list>
 
17
#if __linux__
 
18
#include <sys/epoll.h>
 
19
#endif
 
20
 
 
21
#include "hstcpsvr_worker.hpp"
 
22
#include "string_buffer.hpp"
 
23
#include "auto_ptrcontainer.hpp"
 
24
#include "string_util.hpp"
 
25
#include "escape.hpp"
 
26
 
 
27
#define DBG_FD(x)
 
28
#define DBG_TR(x)
 
29
#define DBG_EP(x)
 
30
 
 
31
/* TODO */
 
32
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
 
33
#define MSG_NOSIGNAL 0
 
34
#endif
 
35
 
 
36
namespace dena {
 
37
 
 
38
struct dbconnstate {
 
39
  string_buffer readbuf;
 
40
  string_buffer writebuf;
 
41
  std::vector<prep_stmt> prep_stmts;
 
42
  size_t resp_begin_pos;
 
43
  void reset() {
 
44
    readbuf.clear();
 
45
    writebuf.clear();
 
46
    prep_stmts.clear();
 
47
    resp_begin_pos = 0;
 
48
  }
 
49
  dbconnstate() : resp_begin_pos(0) { }
 
50
};
 
51
 
 
52
struct hstcpsvr_conn;
 
53
typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
 
54
 
 
55
struct hstcpsvr_conn : public dbcallback_i {
 
56
 public:
 
57
  auto_file fd;
 
58
  sockaddr_storage addr;
 
59
  socklen_t addr_len;
 
60
  dbconnstate cstate;
 
61
  std::string err;
 
62
  size_t readsize;
 
63
  bool nonblocking;
 
64
  bool read_finished;
 
65
  bool write_finished;
 
66
  time_t nb_last_io;
 
67
  hstcpsvr_conns_type::iterator conns_iter;
 
68
  bool authorized;
 
69
 public:
 
70
  bool closed() const;
 
71
  bool ok_to_close() const;
 
72
  void reset();
 
73
  int accept(const hstcpsvr_shared_c& cshared);
 
74
  bool write_more(bool *more_r = 0);
 
75
  bool read_more(bool *more_r = 0);
 
76
 public:
 
77
  virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
 
78
  virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
 
79
  virtual void dbcb_resp_short(uint32_t code, const char *msg);
 
80
  virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
 
81
  virtual void dbcb_resp_begin(size_t num_flds);
 
82
  virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
 
83
  virtual void dbcb_resp_end();
 
84
  virtual void dbcb_resp_cancel();
 
85
 public:
 
86
  hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
 
87
    nonblocking(false), read_finished(false), write_finished(false),
 
88
    nb_last_io(0), authorized(false) { }
 
89
};
 
90
 
 
91
bool
 
92
hstcpsvr_conn::closed() const
 
93
{
 
94
  return fd.get() < 0;
 
95
}
 
96
 
 
97
bool
 
98
hstcpsvr_conn::ok_to_close() const
 
99
{
 
100
  return write_finished || (read_finished && cstate.writebuf.size() == 0);
 
101
}
 
102
 
 
103
void
 
104
hstcpsvr_conn::reset()
 
105
{
 
106
  addr = sockaddr_storage();
 
107
  addr_len = sizeof(addr);
 
108
  cstate.reset();
 
109
  fd.reset();
 
110
  read_finished = false;
 
111
  write_finished = false;
 
112
}
 
113
 
 
114
int
 
115
hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
 
116
{
 
117
  reset();
 
118
  return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
 
119
    addr_len, err);
 
120
}
 
121
 
 
122
bool
 
123
hstcpsvr_conn::write_more(bool *more_r)
 
124
{
 
125
  if (write_finished || cstate.writebuf.size() == 0) {
 
126
    return false;
 
127
  }
 
128
  const size_t wlen = cstate.writebuf.size();
 
129
  ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
 
130
  if (len <= 0) {
 
131
    if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
 
132
      cstate.writebuf.clear();
 
133
      write_finished = true;
 
134
    }
 
135
    return false;
 
136
  }
 
137
  cstate.writebuf.erase_front(len);
 
138
    /* FIXME: reallocate memory if too large */
 
139
  if (more_r) {
 
140
    *more_r = (static_cast<size_t>(len) == wlen);
 
141
  }
 
142
  return true;
 
143
}
 
144
 
 
145
bool
 
146
hstcpsvr_conn::read_more(bool *more_r)
 
147
{
 
148
  if (read_finished) {
 
149
    return false;
 
150
  }
 
151
  const size_t block_size = readsize > 4096 ? readsize : 4096;
 
152
  char *wp = cstate.readbuf.make_space(block_size);
 
153
  const ssize_t len = read(fd.get(), wp, block_size);
 
154
  if (len <= 0) {
 
155
    if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
 
156
      read_finished = true;
 
157
    }
 
158
    return false;
 
159
  }
 
160
  cstate.readbuf.space_wrote(len);
 
161
  if (more_r) {
 
162
    *more_r = (static_cast<size_t>(len) == block_size);
 
163
  }
 
164
  return true;
 
165
}
 
166
 
 
167
void
 
168
hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
 
169
{
 
170
  if (cstate.prep_stmts.size() <= pst_id) {
 
171
    cstate.prep_stmts.resize(pst_id + 1);
 
172
  }
 
173
  cstate.prep_stmts[pst_id] = v;
 
174
}
 
175
 
 
176
const prep_stmt *
 
177
hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
 
178
{
 
179
  if (cstate.prep_stmts.size() <= pst_id) {
 
180
    return 0;
 
181
  }
 
182
  return &cstate.prep_stmts[pst_id];
 
183
}
 
184
 
 
185
void
 
186
hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
 
187
{
 
188
  write_ui32(cstate.writebuf, code);
 
189
  const size_t msglen = strlen(msg);
 
190
  if (msglen != 0) {
 
191
    cstate.writebuf.append_literal("\t1\t");
 
192
    cstate.writebuf.append(msg, msg + msglen);
 
193
  } else {
 
194
    cstate.writebuf.append_literal("\t1");
 
195
  }
 
196
  cstate.writebuf.append_literal("\n");
 
197
}
 
198
 
 
199
void
 
200
hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
 
201
{
 
202
  write_ui32(cstate.writebuf, code);
 
203
  cstate.writebuf.append_literal("\t1\t");
 
204
  write_ui32(cstate.writebuf, value);
 
205
  cstate.writebuf.append_literal("\n");
 
206
}
 
207
 
 
208
void
 
209
hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
 
210
{
 
211
  cstate.resp_begin_pos = cstate.writebuf.size();
 
212
  cstate.writebuf.append_literal("0\t");
 
213
  write_ui32(cstate.writebuf, num_flds);
 
214
}
 
215
 
 
216
void
 
217
hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
 
218
{
 
219
  if (fld != 0) {
 
220
    cstate.writebuf.append_literal("\t");
 
221
    escape_string(cstate.writebuf, fld, fld + fldlen);
 
222
  } else {
 
223
    static const char t[] = "\t\0";
 
224
    cstate.writebuf.append(t, t + 2);
 
225
  }
 
226
}
 
227
 
 
228
void
 
229
hstcpsvr_conn::dbcb_resp_end()
 
230
{
 
231
  cstate.writebuf.append_literal("\n");
 
232
  cstate.resp_begin_pos = 0;
 
233
}
 
234
 
 
235
void
 
236
hstcpsvr_conn::dbcb_resp_cancel()
 
237
{
 
238
  cstate.writebuf.resize(cstate.resp_begin_pos);
 
239
  cstate.resp_begin_pos = 0;
 
240
}
 
241
 
 
242
struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
 
243
  hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
 
244
  virtual void run();
 
245
 private:
 
246
  const hstcpsvr_shared_c& cshared;
 
247
  volatile hstcpsvr_shared_v& vshared;
 
248
  long worker_id;
 
249
  dbcontext_ptr dbctx;
 
250
  hstcpsvr_conns_type conns; /* conns refs dbctx */
 
251
  time_t last_check_time;
 
252
  std::vector<pollfd> pfds;
 
253
  #ifdef __linux__
 
254
  std::vector<epoll_event> events_vec;
 
255
  auto_file epoll_fd;
 
256
  #endif
 
257
  bool accept_enabled;
 
258
  int accept_balance;
 
259
  std::vector<record_filter> filters_work;
 
260
 private:
 
261
  int run_one_nb();
 
262
  int run_one_ep();
 
263
  void execute_lines(hstcpsvr_conn& conn);
 
264
  void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
 
265
  void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
 
266
  void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
 
267
    char *finish, hstcpsvr_conn& conn);
 
268
  void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
 
269
};
 
270
 
 
271
hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
 
272
  : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
 
273
    dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
 
274
    last_check_time(time(0)), accept_enabled(true), accept_balance(0)
 
275
{
 
276
  #ifdef __linux__
 
277
  if (cshared.sockargs.use_epoll) {
 
278
    epoll_fd.reset(epoll_create(10));
 
279
    if (epoll_fd.get() < 0) {
 
280
      fatal_abort("epoll_create");
 
281
    }
 
282
    epoll_event ev = { };
 
283
    ev.events = EPOLLIN;
 
284
    ev.data.ptr = 0;
 
285
    if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
 
286
      != 0) {
 
287
      fatal_abort("epoll_ctl EPOLL_CTL_ADD");
 
288
    }
 
289
    events_vec.resize(10240);
 
290
  }
 
291
  #endif
 
292
  accept_balance = cshared.conf.get_int("accept_balance", 0);
 
293
}
 
294
 
 
295
namespace {
 
296
 
 
297
struct thr_init {
 
298
  thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
 
299
    dbctx->init_thread(this, shutdown_flag);
 
300
  }
 
301
  ~thr_init() {
 
302
    dbctx->term_thread();
 
303
  }
 
304
  const dbcontext_ptr& dbctx;
 
305
};
 
306
 
 
307
}; // namespace
 
308
 
 
309
void
 
310
hstcpsvr_worker::run()
 
311
{
 
312
  thr_init initobj(dbctx, vshared.shutdown);
 
313
 
 
314
  #ifdef __linux__
 
315
  if (cshared.sockargs.use_epoll) {
 
316
    while (!vshared.shutdown && dbctx->check_alive()) {
 
317
      run_one_ep();
 
318
    }
 
319
  } else if (cshared.sockargs.nonblocking) {
 
320
    while (!vshared.shutdown && dbctx->check_alive()) {
 
321
      run_one_nb();
 
322
    }
 
323
  } else {
 
324
    /* UNUSED */
 
325
    fatal_abort("run_one");
 
326
  }
 
327
  #else
 
328
  while (!vshared.shutdown && dbctx->check_alive()) {
 
329
    run_one_nb();
 
330
  }
 
331
  #endif
 
332
}
 
333
 
 
334
int
 
335
hstcpsvr_worker::run_one_nb()
 
336
{
 
337
  size_t nfds = 0;
 
338
  /* CLIENT SOCKETS */
 
339
  for (hstcpsvr_conns_type::const_iterator i = conns.begin();
 
340
    i != conns.end(); ++i) {
 
341
    if (pfds.size() <= nfds) {
 
342
      pfds.resize(nfds + 1);
 
343
    }
 
344
    pollfd& pfd = pfds[nfds++];
 
345
    pfd.fd = (*i)->fd.get();
 
346
    short ev = 0;
 
347
    if ((*i)->cstate.writebuf.size() != 0) {
 
348
      ev = POLLOUT;
 
349
    } else {
 
350
      ev = POLLIN;
 
351
    }
 
352
    pfd.events = pfd.revents = ev;
 
353
  }
 
354
  /* LISTENER */
 
355
  {
 
356
    const size_t cpt = cshared.nb_conn_per_thread;
 
357
    const short ev = (cpt > nfds) ? POLLIN : 0;
 
358
    if (pfds.size() <= nfds) {
 
359
      pfds.resize(nfds + 1);
 
360
    }
 
361
    pollfd& pfd = pfds[nfds++];
 
362
    pfd.fd = cshared.listen_fd.get();
 
363
    pfd.events = pfd.revents = ev;
 
364
  }
 
365
  /* POLL */
 
366
  const int npollev = poll(&pfds[0], nfds, 1 * 1000);
 
367
  dbctx->set_statistics(conns.size(), npollev);
 
368
  const time_t now = time(0);
 
369
  size_t j = 0;
 
370
  const short mask_in = ~POLLOUT;
 
371
  const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
 
372
  /* READ */
 
373
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
 
374
    ++i, ++j) {
 
375
    pollfd& pfd = pfds[j];
 
376
    if ((pfd.revents & mask_in) == 0) {
 
377
      continue;
 
378
    }
 
379
    hstcpsvr_conn& conn = **i;
 
380
    if (conn.read_more()) {
 
381
      if (conn.cstate.readbuf.size() > 0) {
 
382
        const char ch = conn.cstate.readbuf.begin()[0];
 
383
        if (ch == 'Q') {
 
384
          vshared.shutdown = 1;
 
385
        } else if (ch == '/') {
 
386
          conn.cstate.readbuf.clear();
 
387
          conn.cstate.writebuf.clear();
 
388
          conn.read_finished = true;
 
389
          conn.write_finished = true;
 
390
        }
 
391
      }
 
392
      conn.nb_last_io = now;
 
393
    }
 
394
  }
 
395
  /* EXECUTE */
 
396
  j = 0;
 
397
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
 
398
    ++i, ++j) {
 
399
    pollfd& pfd = pfds[j];
 
400
    if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
 
401
      continue;
 
402
    }
 
403
    execute_lines(**i);
 
404
  }
 
405
  /* COMMIT */
 
406
  dbctx->unlock_tables_if();
 
407
  const bool commit_error = dbctx->get_commit_error();
 
408
  dbctx->clear_error();
 
409
  /* WRITE/CLOSE */
 
410
  j = 0;
 
411
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
 
412
    ++j) {
 
413
    pollfd& pfd = pfds[j];
 
414
    hstcpsvr_conn& conn = **i;
 
415
    hstcpsvr_conns_type::iterator icur = i;
 
416
    ++i;
 
417
    if (commit_error) {
 
418
      conn.reset();
 
419
      continue;
 
420
    }
 
421
    if ((pfd.revents & (mask_out | mask_in)) != 0) {
 
422
      if (conn.write_more()) {
 
423
        conn.nb_last_io = now;
 
424
      }
 
425
    }
 
426
    if (cshared.sockargs.timeout != 0 &&
 
427
      conn.nb_last_io + cshared.sockargs.timeout < now) {
 
428
      conn.reset();
 
429
    }
 
430
    if (conn.closed() || conn.ok_to_close()) {
 
431
      conns.erase_ptr(icur);
 
432
    }
 
433
  }
 
434
  /* ACCEPT */
 
435
  {
 
436
    pollfd& pfd = pfds[nfds - 1];
 
437
    if ((pfd.revents & mask_in) != 0) {
 
438
      std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
 
439
      c->nonblocking = true;
 
440
      c->readsize = cshared.readsize;
 
441
      c->accept(cshared);
 
442
      if (c->fd.get() >= 0) {
 
443
        if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
 
444
          fatal_abort("F_SETFL O_NONBLOCK");
 
445
        }
 
446
        c->nb_last_io = now;
 
447
        conns.push_back_ptr(c);
 
448
      } else {
 
449
        /* errno == 11 (EAGAIN) is not a fatal error. */
 
450
        DENA_VERBOSE(100, fprintf(stderr,
 
451
          "accept failed: errno=%d (not fatal)\n", errno));
 
452
      }
 
453
    }
 
454
  }
 
455
  DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
 
456
    conns.size()));
 
457
  if (conns.empty()) {
 
458
    dbctx->close_tables_if();
 
459
  }
 
460
  dbctx->set_statistics(conns.size(), 0);
 
461
  return 0;
 
462
}
 
463
 
 
464
#ifdef __linux__
 
465
int
 
466
hstcpsvr_worker::run_one_ep()
 
467
{
 
468
  epoll_event *const events = &events_vec[0];
 
469
  const size_t num_events = events_vec.size();
 
470
  const time_t now = time(0);
 
471
  size_t in_count = 0, out_count = 0, accept_count = 0;
 
472
  int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
 
473
  /* READ/ACCEPT */
 
474
  dbctx->set_statistics(conns.size(), nfds);
 
475
  for (int i = 0; i < nfds; ++i) {
 
476
    epoll_event& ev = events[i];
 
477
    if ((ev.events & EPOLLIN) == 0) {
 
478
      continue;
 
479
    }
 
480
    hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
 
481
    if (conn == 0) {
 
482
      /* listener */
 
483
      ++accept_count;
 
484
      DBG_EP(fprintf(stderr, "IN listener\n"));
 
485
      std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
 
486
      c->nonblocking = true;
 
487
      c->readsize = cshared.readsize;
 
488
      c->accept(cshared);
 
489
      if (c->fd.get() >= 0) {
 
490
        if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
 
491
          fatal_abort("F_SETFL O_NONBLOCK");
 
492
        }
 
493
        epoll_event cev = { };
 
494
        cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
 
495
        cev.data.ptr = c.get();
 
496
        c->nb_last_io = now;
 
497
        const int fd = c->fd.get();
 
498
        conns.push_back_ptr(c);
 
499
        conns.back()->conns_iter = --conns.end();
 
500
        if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
 
501
          fatal_abort("epoll_ctl EPOLL_CTL_ADD");
 
502
        }
 
503
      } else {
 
504
        DENA_VERBOSE(100, fprintf(stderr,
 
505
          "accept failed: errno=%d (not fatal)\n", errno));
 
506
      }
 
507
    } else {
 
508
      /* client connection */
 
509
      ++in_count;
 
510
      DBG_EP(fprintf(stderr, "IN client\n"));
 
511
      bool more_data = false;
 
512
      while (conn->read_more(&more_data)) {
 
513
        DBG_EP(fprintf(stderr, "IN client read_more\n"));
 
514
        conn->nb_last_io = now;
 
515
        if (!more_data) {
 
516
          break;
 
517
        }
 
518
      }
 
519
    }
 
520
  }
 
521
  /* EXECUTE */
 
522
  for (int i = 0; i < nfds; ++i) {
 
523
    epoll_event& ev = events[i];
 
524
    hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
 
525
    if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
 
526
      conn->cstate.readbuf.size() == 0) {
 
527
      continue;
 
528
    }
 
529
    const char ch = conn->cstate.readbuf.begin()[0];
 
530
    if (ch == 'Q') {
 
531
      vshared.shutdown = 1;
 
532
    } else if (ch == '/') {
 
533
      conn->cstate.readbuf.clear();
 
534
      conn->cstate.writebuf.clear();
 
535
      conn->read_finished = true;
 
536
      conn->write_finished = true;
 
537
    } else {
 
538
      execute_lines(*conn);
 
539
    }
 
540
  }
 
541
  /* COMMIT */
 
542
  dbctx->unlock_tables_if();
 
543
  const bool commit_error = dbctx->get_commit_error();
 
544
  dbctx->clear_error();
 
545
  /* WRITE */
 
546
  for (int i = 0; i < nfds; ++i) {
 
547
    epoll_event& ev = events[i];
 
548
    hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
 
549
    if (commit_error && conn != 0) {
 
550
      conn->reset();
 
551
      continue;
 
552
    }
 
553
    if ((ev.events & EPOLLOUT) == 0) {
 
554
      continue;
 
555
    }
 
556
    ++out_count;
 
557
    if (conn == 0) {
 
558
      /* listener */
 
559
      DBG_EP(fprintf(stderr, "OUT listener\n"));
 
560
    } else {
 
561
      /* client connection */
 
562
      DBG_EP(fprintf(stderr, "OUT client\n"));
 
563
      bool more_data = false;
 
564
      while (conn->write_more(&more_data)) {
 
565
        DBG_EP(fprintf(stderr, "OUT client write_more\n"));
 
566
        conn->nb_last_io = now;
 
567
        if (!more_data) {
 
568
          break;
 
569
        }
 
570
      }
 
571
    }
 
572
  }
 
573
  /* CLOSE */
 
574
  for (int i = 0; i < nfds; ++i) {
 
575
    epoll_event& ev = events[i];
 
576
    hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
 
577
    if (conn != 0 && conn->ok_to_close()) {
 
578
      DBG_EP(fprintf(stderr, "CLOSE close\n"));
 
579
      conns.erase_ptr(conn->conns_iter);
 
580
    }
 
581
  }
 
582
  /* TIMEOUT & cleanup */
 
583
  if (last_check_time + 10 < now) {
 
584
    for (hstcpsvr_conns_type::iterator i = conns.begin();
 
585
      i != conns.end(); ) {
 
586
      hstcpsvr_conns_type::iterator icur = i;
 
587
      ++i;
 
588
      if (cshared.sockargs.timeout != 0 &&
 
589
        (*icur)->nb_last_io + cshared.sockargs.timeout < now) {
 
590
        conns.erase_ptr((*icur)->conns_iter);
 
591
      }
 
592
    }
 
593
    last_check_time = now;
 
594
    DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
 
595
      conns.size()));
 
596
  }
 
597
  DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
 
598
    this, in_count, out_count, accept_count, conns.size()));
 
599
  if (conns.empty()) {
 
600
    dbctx->close_tables_if();
 
601
  }
 
602
  /* STATISTICS */
 
603
  const size_t num_conns = conns.size();
 
604
  dbctx->set_statistics(num_conns, 0);
 
605
  /* ENABLE/DISABLE ACCEPT */
 
606
  if (accept_balance != 0) {
 
607
    cshared.thread_num_conns[worker_id] = num_conns;
 
608
    size_t total_num_conns = 0;
 
609
    for (long i = 0; i < cshared.num_threads; ++i) {
 
610
      total_num_conns += cshared.thread_num_conns[i];
 
611
    }
 
612
    bool e_acc = false;
 
613
    if (num_conns < 10 ||
 
614
      total_num_conns * 2 > num_conns * cshared.num_threads) {
 
615
      e_acc = true;
 
616
    }
 
617
    epoll_event ev = { };
 
618
    ev.events = EPOLLIN;
 
619
    ev.data.ptr = 0;
 
620
    if (e_acc == accept_enabled) {
 
621
    } else if (e_acc) {
 
622
      if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
 
623
        != 0) {
 
624
        fatal_abort("epoll_ctl EPOLL_CTL_ADD");
 
625
      }
 
626
    } else {
 
627
      if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
 
628
        != 0) {
 
629
        fatal_abort("epoll_ctl EPOLL_CTL_ADD");
 
630
      }
 
631
    }
 
632
    accept_enabled = e_acc;
 
633
  }
 
634
  return 0;
 
635
}
 
636
#endif 
 
637
 
 
638
void
 
639
hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
 
640
{
 
641
  dbconnstate& cstate = conn.cstate;
 
642
  char *buf_end = cstate.readbuf.end();
 
643
  char *line_begin = cstate.readbuf.begin();
 
644
  while (true) {
 
645
    char *const nl = memchr_char(line_begin, '\n', buf_end - line_begin);
 
646
    if (nl == 0) {
 
647
      break;
 
648
    }
 
649
    char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
 
650
    execute_line(line_begin, lf, conn);
 
651
    line_begin = nl + 1;
 
652
  }
 
653
  cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
 
654
}
 
655
 
 
656
void
 
657
hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
 
658
{
 
659
  /* safe to modify, safe to dereference 'finish' */
 
660
  char *const cmd_begin = start;
 
661
  read_token(start, finish);
 
662
  char *const cmd_end = start;
 
663
  skip_one(start, finish);
 
664
  if (cmd_begin == cmd_end) {
 
665
    return conn.dbcb_resp_short(2, "cmd");
 
666
  }
 
667
  if (cmd_begin + 1 == cmd_end) {
 
668
    if (cmd_begin[0] == 'P') {
 
669
      if (cshared.require_auth && !conn.authorized) {
 
670
        return conn.dbcb_resp_short(3, "unauth");
 
671
      }
 
672
      return do_open_index(start, finish, conn);
 
673
    }
 
674
    if (cmd_begin[0] == 'A') {
 
675
      return do_authorization(start, finish, conn);
 
676
    }
 
677
  }
 
678
  if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
 
679
    if (cshared.require_auth && !conn.authorized) {
 
680
      return conn.dbcb_resp_short(3, "unauth");
 
681
    }
 
682
    return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
 
683
  }
 
684
  return conn.dbcb_resp_short(2, "cmd");
 
685
}
 
686
 
 
687
void
 
688
hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
 
689
{
 
690
  const size_t pst_id = read_ui32(start, finish);
 
691
  skip_one(start, finish);
 
692
  /* dbname */
 
693
  char *const dbname_begin = start;
 
694
  read_token(start, finish);
 
695
  char *const dbname_end = start;
 
696
  skip_one(start, finish);
 
697
  /* tblname */
 
698
  char *const tblname_begin = start;
 
699
  read_token(start, finish);
 
700
  char *const tblname_end = start;
 
701
  skip_one(start, finish);
 
702
  /* idxname */
 
703
  char *const idxname_begin = start;
 
704
  read_token(start, finish);
 
705
  char *const idxname_end = start;
 
706
  skip_one(start, finish);
 
707
  /* retfields */
 
708
  char *const retflds_begin = start;
 
709
  read_token(start, finish);
 
710
  char *const retflds_end = start;
 
711
  skip_one(start, finish);
 
712
  /* filfields */
 
713
  char *const filflds_begin = start;
 
714
  read_token(start, finish);
 
715
  char *const filflds_end = start;
 
716
  dbname_end[0] = 0;
 
717
  tblname_end[0] = 0;
 
718
  idxname_end[0] = 0;
 
719
  retflds_end[0] = 0;
 
720
  filflds_end[0] = 0;
 
721
  return dbctx->cmd_open_index(conn, pst_id, dbname_begin, tblname_begin,
 
722
    idxname_begin, retflds_begin, filflds_begin);
 
723
}
 
724
 
 
725
void
 
726
hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
 
727
  char *finish, hstcpsvr_conn& conn)
 
728
{
 
729
  cmd_exec_args args;
 
730
  const size_t pst_id = read_ui32(cmd_begin, cmd_end);
 
731
  if (pst_id >= conn.cstate.prep_stmts.size()) {
 
732
    return conn.dbcb_resp_short(2, "stmtnum");
 
733
  }
 
734
  args.pst = &conn.cstate.prep_stmts[pst_id];
 
735
  char *const op_begin = start;
 
736
  read_token(start, finish);
 
737
  char *const op_end = start;
 
738
  args.op = string_ref(op_begin, op_end);
 
739
  skip_one(start, finish);
 
740
  const uint32_t fldnum = read_ui32(start, finish);
 
741
  string_ref flds[fldnum]; /* GNU */
 
742
  args.kvals = flds;
 
743
  args.kvalslen = fldnum;
 
744
  for (size_t i = 0; i < fldnum; ++i) {
 
745
    skip_one(start, finish);
 
746
    char *const f_begin = start;
 
747
    read_token(start, finish);
 
748
    char *const f_end = start;
 
749
    if (is_null_expression(f_begin, f_end)) {
 
750
      /* null */
 
751
      flds[i] = string_ref();
 
752
    } else {
 
753
      /* non-null */
 
754
      char *wp = f_begin;
 
755
      unescape_string(wp, f_begin, f_end);
 
756
      flds[i] = string_ref(f_begin, wp - f_begin);
 
757
    }
 
758
  }
 
759
  skip_one(start, finish);
 
760
  args.limit = read_ui32(start, finish);
 
761
  skip_one(start, finish);
 
762
  args.skip = read_ui32(start, finish);
 
763
  if (start == finish) {
 
764
    /* simple query */
 
765
    return dbctx->cmd_exec_on_index(conn, args);
 
766
  }
 
767
  /* has filters or modops */
 
768
  skip_one(start, finish);
 
769
  /* filters */
 
770
  size_t filters_count = 0;
 
771
  while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
 
772
    char *const filter_type_begin = start;
 
773
    read_token(start, finish);
 
774
    char *const filter_type_end = start;
 
775
    skip_one(start, finish);
 
776
    char *const filter_op_begin = start;
 
777
    read_token(start, finish);
 
778
    char *const filter_op_end = start;
 
779
    skip_one(start, finish);
 
780
    const uint32_t ff_offset = read_ui32(start, finish);
 
781
    skip_one(start, finish);
 
782
    char *const filter_val_begin = start;
 
783
    read_token(start, finish);
 
784
    char *const filter_val_end = start;
 
785
    skip_one(start, finish);
 
786
    if (filters_work.size() <= filters_count) {
 
787
      filters_work.resize(filters_count + 1);
 
788
    }
 
789
    record_filter& fi = filters_work[filters_count];
 
790
    if (filter_type_end != filter_type_begin + 1) {
 
791
      return conn.dbcb_resp_short(2, "filtertype");
 
792
    }
 
793
    fi.filter_type = (filter_type_begin[0] == 'W')
 
794
      ? record_filter_type_break : record_filter_type_skip;
 
795
    const uint32_t num_filflds = args.pst->get_filter_fields().size();
 
796
    if (ff_offset >= num_filflds) {
 
797
      return conn.dbcb_resp_short(2, "filterfld");
 
798
    }
 
799
    fi.op = string_ref(filter_op_begin, filter_op_end);
 
800
    fi.ff_offset = ff_offset;
 
801
    if (is_null_expression(filter_val_begin, filter_val_end)) {
 
802
      /* null */
 
803
      fi.val = string_ref();
 
804
    } else {
 
805
      /* non-null */
 
806
      char *wp = filter_val_begin;
 
807
      unescape_string(wp, filter_val_begin, filter_val_end);
 
808
      fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
 
809
    }
 
810
    ++filters_count;
 
811
  }
 
812
  if (filters_count > 0) {
 
813
    if (filters_work.size() <= filters_count) {
 
814
      filters_work.resize(filters_count + 1);
 
815
    }
 
816
    filters_work[filters_count].op = string_ref(); /* sentinel */
 
817
    args.filters = &filters_work[0];
 
818
  } else {
 
819
    args.filters = 0;
 
820
  }
 
821
  if (start == finish) {
 
822
    /* no modops */
 
823
    return dbctx->cmd_exec_on_index(conn, args);
 
824
  }
 
825
  /* has modops */
 
826
  char *const mod_op_begin = start;
 
827
  read_token(start, finish);
 
828
  char *const mod_op_end = start;
 
829
  args.mod_op = string_ref(mod_op_begin, mod_op_end);
 
830
  const size_t num_uvals = args.pst->get_ret_fields().size();
 
831
  string_ref uflds[num_uvals]; /* GNU */
 
832
  for (size_t i = 0; i < num_uvals; ++i) {
 
833
    skip_one(start, finish);
 
834
    char *const f_begin = start;
 
835
    read_token(start, finish);
 
836
    char *const f_end = start;
 
837
    if (is_null_expression(f_begin, f_end)) {
 
838
      /* null */
 
839
      uflds[i] = string_ref();
 
840
    } else {
 
841
      /* non-null */
 
842
      char *wp = f_begin;
 
843
      unescape_string(wp, f_begin, f_end);
 
844
      uflds[i] = string_ref(f_begin, wp - f_begin);
 
845
    }
 
846
  }
 
847
  args.uvals = uflds;
 
848
  return dbctx->cmd_exec_on_index(conn, args);
 
849
}
 
850
 
 
851
void
 
852
hstcpsvr_worker::do_authorization(char *start, char *finish,
 
853
  hstcpsvr_conn& conn)
 
854
{
 
855
  /* auth type */
 
856
  char *const authtype_begin = start;
 
857
  read_token(start, finish);
 
858
  char *const authtype_end = start;
 
859
  const size_t authtype_len = authtype_end - authtype_begin;
 
860
  skip_one(start, finish);
 
861
  /* key */
 
862
  char *const key_begin = start;
 
863
  read_token(start, finish);
 
864
  char *const key_end = start;
 
865
  const size_t key_len = key_end - key_begin;
 
866
  authtype_end[0] = 0;
 
867
  key_end[0] = 0;
 
868
  char *wp = key_begin;
 
869
  unescape_string(wp, key_begin, key_end);
 
870
  if (authtype_len != 1 || authtype_begin[0] != '1') {
 
871
    return conn.dbcb_resp_short(2, "authtype");
 
872
  }
 
873
  if (cshared.plain_secret.size() == key_len &&
 
874
    memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
 
875
    conn.authorized = true;
 
876
  } else {
 
877
    conn.authorized = false;
 
878
  }
 
879
  if (!conn.authorized) {
 
880
    return conn.dbcb_resp_short(3, "unauth");
 
881
  } else {
 
882
    return conn.dbcb_resp_short(0, "");
 
883
  }
 
884
}
 
885
 
 
886
hstcpsvr_worker_ptr
 
887
hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
 
888
{
 
889
  return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
 
890
}
 
891
 
 
892
};
 
893