5
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6
* See COPYRIGHT.txt for details.
9
#include <netinet/in.h>
18
#include <sys/epoll.h>
21
#include "hstcpsvr_worker.hpp"
22
#include "string_buffer.hpp"
23
#include "auto_ptrcontainer.hpp"
24
#include "string_util.hpp"
32
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
33
#define MSG_NOSIGNAL 0
39
/* Per-connection protocol state. The lines below are the data members and
 * the constructor of dbconnstate. */
/* Bytes read from the peer that execute_lines() has not consumed yet. */
string_buffer readbuf;
40
/* Response bytes queued by the dbcb_resp_* callbacks and drained by
 * write_more(). */
string_buffer writebuf;
41
/* Prepared statements, indexed by the statement id carried in requests. */
std::vector<prep_stmt> prep_stmts;
42
/* writebuf size recorded by dbcb_resp_begin(); dbcb_resp_cancel() truncates
 * writebuf back to this offset. */
size_t resp_begin_pos;
49
/* Starts with no response in progress. */
dbconnstate() : resp_begin_pos(0) { }
53
/* Container of hstcpsvr_conn pointers kept by each worker. Elsewhere in this
 * file entries are added with push_back_ptr() and removed with erase_ptr(). */
typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
55
/* One client connection. Implements the dbcallback_i interface so that the
 * database layer can register prepared statements and emit response text
 * into this connection's write buffer. */
struct hstcpsvr_conn : public dbcallback_i {
58
/* Peer address storage; passed to socket_accept() by accept(). */
sockaddr_storage addr;
67
/* Position of this connection inside the owning worker's connection list.
 * run_one_ep() sets it right after insertion and uses it for erase_ptr(). */
hstcpsvr_conns_type::iterator conns_iter;
71
/* True once the connection has nothing left to send and may be dropped. */
bool ok_to_close() const;
73
/* Accepts a new connection from the shared listening socket. */
int accept(const hstcpsvr_shared_c& cshared);
74
/* Sends queued response bytes; more_r is an optional out-parameter. */
bool write_more(bool *more_r = 0);
75
/* Reads request bytes into readbuf; more_r is an optional out-parameter. */
bool read_more(bool *more_r = 0);
77
/* dbcallback_i overrides: prepared-statement registry and response output. */
virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
78
virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
79
virtual void dbcb_resp_short(uint32_t code, const char *msg);
80
virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
81
virtual void dbcb_resp_begin(size_t num_flds);
82
virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
83
virtual void dbcb_resp_end();
84
virtual void dbcb_resp_cancel();
86
/* A fresh connection is blocking, unauthorized, has a 4096-byte read size
 * and has neither direction marked finished. */
hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
87
nonblocking(false), read_finished(false), write_finished(false),
88
nb_last_io(0), authorized(false) { }
92
/* Reports whether the connection is closed. The body is not part of this
 * excerpt; run_one_nb() drops a connection when this returns true. */
hstcpsvr_conn::closed() const
98
/* Returns true when the connection can be dropped: either the write side is
 * finished, or the read side is finished and no response bytes remain
 * queued. */
hstcpsvr_conn::ok_to_close() const
100
return write_finished || (read_finished && cstate.writebuf.size() == 0);
104
/* Returns the connection object to its initial state. The visible statements
 * clear the peer address and both "finished" flags; other statements of the
 * body are not part of this excerpt. */
hstcpsvr_conn::reset()
106
/* Value-initialize the address storage (zero-filled). */
addr = sockaddr_storage();
107
addr_len = sizeof(addr);
110
read_finished = false;
111
write_finished = false;
115
/* Accepts a pending connection on the shared listening socket by delegating
 * to socket_accept(), handing it this connection's fd and addr, and returns
 * socket_accept()'s result. Callers in this file test fd.get() >= 0
 * afterwards to detect success. */
hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
118
return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
123
/* Tries to send the queued response bytes with a single send() call.
 *   more_r: optional out-parameter; receives true when send() accepted the
 *           whole buffer (the null check guarding the store is presumably on
 *           a line outside this excerpt -- confirm).
 * Return statements are not part of this excerpt. */
hstcpsvr_conn::write_more(bool *more_r)
125
/* Nothing to do when the write side is finished or the buffer is empty. */
if (write_finished || cstate.writebuf.size() == 0) {
128
const size_t wlen = cstate.writebuf.size();
129
/* MSG_NOSIGNAL: do not raise SIGPIPE if the peer has gone away. */
ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
131
/* Presumably nested in a "len <= 0" check that is not shown. Anything other
 * than EWOULDBLOCK on a non-blocking socket ends the write side and
 * discards the unsent data. */
if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
132
cstate.writebuf.clear();
133
write_finished = true;
137
/* Drop the bytes that were actually sent. */
cstate.writebuf.erase_front(len);
138
/* FIXME: reallocate memory if too large */
140
*more_r = (static_cast<size_t>(len) == wlen);
146
/* Reads one block of request data from the socket into readbuf.
 *   more_r: optional out-parameter; receives true when read() filled the
 *           whole block, i.e. more data may be pending (the null check
 *           guarding the store is presumably outside this excerpt).
 * Return statements are not part of this excerpt. */
hstcpsvr_conn::read_more(bool *more_r)
151
/* Never read less than 4096 bytes at a time. */
const size_t block_size = readsize > 4096 ? readsize : 4096;
152
char *wp = cstate.readbuf.make_space(block_size);
153
const ssize_t len = read(fd.get(), wp, block_size);
155
/* Presumably nested in a "len <= 0" check that is not shown. End of file,
 * or any failure other than EWOULDBLOCK on a non-blocking socket, ends the
 * read side. */
if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
156
read_finished = true;
160
/* Commit the bytes that read() stored into the reserved space. */
cstate.readbuf.space_wrote(len);
162
*more_r = (static_cast<size_t>(len) == block_size);
168
/* Stores prepared statement v under id pst_id, growing the table when the id
 * is beyond its current size.
 * NOTE(review): the table is resized to pst_id + 1, so a large id allocates
 * a proportionally large vector; do_open_index() takes the id from the
 * request line. */
hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
170
if (cstate.prep_stmts.size() <= pst_id) {
171
cstate.prep_stmts.resize(pst_id + 1);
173
cstate.prep_stmts[pst_id] = v;
177
/* Looks up the prepared statement registered under pst_id and returns a
 * pointer to it. Ids beyond the table take the branch below, whose body is
 * not part of this excerpt (presumably it returns a null pointer). */
hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
179
if (cstate.prep_stmts.size() <= pst_id) {
182
return &cstate.prep_stmts[pst_id];
186
/* Queues a one-line response: the numeric code, then either "\t1\t" followed
 * by msg or just "\t1", then a newline. The condition choosing between the
 * two forms is not part of this excerpt (presumably whether msg is empty). */
hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
188
write_ui32(cstate.writebuf, code);
189
const size_t msglen = strlen(msg);
191
cstate.writebuf.append_literal("\t1\t");
192
cstate.writebuf.append(msg, msg + msglen);
194
cstate.writebuf.append_literal("\t1");
196
cstate.writebuf.append_literal("\n");
200
/* Queues a one-line response of the form "<code>\t1\t<value>\n", with both
 * numbers written by write_ui32(). */
hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
202
write_ui32(cstate.writebuf, code);
203
cstate.writebuf.append_literal("\t1\t");
204
write_ui32(cstate.writebuf, value);
205
cstate.writebuf.append_literal("\n");
209
/* Starts a multi-field response: remembers where it begins in writebuf so
 * that dbcb_resp_cancel() can roll it back, then queues "0\t" followed by
 * the field count. */
hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
211
cstate.resp_begin_pos = cstate.writebuf.size();
212
cstate.writebuf.append_literal("0\t");
213
write_ui32(cstate.writebuf, num_flds);
217
/* Queues one field of the response in progress. Two forms are visible: a tab
 * followed by the escaped field bytes, and the two-byte sequence tab + NUL.
 * The condition selecting between them is not part of this excerpt
 * (presumably the second form encodes a null field -- confirm). */
hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
220
cstate.writebuf.append_literal("\t");
221
escape_string(cstate.writebuf, fld, fld + fldlen);
223
/* Explicit length below: the embedded NUL is part of the output. */
static const char t[] = "\t\0";
224
cstate.writebuf.append(t, t + 2);
229
/* Finishes the response in progress: terminates the line and clears the
 * recorded start offset. */
hstcpsvr_conn::dbcb_resp_end()
231
cstate.writebuf.append_literal("\n");
232
cstate.resp_begin_pos = 0;
236
/* Abandons the response in progress: truncates writebuf back to the offset
 * recorded by dbcb_resp_begin() and clears that offset. */
hstcpsvr_conn::dbcb_resp_cancel()
238
cstate.writebuf.resize(cstate.resp_begin_pos);
239
cstate.resp_begin_pos = 0;
242
/* Worker that owns a set of client connections and a database context, and
 * services requests through one of the run_one_* loops defined later in
 * this file. Only part of the declaration is present in this excerpt. */
struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
243
hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
246
/* Shared server state accessed through a const reference. */
const hstcpsvr_shared_c& cshared;
247
/* Shared volatile state; this file reads and sets its shutdown flag. */
volatile hstcpsvr_shared_v& vshared;
250
hstcpsvr_conns_type conns; /* conns refs dbctx */
251
/* Time of the last idle-connection sweep performed by run_one_ep(). */
time_t last_check_time;
252
/* Scratch array handed to poll() by run_one_nb(). */
std::vector<pollfd> pfds;
254
/* Result array handed to epoll_wait() by run_one_ep(). */
std::vector<epoll_event> events_vec;
259
/* Filter array reused across do_exec_on_index() calls. */
std::vector<record_filter> filters_work;
263
/* Request parsing and dispatch helpers. */
void execute_lines(hstcpsvr_conn& conn);
264
void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
265
void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
266
void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
267
char *finish, hstcpsvr_conn& conn);
268
void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
271
/* Builds a worker from arg: binds the shared state, creates this worker's
 * database context and, when epoll is enabled, creates the epoll instance
 * and registers the listening socket with it. */
hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
272
: cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
273
dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
274
last_check_time(time(0)), accept_enabled(true), accept_balance(0)
277
if (cshared.sockargs.use_epoll) {
278
epoll_fd.reset(epoll_create(10));
279
if (epoll_fd.get() < 0) {
280
fatal_abort("epoll_create");
282
/* The lines filling in ev are not part of this excerpt. */
epoll_event ev = { };
285
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
287
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
289
/* Upper bound on the events returned by one epoll_wait() call. */
events_vec.resize(10240);
292
/* Optional setting; 0 (the default) disables the accept balancing step at
 * the end of run_one_ep(). */
accept_balance = cshared.conf.get_int("accept_balance", 0);
298
/* Helper whose constructor calls init_thread() on the database context. The
 * term_thread() call below is presumably the body of its destructor, whose
 * header is not part of this excerpt. */
thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
299
dbctx->init_thread(this, shutdown_flag);
302
dbctx->term_thread();
304
/* Held by reference: the referenced pointer must outlive this object. */
const dbcontext_ptr& dbctx;
310
/* Worker main loop. Initializes the database context for this thread, then
 * repeats one of three loop bodies (epoll, non-blocking, or the remaining
 * mode) until shutdown is requested or the context reports it is no longer
 * alive. The loop bodies themselves are not part of this excerpt. */
hstcpsvr_worker::run()
312
thr_init initobj(dbctx, vshared.shutdown);
315
if (cshared.sockargs.use_epoll) {
316
while (!vshared.shutdown && dbctx->check_alive()) {
319
} else if (cshared.sockargs.nonblocking) {
320
while (!vshared.shutdown && dbctx->check_alive()) {
325
/* The condition leading to this abort is not part of this excerpt. */
fatal_abort("run_one");
328
while (!vshared.shutdown && dbctx->check_alive()) {
335
/* One iteration of the poll()-based loop. Visible phases, in order: build
 * the pollfd array (one entry per connection plus the listening socket),
 * poll, read, execute buffered requests, unlock tables and collect the
 * commit status, write and close, accept a new connection, and finally
 * close tables and publish statistics. Several controlling lines are not
 * part of this excerpt. */
hstcpsvr_worker::run_one_nb()
339
/* One pollfd per existing connection. */
for (hstcpsvr_conns_type::const_iterator i = conns.begin();
340
i != conns.end(); ++i) {
341
if (pfds.size() <= nfds) {
342
pfds.resize(nfds + 1);
344
pollfd& pfd = pfds[nfds++];
345
pfd.fd = (*i)->fd.get();
347
/* The event mask "ev" depends on whether output is pending; the lines
 * computing it are not shown. */
if ((*i)->cstate.writebuf.size() != 0) {
352
pfd.events = pfd.revents = ev;
356
/* The listening socket goes last, and is polled for input only while this
 * iteration has fewer entries than the per-thread limit. */
const size_t cpt = cshared.nb_conn_per_thread;
357
const short ev = (cpt > nfds) ? POLLIN : 0;
358
if (pfds.size() <= nfds) {
359
pfds.resize(nfds + 1);
361
pollfd& pfd = pfds[nfds++];
362
pfd.fd = cshared.listen_fd.get();
363
pfd.events = pfd.revents = ev;
366
/* Wait at most one second. */
const int npollev = poll(&pfds[0], nfds, 1 * 1000);
367
dbctx->set_statistics(conns.size(), npollev);
368
const time_t now = time(0);
370
/* mask_in matches every revents bit except POLLOUT. */
const short mask_in = ~POLLOUT;
371
const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
373
/* READ phase: pull data from every connection that reported an event. */
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
375
pollfd& pfd = pfds[j];
376
if ((pfd.revents & mask_in) == 0) {
379
hstcpsvr_conn& conn = **i;
380
if (conn.read_more()) {
381
if (conn.cstate.readbuf.size() > 0) {
382
/* The first buffered byte is checked for special commands; the test that
 * triggers the shutdown request is not part of this excerpt. */
const char ch = conn.cstate.readbuf.begin()[0];
384
vshared.shutdown = 1;
385
/* '/' discards both buffers and marks the connection finished. */
} else if (ch == '/') {
386
conn.cstate.readbuf.clear();
387
conn.cstate.writebuf.clear();
388
conn.read_finished = true;
389
conn.write_finished = true;
392
conn.nb_last_io = now;
397
/* EXECUTE phase: connections without an event or without buffered input
 * take the branch below (body not shown). */
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
399
pollfd& pfd = pfds[j];
400
if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
406
/* Unlock tables and remember whether the commit failed before clearing the
 * error state. */
dbctx->unlock_tables_if();
407
const bool commit_error = dbctx->get_commit_error();
408
dbctx->clear_error();
411
/* WRITE/CLOSE phase. */
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
413
pollfd& pfd = pfds[j];
414
hstcpsvr_conn& conn = **i;
415
/* Keep the current position so the entry can be erased while iterating. */
hstcpsvr_conns_type::iterator icur = i;
421
if ((pfd.revents & (mask_out | mask_in)) != 0) {
422
if (conn.write_more()) {
423
conn.nb_last_io = now;
426
/* Idle timeout check; the action taken is not part of this excerpt. */
if (cshared.sockargs.timeout != 0 &&
427
conn.nb_last_io + cshared.sockargs.timeout < now) {
430
if (conn.closed() || conn.ok_to_close()) {
431
conns.erase_ptr(icur);
436
/* ACCEPT phase: the listening socket is the last pollfd entry. */
pollfd& pfd = pfds[nfds - 1];
437
if ((pfd.revents & mask_in) != 0) {
438
std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
439
c->nonblocking = true;
440
c->readsize = cshared.readsize;
442
if (c->fd.get() >= 0) {
443
if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
444
fatal_abort("F_SETFL O_NONBLOCK");
447
conns.push_back_ptr(c);
449
/* errno == 11 (EAGAIN) is not a fatal error. */
450
DENA_VERBOSE(100, fprintf(stderr,
451
"accept failed: errno=%d (not fatal)\n", errno));
455
DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
458
dbctx->close_tables_if();
460
dbctx->set_statistics(conns.size(), 0);
466
/* One iteration of the epoll-based loop. Visible phases, in order: wait for
 * events, read from clients and accept new connections, execute buffered
 * requests, unlock tables and collect the commit status, write responses,
 * close finished connections, sweep idle connections (at most once every
 * ten seconds), close tables and publish statistics, and finally add or
 * remove the listening socket from the epoll set when accept balancing is
 * enabled. Several controlling lines are not part of this excerpt. */
hstcpsvr_worker::run_one_ep()
468
epoll_event *const events = &events_vec[0];
469
const size_t num_events = events_vec.size();
470
const time_t now = time(0);
471
size_t in_count = 0, out_count = 0, accept_count = 0;
472
/* Wait at most one second. */
int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
474
dbctx->set_statistics(conns.size(), nfds);
475
/* READ/ACCEPT phase. */
for (int i = 0; i < nfds; ++i) {
476
epoll_event& ev = events[i];
477
if ((ev.events & EPOLLIN) == 0) {
480
/* The listening socket is distinguished from client connections by its
 * event data; the test itself is not part of this excerpt. */
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
484
DBG_EP(fprintf(stderr, "IN listener\n"));
485
std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
486
c->nonblocking = true;
487
c->readsize = cshared.readsize;
489
if (c->fd.get() >= 0) {
490
if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
491
fatal_abort("F_SETFL O_NONBLOCK");
493
epoll_event cev = { };
494
/* Edge-triggered registration for both directions. */
cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
495
cev.data.ptr = c.get();
497
/* Read the descriptor before c is handed over to the container. */
const int fd = c->fd.get();
498
conns.push_back_ptr(c);
499
/* Remember the list position so the connection can erase itself later. */
conns.back()->conns_iter = --conns.end();
500
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
501
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
504
DENA_VERBOSE(100, fprintf(stderr,
505
"accept failed: errno=%d (not fatal)\n", errno));
508
/* client connection */
510
DBG_EP(fprintf(stderr, "IN client\n"));
511
bool more_data = false;
512
/* Keep reading while read_more() reports success; the loop exit that
 * consults more_data is not part of this excerpt. */
while (conn->read_more(&more_data)) {
513
DBG_EP(fprintf(stderr, "IN client read_more\n"));
514
conn->nb_last_io = now;
522
/* EXECUTE phase. */
for (int i = 0; i < nfds; ++i) {
523
epoll_event& ev = events[i];
524
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
525
if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
526
conn->cstate.readbuf.size() == 0) {
529
/* The first buffered byte is checked for special commands; the test that
 * triggers the shutdown request is not part of this excerpt. */
const char ch = conn->cstate.readbuf.begin()[0];
531
vshared.shutdown = 1;
532
/* '/' discards both buffers and marks the connection finished. */
} else if (ch == '/') {
533
conn->cstate.readbuf.clear();
534
conn->cstate.writebuf.clear();
535
conn->read_finished = true;
536
conn->write_finished = true;
538
execute_lines(*conn);
542
/* Unlock tables and remember whether the commit failed before clearing the
 * error state. */
dbctx->unlock_tables_if();
543
const bool commit_error = dbctx->get_commit_error();
544
dbctx->clear_error();
546
/* WRITE phase. */
for (int i = 0; i < nfds; ++i) {
547
epoll_event& ev = events[i];
548
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
549
/* Handling of a failed commit; the body is not part of this excerpt. */
if (commit_error && conn != 0) {
553
if ((ev.events & EPOLLOUT) == 0) {
559
DBG_EP(fprintf(stderr, "OUT listener\n"));
561
/* client connection */
562
DBG_EP(fprintf(stderr, "OUT client\n"));
563
bool more_data = false;
564
while (conn->write_more(&more_data)) {
565
DBG_EP(fprintf(stderr, "OUT client write_more\n"));
566
conn->nb_last_io = now;
574
/* CLOSE phase. */
for (int i = 0; i < nfds; ++i) {
575
epoll_event& ev = events[i];
576
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
577
if (conn != 0 && conn->ok_to_close()) {
578
DBG_EP(fprintf(stderr, "CLOSE close\n"));
579
conns.erase_ptr(conn->conns_iter);
582
/* TIMEOUT & cleanup */
583
if (last_check_time + 10 < now) {
584
for (hstcpsvr_conns_type::iterator i = conns.begin();
585
i != conns.end(); ) {
586
/* Keep the current position; the advance of i is not shown. */
hstcpsvr_conns_type::iterator icur = i;
588
if (cshared.sockargs.timeout != 0 &&
589
(*icur)->nb_last_io + cshared.sockargs.timeout < now) {
590
conns.erase_ptr((*icur)->conns_iter);
593
last_check_time = now;
594
DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
597
DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
598
this, in_count, out_count, accept_count, conns.size()));
600
dbctx->close_tables_if();
603
const size_t num_conns = conns.size();
604
dbctx->set_statistics(num_conns, 0);
605
/* ENABLE/DISABLE ACCEPT */
606
if (accept_balance != 0) {
607
/* Publish this worker's load and sum the load of all workers. */
cshared.thread_num_conns[worker_id] = num_conns;
608
size_t total_num_conns = 0;
609
for (long i = 0; i < cshared.num_threads; ++i) {
610
total_num_conns += cshared.thread_num_conns[i];
613
/* True when this worker holds fewer than 10 connections, or fewer than
 * twice the per-worker average. */
if (num_conns < 10 ||
614
total_num_conns * 2 > num_conns * cshared.num_threads) {
617
epoll_event ev = { };
620
/* Nothing to change when the desired state matches the current one. */
if (e_acc == accept_enabled) {
622
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
624
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
627
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
629
/* Name the operation that actually failed. */
fatal_abort("epoll_ctl EPOLL_CTL_DEL");
632
accept_enabled = e_acc;
639
/* Executes the complete request lines buffered for conn and removes the
 * consumed bytes from its read buffer. The loop construct around the
 * newline search is not part of this excerpt. */
hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
641
dbconnstate& cstate = conn.cstate;
642
char *buf_end = cstate.readbuf.end();
643
char *line_begin = cstate.readbuf.begin();
645
/* Find the end of the next line. */
char *const nl = memchr_char(line_begin, '\n', buf_end - line_begin);
649
/* Exclude a '\r' that directly precedes the newline. */
char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
650
execute_line(line_begin, lf, conn);
653
/* Drop everything before line_begin from the read buffer. */
cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
657
/* Parses the command token of one request line [start, finish) and
 * dispatches it:
 *   empty command              -> error response 2 "cmd"
 *   single character 'P'       -> do_open_index()
 *   single character 'A'       -> do_authorization()
 *   first character is a digit -> do_exec_on_index()
 *   anything else              -> error response 2 "cmd"
 * 'P' and digit commands are answered with 3 "unauth" when authorization is
 * required and the connection is not authorized. */
hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
659
/* safe to modify, safe to dereference 'finish' */
660
char *const cmd_begin = start;
661
read_token(start, finish);
662
char *const cmd_end = start;
663
/* Step past the separator that follows the command token. */
skip_one(start, finish);
664
if (cmd_begin == cmd_end) {
665
return conn.dbcb_resp_short(2, "cmd");
667
if (cmd_begin + 1 == cmd_end) {
668
if (cmd_begin[0] == 'P') {
669
if (cshared.require_auth && !conn.authorized) {
670
return conn.dbcb_resp_short(3, "unauth");
672
return do_open_index(start, finish, conn);
674
if (cmd_begin[0] == 'A') {
675
return do_authorization(start, finish, conn);
678
if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
679
if (cshared.require_auth && !conn.authorized) {
680
return conn.dbcb_resp_short(3, "unauth");
682
return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
684
return conn.dbcb_resp_short(2, "cmd");
688
/* Handles a 'P' request. Reads, in order: the statement id, database name,
 * table name, index name, returned-field list and filter-field list, each
 * separated by one delimiter, and forwards them to
 * dbctx->cmd_open_index(). Only the *_begin pointers are passed on; the
 * tokens are presumably NUL-terminated at their *_end positions on lines
 * that are not part of this excerpt -- confirm. */
hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
690
const size_t pst_id = read_ui32(start, finish);
691
skip_one(start, finish);
693
char *const dbname_begin = start;
694
read_token(start, finish);
695
char *const dbname_end = start;
696
skip_one(start, finish);
698
char *const tblname_begin = start;
699
read_token(start, finish);
700
char *const tblname_end = start;
701
skip_one(start, finish);
703
char *const idxname_begin = start;
704
read_token(start, finish);
705
char *const idxname_end = start;
706
skip_one(start, finish);
708
char *const retflds_begin = start;
709
read_token(start, finish);
710
char *const retflds_end = start;
711
skip_one(start, finish);
713
char *const filflds_begin = start;
714
read_token(start, finish);
715
char *const filflds_end = start;
721
return dbctx->cmd_open_index(conn, pst_id, dbname_begin, tblname_begin,
722
idxname_begin, retflds_begin, filflds_begin);
726
/* Handles a request whose command is a statement id. Parses, in order: the
 * statement id (from the command token), the operator, the key field count
 * and key values, limit and skip, then optional 'W'/'F' filters, then an
 * optional modification operator followed by one value per returned field.
 * The parsed request is passed to dbctx->cmd_exec_on_index() at whichever
 * point the line ends. Field and filter values are unescaped in place.
 * Error responses: 2 "stmtnum", 2 "filtertype", 2 "filterfld".
 * NOTE(review): fldnum is taken from the request line and num_uvals from the
 * prepared statement; both size stack arrays (GNU variable-length arrays)
 * without a visible upper bound -- confirm that a limit is enforced. */
hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
727
char *finish, hstcpsvr_conn& conn)
730
const size_t pst_id = read_ui32(cmd_begin, cmd_end);
731
if (pst_id >= conn.cstate.prep_stmts.size()) {
732
return conn.dbcb_resp_short(2, "stmtnum");
734
args.pst = &conn.cstate.prep_stmts[pst_id];
735
char *const op_begin = start;
736
read_token(start, finish);
737
char *const op_end = start;
738
args.op = string_ref(op_begin, op_end);
739
skip_one(start, finish);
740
const uint32_t fldnum = read_ui32(start, finish);
741
string_ref flds[fldnum]; /* GNU */
743
args.kvalslen = fldnum;
744
/* Key values: a null expression becomes an empty string_ref, anything else
 * is unescaped in place. */
for (size_t i = 0; i < fldnum; ++i) {
745
skip_one(start, finish);
746
char *const f_begin = start;
747
read_token(start, finish);
748
char *const f_end = start;
749
if (is_null_expression(f_begin, f_end)) {
751
flds[i] = string_ref();
755
/* wp is declared on a line that is not part of this excerpt; after the call
 * it is used as the end of the unescaped bytes. */
unescape_string(wp, f_begin, f_end);
756
flds[i] = string_ref(f_begin, wp - f_begin);
759
skip_one(start, finish);
760
args.limit = read_ui32(start, finish);
761
skip_one(start, finish);
762
args.skip = read_ui32(start, finish);
763
/* End of line: a plain request without filters or modification. */
if (start == finish) {
765
return dbctx->cmd_exec_on_index(conn, args);
767
/* has filters or modops */
768
skip_one(start, finish);
770
size_t filters_count = 0;
771
/* Each filter is: type ('W' or 'F'), operator, filter-field offset, value. */
while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
772
char *const filter_type_begin = start;
773
read_token(start, finish);
774
char *const filter_type_end = start;
775
skip_one(start, finish);
776
char *const filter_op_begin = start;
777
read_token(start, finish);
778
char *const filter_op_end = start;
779
skip_one(start, finish);
780
const uint32_t ff_offset = read_ui32(start, finish);
781
skip_one(start, finish);
782
char *const filter_val_begin = start;
783
read_token(start, finish);
784
char *const filter_val_end = start;
785
skip_one(start, finish);
786
/* Grow the reusable filter array on demand. */
if (filters_work.size() <= filters_count) {
787
filters_work.resize(filters_count + 1);
789
record_filter& fi = filters_work[filters_count];
790
/* The filter type must be exactly one character. */
if (filter_type_end != filter_type_begin + 1) {
791
return conn.dbcb_resp_short(2, "filtertype");
793
fi.filter_type = (filter_type_begin[0] == 'W')
794
? record_filter_type_break : record_filter_type_skip;
795
/* The offset must refer to one of the statement's filter fields. */
const uint32_t num_filflds = args.pst->get_filter_fields().size();
796
if (ff_offset >= num_filflds) {
797
return conn.dbcb_resp_short(2, "filterfld");
799
fi.op = string_ref(filter_op_begin, filter_op_end);
800
fi.ff_offset = ff_offset;
801
if (is_null_expression(filter_val_begin, filter_val_end)) {
803
fi.val = string_ref();
806
char *wp = filter_val_begin;
807
unescape_string(wp, filter_val_begin, filter_val_end);
808
fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
812
if (filters_count > 0) {
813
if (filters_work.size() <= filters_count) {
814
filters_work.resize(filters_count + 1);
816
filters_work[filters_count].op = string_ref(); /* sentinel */
817
args.filters = &filters_work[0];
821
/* End of line: filters only, no modification. */
if (start == finish) {
823
return dbctx->cmd_exec_on_index(conn, args);
826
char *const mod_op_begin = start;
827
read_token(start, finish);
828
char *const mod_op_end = start;
829
args.mod_op = string_ref(mod_op_begin, mod_op_end);
830
/* One update value is parsed per returned field of the statement. */
const size_t num_uvals = args.pst->get_ret_fields().size();
831
string_ref uflds[num_uvals]; /* GNU */
832
for (size_t i = 0; i < num_uvals; ++i) {
833
skip_one(start, finish);
834
char *const f_begin = start;
835
read_token(start, finish);
836
char *const f_end = start;
837
if (is_null_expression(f_begin, f_end)) {
839
uflds[i] = string_ref();
843
unescape_string(wp, f_begin, f_end);
844
uflds[i] = string_ref(f_begin, wp - f_begin);
848
return dbctx->cmd_exec_on_index(conn, args);
852
hstcpsvr_worker::do_authorization(char *start, char *finish,
856
char *const authtype_begin = start;
857
read_token(start, finish);
858
char *const authtype_end = start;
859
const size_t authtype_len = authtype_end - authtype_begin;
860
skip_one(start, finish);
862
char *const key_begin = start;
863
read_token(start, finish);
864
char *const key_end = start;
865
const size_t key_len = key_end - key_begin;
868
char *wp = key_begin;
869
unescape_string(wp, key_begin, key_end);
870
if (authtype_len != 1 || authtype_begin[0] != '1') {
871
return conn.dbcb_resp_short(2, "authtype");
873
if (cshared.plain_secret.size() == key_len &&
874
memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
875
conn.authorized = true;
877
conn.authorized = false;
879
if (!conn.authorized) {
880
return conn.dbcb_resp_short(3, "unauth");
882
return conn.dbcb_resp_short(0, "");
887
/* Factory: allocates a hstcpsvr_worker built from arg and returns it wrapped
 * in a hstcpsvr_worker_ptr. */
hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
889
return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));