1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
20
#include <sys/types.h>
22
#ifdef HAVE_SYS_FILIO_H
24
* required for FIONREAD on solaris
26
#include <sys/filio.h>
29
#include <sys/ioctl.h>
30
#include <sys/socket.h>
32
#include <netinet/in.h>
33
#include <netinet/tcp.h>
49
#include "network-mysqld.h"
53
* 4.1 uses other defines
55
* this should be one step to get closer to backward-compatibility
57
#if defined(COM_EXECUTE) && !defined(COM_STMT_EXECUTE) && \
58
defined(COM_PREPARE) && !defined(COM_STMT_PREPARE) && \
59
defined(COM_CLOSE_STMT) && !defined(COM_STMT_CLOSE) && \
60
defined(COM_LONG_DATA) && !defined(COM_STMT_SEND_LONG_DATA) && \
61
defined(COM_RESET_STMT) && !defined(COM_STMT_RESET)
62
#define COM_STMT_EXECUTE COM_EXECUTE
63
#define COM_STMT_PREPARE COM_PREPARE
64
#define COM_STMT_CLOSE COM_CLOSE_STMT
65
#define COM_STMT_SEND_LONG_DATA COM_LONG_DATA
66
#define COM_STMT_RESET COM_RESET_STMT
71
* use closesocket() to close sockets to be compatible with win32
73
#define closesocket(x) close(x)
77
extern volatile int agent_shutdown;
79
extern volatile sig_atomic_t agent_shutdown;
82
#define C(x) x, sizeof(x) - 1
84
gboolean g_hash_table_true(gpointer UNUSED_PARAM(key), gpointer UNUSED_PARAM(value), gpointer UNUSED_PARAM(u)) {
88
void g_list_string_free(gpointer data, gpointer UNUSED_PARAM(user_data)) {
89
g_string_free(data, TRUE);
92
network_mysqld_index_status *network_mysqld_index_status_init() {
93
network_mysqld_index_status *s;
95
s = g_new0(network_mysqld_index_status, 1);
100
void network_mysqld_index_status_free(network_mysqld_index_status *s) {
106
retval_t plugin_call_cleanup(network_mysqld *srv, network_mysqld_con *con) {
107
NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;
109
func = con->plugins.con_cleanup;
111
if (!func) return RET_SUCCESS;
113
return (*func)(srv, con);
116
network_queue *network_queue_init() {
117
network_queue *queue;
119
queue = g_new0(network_queue, 1);
121
queue->chunks = g_queue_new();
126
void network_queue_free(network_queue *queue) {
131
while ((packet = g_queue_pop_head(queue->chunks))) g_string_free(packet, TRUE);
133
g_queue_free(queue->chunks);
138
network_socket *network_socket_init() {
141
s = g_new0(network_socket, 1);
143
s->send_queue = network_queue_init();
144
s->recv_queue = network_queue_init();
145
s->recv_raw_queue = network_queue_init();
147
s->mysqld_version = 50112;
149
s->packet_len = PACKET_LEN_UNSET;
154
void network_socket_free(network_socket *s) {
157
network_queue_free(s->send_queue);
158
network_queue_free(s->recv_queue);
159
network_queue_free(s->recv_raw_queue);
175
network_mysqld_con *network_mysqld_con_init(network_mysqld *srv) {
176
network_mysqld_con *con;
178
con = g_new0(network_mysqld_con, 1);
180
con->default_db = g_string_new(NULL);
183
g_ptr_array_add(srv->cons, con);
188
void network_mysqld_con_free(network_mysqld_con *con) {
191
g_string_free(con->default_db, TRUE);
194
g_free(con->filename);
197
if (con->server) network_socket_free(con->server);
198
if (con->client) network_socket_free(con->client);
200
/* we are still in the conns-array */
202
g_ptr_array_remove_fast(con->srv->cons, con);
210
* the free functions used by g_hash_table_new_full()
212
static void network_mysqld_index_status_free_void(void *s) {
213
network_mysqld_index_status_free(s);
216
static void network_mysqld_tables_free_void(void *s) {
217
network_mysqld_table_free(s);
220
network_mysqld *network_mysqld_init() {
223
m = g_new0(network_mysqld, 1);
225
m->event_base = event_init();
227
m->index_usage = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, network_mysqld_index_status_free_void);
228
m->tables = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, network_mysqld_tables_free_void);
230
m->cons = g_ptr_array_new();
235
void network_mysqld_free(network_mysqld *m) {
240
for (i = 0; i < m->cons->len; i++) {
241
network_mysqld_con *con = m->cons->pdata[i];
243
plugin_call_cleanup(m, con);
244
network_mysqld_con_free(con);
247
g_ptr_array_free(m->cons, TRUE);
249
g_hash_table_destroy(m->tables);
250
g_hash_table_destroy(m->index_usage);
252
if (m->config.proxy.backend_addresses) {
253
for (i = 0; m->config.proxy.backend_addresses[i]; i++) {
254
g_free(m->config.proxy.backend_addresses[i]);
256
g_free(m->config.proxy.backend_addresses);
259
if (m->config.proxy.address) g_free(m->config.proxy.address);
260
if (m->config.admin.address) g_free(m->config.admin.address);
262
/* only recent versions have this call */
263
event_base_free(m->event_base);
272
* connect to the proxy backend */
273
int network_mysqld_con_set_address(network_address *addr, gchar *address) {
277
/* split the address:port */
278
if (NULL != (s = strchr(address, ':'))) {
279
port = strtoul(s + 1, NULL, 10);
282
g_critical("<ip>:<port>, port is invalid or 0, has to be > 0, got '%s'", address);
286
g_critical("<ip>:<port>, port is too large, has to be < 65536, got '%s'", address);
291
memset(&addr->addr.ipv4, 0, sizeof(struct sockaddr_in));
294
0 == strcmp("0.0.0.0", address)) {
296
addr->addr.ipv4.sin_addr.s_addr = htonl(INADDR_ANY);
301
he = gethostbyname(address);
305
g_error("resolving proxy-address '%s' failed: ", address);
308
g_assert(he->h_addrtype == AF_INET);
309
g_assert(he->h_length == sizeof(struct in_addr));
311
memcpy(&(addr->addr.ipv4.sin_addr.s_addr), he->h_addr_list[0], he->h_length);
314
addr->addr.ipv4.sin_family = AF_INET;
315
addr->addr.ipv4.sin_port = htons(port);
316
addr->len = sizeof(struct sockaddr_in);
318
addr->str = g_strdup(address);
320
} else if (address[0] == '/') {
321
if (strlen(address) >= sizeof(addr->addr.un.sun_path) - 1) {
322
g_critical("unix-path is too long: %s", address);
326
addr->addr.un.sun_family = AF_UNIX;
327
strcpy(addr->addr.un.sun_path, address);
328
addr->len = sizeof(struct sockaddr_un);
329
addr->str = g_strdup(address);
332
/* might be a unix socket */
333
g_critical("address has to contain a <ip>:<port>, got '%s'", address);
341
* connect to the address defined in con->addr
343
* @see network_mysqld_set_address
345
int network_mysqld_con_connect(network_mysqld *UNUSED_PARAM(srv), network_socket * con) {
348
g_assert(con->addr.len);
351
* con->addr.addr.ipv4.sin_family is always mapped to the same field
352
* even if it is not a IPv4 address as we use a union
354
if (-1 == (con->fd = socket(con->addr.addr.ipv4.sin_family, SOCK_STREAM, 0))) {
355
g_critical("%s.%d: socket(%s) failed: %s",
357
con->addr.str, strerror(errno));
361
setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val) );
363
if (-1 == connect(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {
364
g_critical("%s.%d: connect(%s) failed: %s",
374
int network_mysqld_con_bind(network_mysqld *UNUSED_PARAM(srv), network_socket * con) {
377
g_assert(con->addr.len);
379
if (-1 == (con->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))) {
380
g_critical("socket() failed");
384
setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
385
setsockopt(con->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
387
if (-1 == bind(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {
388
g_critical("%s.%d: bind(%s) failed: %s",
395
if (-1 == listen(con->fd, 8)) {
396
g_critical("%s.%d: listen() failed: %s",
406
static void dump_str(const char *msg, const unsigned char *s, size_t len) {
410
hex = g_string_new(NULL);
412
for (i = 0; i < len; i++) {
413
g_string_append_printf(hex, "%02x", s[i]);
415
if ((i + 1) % 16 == 0) {
416
g_string_append(hex, "\n");
418
g_string_append_c(hex, ' ');
423
g_message("(%s): %s", msg, hex->str);
425
g_string_free(hex, TRUE);
429
int network_mysqld_packet_set_header(unsigned char *header, size_t len, unsigned char id) {
430
g_assert(len <= PACKET_LEN_MAX);
432
header[0] = (len >> 0) & 0xFF;
433
header[1] = (len >> 8) & 0xFF;
434
header[2] = (len >> 16) & 0xFF;
440
size_t network_mysqld_packet_get_header(unsigned char *header) {
441
return header[0] | header[1] << 8 | header[2] << 16;
444
int network_queue_append(network_queue *queue, const char *data, size_t len, int packet_id) {
445
unsigned char header[4];
448
network_mysqld_packet_set_header(header, len, packet_id);
450
s = g_string_sized_new(len + 4);
452
g_string_append_len(s, (gchar *)header, 4);
453
g_string_append_len(s, data, len);
455
g_queue_push_tail(queue->chunks, s);
460
int network_queue_append_chunk(network_queue *queue, GString *chunk) {
461
g_queue_push_tail(queue->chunks, chunk);
466
int network_mysqld_con_send_ok(network_socket *con) {
467
const unsigned char packet_ok[] =
468
"\x00" /* field-count */
469
"\x00" /* affected rows */
470
"\x00" /* insert-id */
471
"\x02\x00" /* server-status */
472
"\x00\x00" /* warnings */
475
network_queue_append(con->send_queue, (gchar *)packet_ok, (sizeof(packet_ok) - 1), con->packet_id);
480
int network_mysqld_con_send_error(network_socket *con, const char *errmsg, gsize errmsg_len) {
483
packet = g_string_sized_new(10 + errmsg_len);
485
g_string_append_len(packet,
486
C("\xff" /* type: error */
487
"\x00\x00" /* errno */
489
"00S00" /* SQLSTATE */
492
if (errmsg_len < 250) {
493
g_string_append_c(packet, (guchar)errmsg_len);
494
g_string_append_len(packet, errmsg, errmsg_len);
496
g_string_append_c(packet, 0);
499
network_queue_append(con->send_queue, packet->str, packet->len, con->packet_id);
501
g_string_free(packet, TRUE);
507
retval_t network_mysqld_read_raw(network_mysqld *UNUSED_PARAM(srv), network_socket *con, char *dest, size_t we_want) {
510
network_queue *queue = con->recv_raw_queue;
514
* 1. we read all we can get into a local buffer,
515
* 2. we split it into header + data
522
s = g_string_sized_new(con->to_read + 1);
524
if (-1 == (len = read(con->fd, s->str, s->allocated_len - 1))) {
525
g_string_free(s, TRUE);
528
return RET_WAIT_FOR_EVENT;
532
} else if (len == 0) {
533
g_string_free(s, TRUE);
538
s->str[s->len] = '\0';
539
if (len > con->to_read) {
540
/* between ioctl() and read() might be a cap and we might read more than we expected */
546
network_queue_append_chunk(queue, s);
550
/* check if we have enough data */
551
for (chunk = queue->chunks->head, we_have = 0; chunk; chunk = chunk->next) {
552
GString *s = chunk->data;
554
if (chunk == queue->chunks->head) {
555
g_assert(queue->offset < s->len);
557
we_have += (s->len - queue->offset);
562
if (we_have >= we_want) break;
565
if (we_have < we_want) {
566
/* we don't have enough */
568
return RET_WAIT_FOR_EVENT;
571
for (chunk = queue->chunks->head, we_have = 0; chunk && we_want; ) {
572
GString *s = chunk->data;
574
size_t chunk_has = s->len - queue->offset;
575
size_t to_read = we_want > chunk_has ? chunk_has : we_want;
577
memcpy(dest + we_have, s->str + queue->offset, to_read);
581
queue->offset += to_read;
583
if (queue->offset == s->len) {
584
/* this chunk is empty now */
585
g_string_free(s, TRUE);
587
g_queue_delete_link(queue->chunks, chunk);
590
chunk = queue->chunks->head;
598
retval_t network_mysqld_read(network_mysqld *srv, network_socket *con) {
599
GString *packet = NULL;
601
if (con->packet_len == PACKET_LEN_UNSET) {
602
switch (network_mysqld_read_raw(srv, con, (gchar *)con->header, NET_HEADER_SIZE)) {
603
case RET_WAIT_FOR_EVENT:
604
return RET_WAIT_FOR_EVENT;
609
case RET_ERROR_RETRY:
610
g_error("RET_ERROR_RETRY wasn't expected");
614
con->packet_len = network_mysqld_packet_get_header(con->header);
615
con->packet_id = con->header[3]; /* packet-id if the next packet */
617
packet = g_string_sized_new(con->packet_len + NET_HEADER_SIZE);
618
g_string_append_len(packet, (gchar *)con->header, NET_HEADER_SIZE); /* copy the header */
620
network_queue_append_chunk(con->recv_queue, packet);
622
packet = con->recv_queue->chunks->tail->data;
625
g_assert(packet->allocated_len >= con->packet_len + NET_HEADER_SIZE);
627
switch (network_mysqld_read_raw(srv, con, packet->str + NET_HEADER_SIZE, con->packet_len)) {
628
case RET_WAIT_FOR_EVENT:
629
return RET_WAIT_FOR_EVENT;
634
case RET_ERROR_RETRY:
635
g_error("RET_ERROR_RETRY wasn't expected");
640
packet->len += con->packet_len;
645
retval_t network_mysqld_write_len(network_mysqld *UNUSED_PARAM(srv), network_socket *con, int send_chunks) {
646
/* send the whole queue */
649
if (send_chunks == 0) return RET_SUCCESS;
651
for (chunk = con->send_queue->chunks->head; chunk; ) {
652
GString *s = chunk->data;
655
g_assert(con->send_queue->offset < s->len);
657
if (-1 == (len = write(con->fd, s->str + con->send_queue->offset, s->len - con->send_queue->offset))) {
660
return RET_WAIT_FOR_EVENT;
664
} else if (len == 0) {
668
con->send_queue->offset += len;
670
if (con->send_queue->offset == s->len) {
671
g_string_free(s, TRUE);
673
g_queue_delete_link(con->send_queue->chunks, chunk);
674
con->send_queue->offset = 0;
676
if (send_chunks > 0 && --send_chunks == 0) break;
678
chunk = con->send_queue->chunks->head;
680
return RET_WAIT_FOR_EVENT;
687
retval_t network_mysqld_write(network_mysqld *srv, network_socket *con) {
693
setsockopt(con->fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
695
ret = network_mysqld_write_len(srv, con, -1);
698
setsockopt(con->fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
704
int g_string_lenenc_append_len(GString *dest, const char *s, size_t len) {
705
g_string_append_c(dest, len);
706
if (len) g_string_append_len(dest, s, len);
711
int g_string_lenenc_append(GString *dest, const char *s) {
712
return g_string_lenenc_append_len(dest, s, s ? strlen(s) : 0);
716
* call the hooks of the plugins for each state
718
* if the plugin doesn't implement a hook, we provide a default operation
720
retval_t plugin_call(network_mysqld *srv, network_mysqld_con *con, int state) {
721
NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;
725
func = con->plugins.con_init;
727
if (!func) { /* default implementation */
728
con->state = CON_STATE_CONNECT_SERVER;
731
case CON_STATE_CONNECT_SERVER:
732
func = con->plugins.con_connect_server;
734
if (!func) { /* default implementation */
735
con->state = CON_STATE_READ_HANDSHAKE;
740
case CON_STATE_SEND_HANDSHAKE:
741
func = con->plugins.con_send_handshake;
743
if (!func) { /* default implementation */
744
con->state = CON_STATE_READ_AUTH;
748
case CON_STATE_READ_HANDSHAKE:
749
func = con->plugins.con_read_handshake;
752
case CON_STATE_READ_AUTH:
753
func = con->plugins.con_read_auth;
756
case CON_STATE_SEND_AUTH:
757
func = con->plugins.con_send_auth;
759
if (!func) { /* default implementation */
760
con->state = CON_STATE_READ_AUTH_RESULT;
763
case CON_STATE_READ_AUTH_RESULT:
764
func = con->plugins.con_read_auth_result;
766
case CON_STATE_SEND_AUTH_RESULT:
767
func = con->plugins.con_send_auth_result;
769
if (!func) { /* default implementation */
770
switch (con->parse.state.auth_result.state) {
771
case MYSQLD_PACKET_OK:
772
con->state = CON_STATE_READ_QUERY;
774
case MYSQLD_PACKET_ERR:
775
con->state = CON_STATE_ERROR;
777
case MYSQLD_PACKET_EOF:
779
* the MySQL 4.0 hash in a MySQL 4.1+ connection
781
con->state = CON_STATE_READ_AUTH_OLD_PASSWORD;
784
g_error("%s.%d: unexpected state for SEND_AUTH_RESULT: %02x",
786
con->parse.state.auth_result.state);
790
case CON_STATE_READ_AUTH_OLD_PASSWORD: {
791
/** move the packet to the send queue */
794
network_socket *recv_sock, *send_sock;
796
recv_sock = con->client;
797
send_sock = con->server;
799
chunk = recv_sock->recv_queue->chunks->head;
800
packet = chunk->data;
802
/* we aren't finished yet */
803
if (packet->len != recv_sock->packet_len + NET_HEADER_SIZE) return RET_SUCCESS;
805
network_queue_append_chunk(send_sock->send_queue, packet);
807
recv_sock->packet_len = PACKET_LEN_UNSET;
808
g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);
811
* send it out to the client
813
con->state = CON_STATE_SEND_AUTH_OLD_PASSWORD;
815
case CON_STATE_SEND_AUTH_OLD_PASSWORD:
817
* data is at the server, read the response next
819
con->state = CON_STATE_READ_AUTH_RESULT;
821
case CON_STATE_READ_QUERY:
822
func = con->plugins.con_read_query;
824
case CON_STATE_READ_QUERY_RESULT:
825
func = con->plugins.con_read_query_result;
827
case CON_STATE_SEND_QUERY_RESULT:
828
func = con->plugins.con_send_query_result;
830
if (!func) { /* default implementation */
831
con->state = CON_STATE_READ_QUERY;
835
g_error("%s.%d: unhandled state: %d",
839
if (!func) return RET_SUCCESS;
841
return (*func)(srv, con);
845
* handle the different states of the MySQL protocol
847
void network_mysqld_con_handle(int event_fd, short events, void *user_data) {
849
network_mysqld_con *con = user_data;
850
network_mysqld *srv = con->srv;
855
if (events == EV_READ) {
858
if (ioctl(event_fd, FIONREAD, &b)) {
859
g_critical("ioctl(%d, FIONREAD, ...) failed: %s", event_fd, strerror(errno));
861
con->state = CON_STATE_ERROR;
863
if (con->client && event_fd == con->client->fd) {
864
con->client->to_read = b;
865
} else if (con->server && event_fd == con->server->fd) {
866
con->server->to_read = b;
868
g_error("%s.%d: neither nor", __FILE__, __LINE__);
871
con->state = CON_STATE_ERROR;
875
#define WAIT_FOR_EVENT(ev_struct, ev_type, timeout) \
876
event_set(&(ev_struct->event), ev_struct->fd, ev_type, network_mysqld_con_handle, user_data); \
877
event_base_set(srv->event_base, &(ev_struct->event));\
878
event_add(&(ev_struct->event), timeout);
882
switch (con->state) {
883
case CON_STATE_ERROR:
884
/* we can't go on, close the connection */
885
plugin_call_cleanup(srv, con);
886
network_mysqld_con_free(con);
892
/* if we are a proxy ask the remote server for the hand-shake packet
893
* if not, we generate one */
895
switch (plugin_call(srv, con, con->state)) {
900
* no luck, let's close the connection
902
g_critical("%s.%d: plugin_call(CON_STATE_INIT) != RET_SUCCESS", __FILE__, __LINE__);
904
con->state = CON_STATE_ERROR;
910
case CON_STATE_CONNECT_SERVER:
911
switch (plugin_call(srv, con, con->state)) {
914
case RET_ERROR_RETRY:
915
/* hack to force a retry */
916
ostate = CON_STATE_INIT;
920
g_error("%s.%d: plugin_call(CON_STATE_CONNECT_SERVER) returned an error", __FILE__, __LINE__);
923
g_error("%s.%d: ...", __FILE__, __LINE__);
927
g_assert(con->server);
930
case CON_STATE_READ_HANDSHAKE: {
932
* read auth data from the remote mysql-server
934
network_socket *recv_sock;
936
recv_sock = con->server;
937
g_assert(events == 0 || event_fd == recv_sock->fd);
939
switch (network_mysqld_read(srv, recv_sock)) {
942
case RET_WAIT_FOR_EVENT:
943
/* call us again when you have a event */
944
WAIT_FOR_EVENT(con->server, EV_READ, NULL);
947
case RET_ERROR_RETRY:
949
g_error("%s.%d: plugin_call(CON_STATE_CONNECT_SERVER) returned an error", __FILE__, __LINE__);
953
switch (plugin_call(srv, con, con->state)) {
957
g_error("%s.%d: ...", __FILE__, __LINE__);
962
case CON_STATE_SEND_HANDSHAKE:
963
/* send the hand-shake to the client and wait for a response */
965
switch (network_mysqld_write(srv, con->client)) {
968
case RET_WAIT_FOR_EVENT:
969
WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
972
case RET_ERROR_RETRY:
974
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_HANDSHAKE) returned an error", __FILE__, __LINE__);
978
switch (plugin_call(srv, con, con->state)) {
982
g_error("%s.%d: plugin_call(CON_STATE_SEND_HANDSHAKE) != RET_SUCCESS", __FILE__, __LINE__);
987
case CON_STATE_READ_AUTH: {
988
/* read auth from client */
989
network_socket *recv_sock;
991
recv_sock = con->client;
993
g_assert(events == 0 || event_fd == recv_sock->fd);
995
switch (network_mysqld_read(srv, recv_sock)) {
998
case RET_WAIT_FOR_EVENT:
999
WAIT_FOR_EVENT(con->client, EV_READ, NULL);
1002
case RET_ERROR_RETRY:
1004
g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH) returned an error", __FILE__, __LINE__);
1008
switch (plugin_call(srv, con, con->state)) {
1012
g_error("%s.%d: plugin_call(CON_STATE_READ_AUTH) != RET_SUCCESS", __FILE__, __LINE__);
1017
case CON_STATE_SEND_AUTH:
1018
/* send the auth-response to the server */
1019
switch (network_mysqld_write(srv, con->server)) {
1022
case RET_WAIT_FOR_EVENT:
1023
WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
1026
case RET_ERROR_RETRY:
1028
/* might be a connection close, we should just close the connection and be happy */
1029
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH) returned an error", __FILE__, __LINE__);
1033
switch (plugin_call(srv, con, con->state)) {
1037
g_error("%s.%d: plugin_call(CON_STATE_SEND_AUTH) != RET_SUCCESS", __FILE__, __LINE__);
1042
case CON_STATE_READ_AUTH_RESULT: {
1043
/* read the auth result from the server */
1044
network_socket *recv_sock;
1048
recv_sock = con->server;
1050
g_assert(events == 0 || event_fd == recv_sock->fd);
1052
switch (network_mysqld_read(srv, recv_sock)) {
1055
case RET_WAIT_FOR_EVENT:
1056
WAIT_FOR_EVENT(con->server, EV_READ, NULL);
1058
case RET_ERROR_RETRY:
1060
g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH_RESULT) returned an error", __FILE__, __LINE__);
1065
* depending on the result-set we have different exit-points
1066
* - OK -> READ_QUERY
1067
* - EOF -> (read old password hash)
1070
chunk = recv_sock->recv_queue->chunks->head;
1071
packet = chunk->data;
1073
g_assert(packet->len > NET_HEADER_SIZE);
1074
con->parse.state.auth_result.state = packet->str[NET_HEADER_SIZE];
1076
switch (plugin_call(srv, con, con->state)) {
1080
g_critical("%s.%d: plugin_call(CON_STATE_READ_AUTH_RESULT) != RET_SUCCESS", __FILE__, __LINE__);
1082
con->state = CON_STATE_ERROR;
1087
case CON_STATE_SEND_AUTH_RESULT: {
1088
/* send the hand-shake to the client and wait for a response */
1090
switch (network_mysqld_write(srv, con->client)) {
1093
case RET_WAIT_FOR_EVENT:
1094
WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
1096
case RET_ERROR_RETRY:
1098
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH_RESULT) returned an error", __FILE__, __LINE__);
1102
switch (plugin_call(srv, con, con->state)) {
1106
g_error("%s.%d: ...", __FILE__, __LINE__);
1111
case CON_STATE_READ_AUTH_OLD_PASSWORD:
1112
/* read auth from client */
1114
switch (network_mysqld_read(srv, con->client)) {
1117
case RET_WAIT_FOR_EVENT:
1118
WAIT_FOR_EVENT(con->client, EV_READ, NULL);
1121
case RET_ERROR_RETRY:
1123
g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH_OLD_PASSWORD) returned an error", __FILE__, __LINE__);
1127
switch (plugin_call(srv, con, con->state)) {
1131
g_error("%s.%d: plugin_call(CON_STATE_READ_AUTH_OLD_PASSWORD) != RET_SUCCESS", __FILE__, __LINE__);
1136
case CON_STATE_SEND_AUTH_OLD_PASSWORD:
1137
/* send the auth-response to the server */
1138
switch (network_mysqld_write(srv, con->server)) {
1141
case RET_WAIT_FOR_EVENT:
1142
WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
1145
case RET_ERROR_RETRY:
1147
/* might be a connection close, we should just close the connection and be happy */
1148
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH_OLD_PASSWORD) returned an error", __FILE__, __LINE__);
1152
switch (plugin_call(srv, con, con->state)) {
1156
g_error("%s.%d: plugin_call(CON_STATE_SEND_AUTH_OLD_PASSWORD) != RET_SUCCESS", __FILE__, __LINE__);
1162
case CON_STATE_READ_QUERY: {
1163
network_socket *recv_sock;
1165
recv_sock = con->client;
1167
g_assert(events == 0 || event_fd == recv_sock->fd);
1169
switch (network_mysqld_read(srv, recv_sock)) {
1172
case RET_WAIT_FOR_EVENT:
1173
WAIT_FOR_EVENT(con->client, EV_READ, NULL);
1175
case RET_ERROR_RETRY:
1177
g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY) returned an error", __FILE__, __LINE__);
1181
switch (plugin_call(srv, con, con->state)) {
1185
g_error("%s.%d: ...", __FILE__, __LINE__);
1190
case CON_STATE_SEND_QUERY:
1191
/* send the query to the server */
1193
if (con->server->send_queue->offset == 0) {
1194
/* only parse the packets once */
1198
chunk = con->server->send_queue->chunks->head;
1201
/* only parse once and don't care about the blocking read */
1202
if (con->parse.command == COM_QUERY &&
1203
con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) {
1204
/* is this a LOAD DATA INFILE ... extra round ? */
1205
/* this isn't a command packet, but a LOAD DATA INFILE data-packet */
1206
if (s->str[0] == 0 && s->str[1] == 0 && s->str[2] == 0) {
1207
con->parse.state.query = PARSE_COM_QUERY_LOAD_DATA_END_DATA;
1209
} else if (con->is_overlong_packet) {
1210
/* the last packet was a over-long packet
1211
* this is the same command, just more data */
1213
if (con->parse.len != PACKET_LEN_MAX) {
1214
con->is_overlong_packet = 0;
1218
con->parse.command = s->str[4];
1220
if (con->parse.len == PACKET_LEN_MAX) {
1221
con->is_overlong_packet = 1;
1224
/* init the parser for the commands */
1225
switch (con->parse.command) {
1227
case COM_STMT_EXECUTE:
1228
con->parse.state.query = PARSE_COM_QUERY_INIT;
1230
case COM_STMT_PREPARE:
1231
con->parse.state.prepare.first_packet = 1;
1239
switch (network_mysqld_write_len(srv, con->server, 1)) {
1242
case RET_WAIT_FOR_EVENT:
1243
WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
1245
case RET_ERROR_RETRY:
1247
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY) returned an error", __FILE__, __LINE__);
1251
if (con->is_overlong_packet) {
1252
con->state = CON_STATE_READ_QUERY;
1256
/* some statements don't have a server response */
1257
switch (con->parse.command) {
1258
case COM_STMT_SEND_LONG_DATA: /* not acked */
1259
case COM_STMT_CLOSE:
1260
con->state = CON_STATE_READ_QUERY;
1263
if (con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) {
1264
con->state = CON_STATE_READ_QUERY;
1266
con->state = CON_STATE_READ_QUERY_RESULT;
1270
con->state = CON_STATE_READ_QUERY_RESULT;
1275
case CON_STATE_READ_QUERY_RESULT:
1277
network_socket *recv_sock;
1279
recv_sock = con->server;
1281
g_assert(events == 0 || event_fd == recv_sock->fd);
1283
switch (network_mysqld_read(srv, recv_sock)) {
1286
case RET_WAIT_FOR_EVENT:
1287
WAIT_FOR_EVENT(con->server, EV_READ, NULL);
1289
case RET_ERROR_RETRY:
1291
g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY_RESULT) returned an error", __FILE__, __LINE__);
1295
switch (plugin_call(srv, con, con->state)) {
1299
g_error("%s.%d: ...", __FILE__, __LINE__);
1303
} while (con->state == CON_STATE_READ_QUERY_RESULT);
1306
case CON_STATE_SEND_QUERY_RESULT:
1308
* send the query result-set to the client */
1310
switch (network_mysqld_write(srv, con->client)) {
1313
case RET_WAIT_FOR_EVENT:
1314
WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
1316
case RET_ERROR_RETRY:
1318
g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY_RESULT) returned an error", __FILE__, __LINE__);
1322
switch (plugin_call(srv, con, con->state)) {
1326
g_error("%s.%d: ...", __FILE__, __LINE__);
1335
} while (ostate != con->state);
1341
* we will be called by the event handler
1345
void network_mysqld_con_accept(int event_fd, short events, void *user_data) {
1346
network_mysqld_con *con = user_data;
1347
network_mysqld_con *client_con;
1349
struct sockaddr_in ipv4;
1353
g_assert(events == EV_READ);
1354
g_assert(con->server);
1356
addr_len = sizeof(struct sockaddr_in);
1358
if (-1 == (fd = accept(event_fd, (struct sockaddr *)&ipv4, &addr_len))) {
1362
fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
1364
/* looks like we open a client connection */
1365
client_con = network_mysqld_con_init(con->srv);
1366
client_con->client = network_socket_init();
1367
client_con->client->addr.addr.ipv4 = ipv4;
1368
client_con->client->addr.len = addr_len;
1369
client_con->client->fd = fd;
1371
/* copy the config */
1372
client_con->config = con->config;
1373
client_con->config.network_type = con->config.network_type;
1375
switch (con->config.network_type) {
1376
case NETWORK_TYPE_SERVER:
1377
network_mysqld_server_connection_init(NULL, client_con);
1379
case NETWORK_TYPE_PROXY:
1380
network_mysqld_proxy_connection_init(NULL, client_con);
1383
g_error("%s.%d", __FILE__, __LINE__);
1387
network_mysqld_con_handle(-1, 0, client_con);
1393
void handle_timeout() {
1394
if (!agent_shutdown) return;
1396
/* we have to shutdown, disable all events to leave the dispatch */
1399
void *network_mysqld_thread(void *_srv) {
1400
network_mysqld *srv = _srv;
1402
/* create the connection array */
1405
/* setup the different handlers */
1407
if (srv->config.admin.address) {
1408
network_mysqld_con *con = NULL;
1410
con = network_mysqld_con_init(srv);
1411
con->config = srv->config;
1412
con->config.network_type = NETWORK_TYPE_SERVER;
1414
con->server = network_socket_init();
1416
if (0 != network_mysqld_server_init(srv, con->server)) {
1417
g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);
1422
/* keep the listen socket alive */
1423
event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
1424
event_base_set(srv->event_base, &(con->server->event));
1425
event_add(&(con->server->event), NULL);
1428
if (srv->config.proxy.address) {
1429
network_mysqld_con *con = NULL;
1431
con = network_mysqld_con_init(srv);
1432
con->config = srv->config;
1433
con->config.network_type = NETWORK_TYPE_PROXY;
1435
con->server = network_socket_init();
1437
if (0 != network_mysqld_proxy_init(srv, con->server)) {
1438
g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);
1442
/* keep the listen socket alive */
1443
event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
1444
event_base_set(srv->event_base, &(con->server->event));
1445
event_add(&(con->server->event), NULL);
1448
while (!agent_shutdown) {
1449
struct timeval timeout;
1453
timeout.tv_usec = 0;
1455
g_assert(event_base_loopexit(srv->event_base, &timeout) == 0);
1457
r = event_base_dispatch(srv->event_base);
1460
if (errno == EINTR) continue;
1469
int network_mysqld_con_send_resultset(network_socket *con, GPtrArray *fields, GPtrArray *rows) {
1473
g_assert(fields->len > 0 && fields->len < 251);
1475
s = g_string_new(NULL);
1485
* \21@@version_comment
1489
* \34\0\0\0 - length
1497
* \34MySQL Community Server (GPL)
1502
g_string_append_c(s, fields->len); /* the field-count */
1503
network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
1505
for (i = 0; i < fields->len; i++) {
1506
MYSQL_FIELD *field = fields->pdata[i];
1508
g_string_truncate(s, 0);
1510
g_string_lenenc_append(s, field->catalog ? field->catalog : "def"); /* catalog */
1511
g_string_lenenc_append(s, field->db); /* database */
1512
g_string_lenenc_append(s, field->table); /* table */
1513
g_string_lenenc_append(s, field->org_table); /* org_table */
1514
g_string_lenenc_append(s, field->name); /* name */
1515
g_string_lenenc_append(s, field->org_name); /* org_name */
1517
g_string_append_c(s, '\x0c'); /* length of the following block, 12 byte */
1518
g_string_append_len(s, "\x08\x00", 2); /* charset */
1519
g_string_append_c(s, (field->length >> 0) & 0xff); /* len */
1520
g_string_append_c(s, (field->length >> 8) & 0xff); /* len */
1521
g_string_append_c(s, (field->length >> 16) & 0xff); /* len */
1522
g_string_append_c(s, (field->length >> 24) & 0xff); /* len */
1523
g_string_append_c(s, field->type); /* type */
1524
g_string_append_c(s, field->flags & 0xff); /* flags */
1525
g_string_append_c(s, (field->flags >> 8) & 0xff); /* flags */
1526
g_string_append_c(s, 0); /* decimals */
1527
g_string_append_len(s, "\x00\x00", 2); /* filler */
1529
/* this is in the docs, but not on the network */
1530
g_string_lenenc_append(s, field->def); /* default-value */
1532
network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
1535
g_string_truncate(s, 0);
1538
g_string_append_len(s, "\xfe", 1); /* EOF */
1539
g_string_append_len(s, "\x00\x00", 2); /* warning count */
1540
g_string_append_len(s, "\x02\x00", 2); /* flags */
1542
network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
1544
for (i = 0; i < rows->len; i++) {
1545
GPtrArray *row = rows->pdata[i];
1547
g_string_truncate(s, 0);
1549
for (j = 0; j < row->len; j++) {
1550
g_string_lenenc_append(s, row->pdata[j]);
1552
network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
1555
g_string_truncate(s, 0);
1558
g_string_append_len(s, "\xfe", 1); /* EOF */
1559
g_string_append_len(s, "\x00\x00", 2); /* warning count */
1560
g_string_append_len(s, "\x02\x00", 2); /* flags */
1562
network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
1564
g_string_free(s, TRUE);