~percona-dev/percona-server/release-5.5.11-20.2-fix-bug-764138

« back to all changes in this revision

Viewing changes to HandlerSocket-Plugin-for-MySQL/libhsclient/hstcpcli.cpp

  • Committer: Ignacio Nin
  • Date: 2011-03-13 17:18:23 UTC
  • mfrom: (33.3.17 release-5.5.8-20)
  • Revision ID: ignacio.nin@percona.com-20110313171823-m06xs104nekulywb
Merge changes from release-5.5.8-20 to 5.5.9

Merge changes from the release branch of 5.5.8 to 5.5.9. These include
the HandlerSocket and UDF directories and the building scripts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// vim:sw=2:ai
 
3
 
 
4
/*
 
5
 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
 
6
 * See COPYRIGHT.txt for details.
 
7
 */
 
8
 
 
9
#include <stdexcept>
 
10
 
 
11
#include "hstcpcli.hpp"
 
12
#include "auto_file.hpp"
 
13
#include "string_util.hpp"
 
14
#include "auto_addrinfo.hpp"
 
15
#include "escape.hpp"
 
16
#include "util.hpp"
 
17
 
 
18
/* TODO */
 
19
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
 
20
#define MSG_NOSIGNAL 0
 
21
#endif
 
22
 
 
23
#define DBG(x)
 
24
 
 
25
namespace dena {
 
26
 
 
27
struct hstcpcli : public hstcpcli_i, private noncopyable {
 
28
  hstcpcli(const socket_args& args);
 
29
  virtual void close();
 
30
  virtual int reconnect();
 
31
  virtual bool stable_point();
 
32
  virtual void request_buf_open_index(size_t pst_id, const char *dbn,
 
33
    const char *tbl, const char *idx, const char *retflds, const char *filflds);
 
34
  #if 0
 
35
  virtual void request_buf_find(size_t pst_id, const string_ref& op,
 
36
    const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip);
 
37
  virtual void request_buf_insert(size_t pst_id, const string_ref *fvs,
 
38
    size_t fvslen);
 
39
  virtual void request_buf_update(size_t pst_id, const string_ref& op,
 
40
    const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
 
41
    const string_ref *mvs, size_t mvslen);
 
42
  virtual void request_buf_delete(size_t pst_id, const string_ref& op,
 
43
    const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip);
 
44
  #endif
 
45
  virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
 
46
    const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
 
47
    const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
 
48
    const hstcpcli_filter *fils, size_t filslen);
 
49
  virtual int request_send();
 
50
  virtual int response_recv(size_t& num_flds_r);
 
51
  virtual const string_ref *get_next_row();
 
52
  virtual void response_buf_remove();
 
53
  virtual int get_error_code();
 
54
  virtual std::string get_error();
 
55
 private:
 
56
  int read_more();
 
57
  void clear_error();
 
58
  int set_error(int code, const std::string& str);
 
59
 private:
 
60
  auto_file fd;
 
61
  socket_args sargs;
 
62
  string_buffer readbuf;
 
63
  string_buffer writebuf;
 
64
  size_t response_end_offset; /* incl newline */
 
65
  size_t cur_row_offset;
 
66
  size_t num_flds;
 
67
  size_t num_req_bufd; /* buffered but not yet sent */
 
68
  size_t num_req_sent; /* sent but not yet received */
 
69
  size_t num_req_rcvd; /* received but not yet removed */
 
70
  int error_code;
 
71
  std::string error_str;
 
72
  std::vector<string_ref> flds;
 
73
};
 
74
 
 
75
hstcpcli::hstcpcli(const socket_args& args)
 
76
  : sargs(args), response_end_offset(0), cur_row_offset(0), num_flds(0),
 
77
    num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), error_code(0)
 
78
{
 
79
  std::string err;
 
80
  if (socket_connect(fd, sargs, err) != 0) {
 
81
    set_error(-1, err);
 
82
  }
 
83
}
 
84
 
 
85
void
 
86
hstcpcli::close()
 
87
{
 
88
  fd.close();
 
89
  readbuf.clear();
 
90
  writebuf.clear();
 
91
  flds.clear();
 
92
  response_end_offset = 0;
 
93
  cur_row_offset = 0;
 
94
  num_flds = 0;
 
95
  num_req_bufd = 0;
 
96
  num_req_sent = 0;
 
97
  num_req_rcvd = 0;
 
98
}
 
99
 
 
100
int
 
101
hstcpcli::reconnect()
 
102
{
 
103
  clear_error();
 
104
  close();
 
105
  std::string err;
 
106
  if (socket_connect(fd, sargs, err) != 0) {
 
107
    set_error(-1, err);
 
108
  }
 
109
  return error_code;
 
110
}
 
111
 
 
112
bool
 
113
hstcpcli::stable_point()
 
114
{
 
115
  /* returns true if cli can send a new request */
 
116
  return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
 
117
    num_req_rcvd == 0 && response_end_offset == 0;
 
118
}
 
119
 
 
120
int
 
121
hstcpcli::get_error_code()
 
122
{
 
123
  return error_code;
 
124
}
 
125
 
 
126
std::string
 
127
hstcpcli::get_error()
 
128
{
 
129
  return error_str;
 
130
}
 
131
 
 
132
int
 
133
hstcpcli::read_more()
 
134
{
 
135
  const size_t block_size = 4096; // FIXME
 
136
  char *const wp = readbuf.make_space(block_size);
 
137
  const ssize_t rlen = read(fd.get(), wp, block_size);
 
138
  if (rlen <= 0) {
 
139
    if (rlen < 0) {
 
140
      error_str = "read: failed";
 
141
    } else {
 
142
      error_str = "read: eof";
 
143
    }
 
144
    return rlen;
 
145
  }
 
146
  readbuf.space_wrote(rlen);
 
147
  return rlen;
 
148
}
 
149
 
 
150
void
 
151
hstcpcli::clear_error()
 
152
{
 
153
  DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
 
154
  error_code = 0;
 
155
  error_str.clear();
 
156
}
 
157
 
 
158
int
 
159
hstcpcli::set_error(int code, const std::string& str)
 
160
{
 
161
  DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
 
162
  error_code = code;
 
163
  error_str = str;
 
164
  return error_code;
 
165
}
 
166
 
 
167
void
 
168
hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
 
169
  const char *tbl, const char *idx, const char *retflds, const char *filflds)
 
170
{
 
171
  if (num_req_sent > 0 || num_req_rcvd > 0) {
 
172
    close();
 
173
    set_error(-1, "request_buf_open_index: protocol out of sync");
 
174
    return;
 
175
  }
 
176
  const string_ref dbn_ref(dbn, strlen(dbn));
 
177
  const string_ref tbl_ref(tbl, strlen(tbl));
 
178
  const string_ref idx_ref(idx, strlen(idx));
 
179
  const string_ref rfs_ref(retflds, strlen(retflds));
 
180
  writebuf.append_literal("P\t");
 
181
  append_uint32(writebuf, pst_id); // FIXME size_t ?
 
182
  writebuf.append_literal("\t");
 
183
  writebuf.append(dbn_ref.begin(), dbn_ref.end());
 
184
  writebuf.append_literal("\t");
 
185
  writebuf.append(tbl_ref.begin(), tbl_ref.end());
 
186
  writebuf.append_literal("\t");
 
187
  writebuf.append(idx_ref.begin(), idx_ref.end());
 
188
  writebuf.append_literal("\t");
 
189
  writebuf.append(rfs_ref.begin(), rfs_ref.end());
 
190
  if (filflds != 0) {
 
191
    const string_ref fls_ref(filflds, strlen(filflds));
 
192
    writebuf.append_literal("\t");
 
193
    writebuf.append(fls_ref.begin(), fls_ref.end());
 
194
  }
 
195
  writebuf.append_literal("\n");
 
196
  ++num_req_bufd;
 
197
}
 
198
 
 
199
namespace {
 
200
 
 
201
void
 
202
append_delim_value(string_buffer& buf, const char *start, const char *finish)
 
203
{
 
204
  if (start == 0) {
 
205
    /* null */
 
206
    const char t[] = "\t\0";
 
207
    buf.append(t, t + 2);
 
208
  } else {
 
209
    /* non-null */
 
210
    buf.append_literal("\t");
 
211
    escape_string(buf, start, finish);
 
212
  }
 
213
}
 
214
 
 
215
};
 
216
 
 
217
void
 
218
hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
 
219
  const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
 
220
  const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
 
221
  const hstcpcli_filter *fils, size_t filslen)
 
222
{
 
223
  if (num_req_sent > 0 || num_req_rcvd > 0) {
 
224
    close();
 
225
    set_error(-1, "request_buf_exec_generic: protocol out of sync");
 
226
    return;
 
227
  }
 
228
  append_uint32(writebuf, pst_id); // FIXME size_t ?
 
229
  writebuf.append_literal("\t");
 
230
  writebuf.append(op.begin(), op.end());
 
231
  writebuf.append_literal("\t");
 
232
  append_uint32(writebuf, kvslen); // FIXME size_t ?
 
233
  for (size_t i = 0; i < kvslen; ++i) {
 
234
    const string_ref& kv = kvs[i];
 
235
    append_delim_value(writebuf, kv.begin(), kv.end());
 
236
  }
 
237
  if (limit != 0 || skip != 0 || mod_op.size() != 0 || filslen != 0) {
 
238
    writebuf.append_literal("\t");
 
239
    append_uint32(writebuf, limit); // FIXME size_t ?
 
240
    if (skip != 0 || mod_op.size() != 0 || filslen != 0) {
 
241
      writebuf.append_literal("\t");
 
242
      append_uint32(writebuf, skip); // FIXME size_t ?
 
243
    }
 
244
    for (size_t i = 0; i < filslen; ++i) {
 
245
      const hstcpcli_filter& f = fils[i];
 
246
      writebuf.append_literal("\t");
 
247
      writebuf.append(f.filter_type.begin(), f.filter_type.end());
 
248
      writebuf.append_literal("\t");
 
249
      writebuf.append(f.op.begin(), f.op.end());
 
250
      writebuf.append_literal("\t");
 
251
      append_uint32(writebuf, f.ff_offset);
 
252
      append_delim_value(writebuf, f.val.begin(), f.val.end());
 
253
    }
 
254
    if (mod_op.size() != 0) {
 
255
      writebuf.append_literal("\t");
 
256
      writebuf.append(mod_op.begin(), mod_op.end());
 
257
      for (size_t i = 0; i < mvslen; ++i) {
 
258
        const string_ref& mv = mvs[i];
 
259
        append_delim_value(writebuf, mv.begin(), mv.end());
 
260
      }
 
261
    }
 
262
  }
 
263
  writebuf.append_literal("\n");
 
264
  ++num_req_bufd;
 
265
}
 
266
 
 
267
#if 0
 
268
void
 
269
hstcpcli::request_buf_find(size_t pst_id, const string_ref& op,
 
270
  const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip)
 
271
{
 
272
  return request_buf_exec_generic(pst_id, op, kvs, kvslen, limit, skip,
 
273
    0, 0, 0);
 
274
}
 
275
 
 
276
void
 
277
hstcpcli::request_buf_insert(size_t pst_id, const string_ref *fvs,
 
278
  size_t fvslen)
 
279
{
 
280
  const string_ref insert_op("+", 1);
 
281
  return request_buf_exec_generic(pst_id, insert_op, fvs, fvslen,
 
282
    0, 0, string_ref(), 0, 0);
 
283
}
 
284
 
 
285
void
 
286
hstcpcli::request_buf_update(size_t pst_id, const string_ref& op,
 
287
  const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
 
288
  const string_ref *mvs, size_t mvslen)
 
289
{
 
290
  const string_ref modop_update("U", 1);
 
291
  return request_buf_exec_generic(pst_id, op, kvs, kvslen, limit, skip,
 
292
    modop_update, mvs, mvslen);
 
293
}
 
294
 
 
295
void
 
296
hstcpcli::request_buf_delete(size_t pst_id, const string_ref& op,
 
297
  const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip)
 
298
{
 
299
  const string_ref modop_delete("D", 1);
 
300
  return request_buf_exec_generic(pst_id, op, kvs, kvslen, limit, skip,
 
301
    modop_delete, 0, 0);
 
302
}
 
303
#endif
 
304
 
 
305
int
 
306
hstcpcli::request_send()
 
307
{
 
308
  if (error_code < 0) {
 
309
    return error_code;
 
310
  }
 
311
  clear_error();
 
312
  if (fd.get() < 0) {
 
313
    close();
 
314
    return set_error(-1, "write: closed");
 
315
  }
 
316
  if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
 
317
    close();
 
318
    return set_error(-1, "request_send: protocol out of sync");
 
319
  }
 
320
  const size_t wrlen = writebuf.size();
 
321
  const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
 
322
  if (r <= 0) {
 
323
    close();
 
324
    return set_error(-1, r < 0 ? "write: failed" : "write: eof");
 
325
  }
 
326
  writebuf.erase_front(r);
 
327
  if (static_cast<size_t>(r) != wrlen) {
 
328
    close();
 
329
    return set_error(-1, "write: incomplete");
 
330
  }
 
331
  num_req_sent = num_req_bufd;
 
332
  num_req_bufd = 0;
 
333
  DBG(fprintf(stderr, "REQSEND 0\n"));
 
334
  return 0;
 
335
}
 
336
 
 
337
int
 
338
hstcpcli::response_recv(size_t& num_flds_r)
 
339
{
 
340
  if (error_code < 0) {
 
341
    return error_code;
 
342
  }
 
343
  clear_error();
 
344
  if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
 
345
    response_end_offset != 0) {
 
346
    close();
 
347
    return set_error(-1, "response_recv: protocol out of sync");
 
348
  }
 
349
  cur_row_offset = 0;
 
350
  num_flds_r = num_flds = 0;
 
351
  if (fd.get() < 0) {
 
352
    return set_error(-1, "read: closed");
 
353
  }
 
354
  size_t offset = 0;
 
355
  while (true) {
 
356
    const char *const lbegin = readbuf.begin() + offset;
 
357
    const char *const lend = readbuf.end();
 
358
    const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
 
359
    if (nl != 0) {
 
360
      offset = (nl + 1) - readbuf.begin();
 
361
      break;
 
362
    }
 
363
    if (read_more() <= 0) {
 
364
      close();
 
365
      return set_error(-1, "read: eof");
 
366
    }
 
367
  }
 
368
  response_end_offset = offset;
 
369
  --num_req_sent;
 
370
  ++num_req_rcvd;
 
371
  char *start = readbuf.begin();
 
372
  char *const finish = start + response_end_offset - 1;
 
373
  const size_t resp_code = read_ui32(start, finish);
 
374
  skip_one(start, finish);
 
375
  num_flds_r = num_flds = read_ui32(start, finish);
 
376
  if (resp_code != 0) {
 
377
    skip_one(start, finish);
 
378
    char *const err_begin = start;
 
379
    read_token(start, finish);
 
380
    char *const err_end = start;
 
381
    std::string e = std::string(err_begin, err_end - err_begin);
 
382
    if (e.empty()) {
 
383
      e = "unknown_error";
 
384
    }
 
385
    return set_error(resp_code, e);
 
386
  }
 
387
  cur_row_offset = start - readbuf.begin();
 
388
  DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
 
389
    std::string(readbuf.begin(), readbuf.begin() + response_end_offset)
 
390
      .c_str(),
 
391
    cur_row_offset, response_end_offset));
 
392
  DBG(fprintf(stderr, "RES 0\n"));
 
393
  return 0;
 
394
}
 
395
 
 
396
const string_ref *
 
397
hstcpcli::get_next_row()
 
398
{
 
399
  if (num_flds == 0) {
 
400
    DBG(fprintf(stderr, "GNR NF 0\n"));
 
401
    return 0;
 
402
  }
 
403
  if (flds.size() < num_flds) {
 
404
    flds.resize(num_flds);
 
405
  }
 
406
  char *start = readbuf.begin() + cur_row_offset;
 
407
  char *const finish = readbuf.begin() + response_end_offset - 1;
 
408
  if (start >= finish) { /* start[0] == nl */
 
409
    DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
 
410
    return 0;
 
411
  }
 
412
  for (size_t i = 0; i < num_flds; ++i) {
 
413
    skip_one(start, finish);
 
414
    char *const fld_begin = start;
 
415
    read_token(start, finish);
 
416
    char *const fld_end = start;
 
417
    char *wp = fld_begin;
 
418
    if (is_null_expression(fld_begin, fld_end)) {
 
419
      /* null */
 
420
      flds[i] = string_ref();
 
421
    } else {
 
422
      unescape_string(wp, fld_begin, fld_end); /* in-place */
 
423
      flds[i] = string_ref(fld_begin, wp);
 
424
    }
 
425
  }
 
426
  cur_row_offset = start - readbuf.begin();
 
427
  return &flds[0];
 
428
}
 
429
 
 
430
void
 
431
hstcpcli::response_buf_remove()
 
432
{
 
433
  if (response_end_offset == 0) {
 
434
    close();
 
435
    set_error(-1, "response_buf_remove: protocol out of sync");
 
436
    return;
 
437
  }
 
438
  readbuf.erase_front(response_end_offset);
 
439
  response_end_offset = 0;
 
440
  --num_req_rcvd;
 
441
  cur_row_offset = 0;
 
442
  num_flds = 0;
 
443
  flds.clear();
 
444
}
 
445
 
 
446
hstcpcli_ptr
 
447
hstcpcli_i::create(const socket_args& args)
 
448
{
 
449
  return hstcpcli_ptr(new hstcpcli(args));
 
450
}
 
451
 
 
452
};
 
453