~jan-kneschke/mysql-proxy/packet-tracking-assertions

« back to all changes in this revision

Viewing changes to tags/mysql-proxy-0.5.1/src/network-mysqld.c

  • Committer: Kay Roepke
  • Author(s): Jan Kneschke
  • Date: 2008-01-23 22:00:28 UTC
  • Revision ID: kay@mysql.com-20080123220028-hq2xqb69apa75fnx
first round on mysql-shell based on the proxy code

this is mostly a verification if the proxy-code is flexible enough to handle 
all three scenarios of: client, server and forwarding (proxy)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */ 
 
15
 
 
16
#ifdef HAVE_CONFIG_H
 
17
#include "config.h"
 
18
#endif
 
19
 
 
20
#include <sys/types.h>
 
21
 
 
22
#ifdef HAVE_SYS_FILIO_H
 
23
/**
 
24
 * required for FIONREAD on solaris
 
25
 */
 
26
#include <sys/filio.h>
 
27
#endif
 
28
 
 
29
#include <sys/ioctl.h>
 
30
#include <sys/socket.h>
 
31
 
 
32
#include <netinet/in.h>
 
33
#include <netinet/tcp.h>
 
34
 
 
35
#include <netdb.h>
 
36
 
 
37
#include <stdlib.h>
 
38
#include <unistd.h>
 
39
#include <string.h>
 
40
#include <stdio.h>
 
41
#include <fcntl.h>
 
42
#include <errno.h>
 
43
#include <signal.h>
 
44
 
 
45
#include <glib.h>
 
46
 
 
47
#include <mysql.h>
 
48
 
 
49
#include "network-mysqld.h"
 
50
 
 
51
 
 
52
/**
 
53
 * 4.1 uses other defines
 
54
 *
 
55
 * this should be one step to get closer to backward-compatibility
 
56
 */
 
57
#if defined(COM_EXECUTE)    && !defined(COM_STMT_EXECUTE) && \
 
58
    defined(COM_PREPARE)    && !defined(COM_STMT_PREPARE) && \
 
59
    defined(COM_CLOSE_STMT) && !defined(COM_STMT_CLOSE) && \
 
60
    defined(COM_LONG_DATA)  && !defined(COM_STMT_SEND_LONG_DATA) && \
 
61
    defined(COM_RESET_STMT) && !defined(COM_STMT_RESET)
 
62
#define COM_STMT_EXECUTE        COM_EXECUTE
 
63
#define COM_STMT_PREPARE        COM_PREPARE
 
64
#define COM_STMT_CLOSE          COM_CLOSE_STMT
 
65
#define COM_STMT_SEND_LONG_DATA COM_LONG_DATA
 
66
#define COM_STMT_RESET          COM_RESET_STMT
 
67
#endif
 
68
 
 
69
#ifndef _WIN32
 
70
/**
 
71
 * use closesocket() to close sockets to be compatible with win32
 
72
 */
 
73
#define closesocket(x) close(x)
 
74
#endif
 
75
 
 
76
#ifdef _WIN32
 
77
extern volatile int agent_shutdown;
 
78
#else
 
79
extern volatile sig_atomic_t agent_shutdown;
 
80
#endif
 
81
 
 
82
#define C(x) x, sizeof(x) - 1
 
83
 
 
84
gboolean g_hash_table_true(gpointer UNUSED_PARAM(key), gpointer UNUSED_PARAM(value), gpointer UNUSED_PARAM(u)) {
 
85
        return TRUE;
 
86
}       
 
87
 
 
88
void g_list_string_free(gpointer data, gpointer UNUSED_PARAM(user_data)) {
 
89
        g_string_free(data, TRUE);
 
90
}
 
91
 
 
92
network_mysqld_index_status *network_mysqld_index_status_init() {
 
93
        network_mysqld_index_status *s;
 
94
 
 
95
        s = g_new0(network_mysqld_index_status, 1);
 
96
 
 
97
        return s;
 
98
}
 
99
 
 
100
void network_mysqld_index_status_free(network_mysqld_index_status *s) {
 
101
        if (!s) return;
 
102
 
 
103
        g_free(s);
 
104
}
 
105
 
 
106
retval_t plugin_call_cleanup(network_mysqld *srv, network_mysqld_con *con) {
 
107
        NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;
 
108
 
 
109
        func = con->plugins.con_cleanup;
 
110
        
 
111
        if (!func) return RET_SUCCESS;
 
112
 
 
113
        return (*func)(srv, con);
 
114
}
 
115
 
 
116
network_queue *network_queue_init() {
 
117
        network_queue *queue;
 
118
 
 
119
        queue = g_new0(network_queue, 1);
 
120
 
 
121
        queue->chunks = g_queue_new();
 
122
        
 
123
        return queue;
 
124
}
 
125
 
 
126
void network_queue_free(network_queue *queue) {
 
127
        GString *packet;
 
128
 
 
129
        if (!queue) return;
 
130
 
 
131
        while ((packet = g_queue_pop_head(queue->chunks))) g_string_free(packet, TRUE);
 
132
 
 
133
        g_queue_free(queue->chunks);
 
134
 
 
135
        g_free(queue);
 
136
}
 
137
 
 
138
network_socket *network_socket_init() {
 
139
        network_socket *s;
 
140
        
 
141
        s = g_new0(network_socket, 1);
 
142
 
 
143
        s->send_queue = network_queue_init();
 
144
        s->recv_queue = network_queue_init();
 
145
        s->recv_raw_queue = network_queue_init();
 
146
 
 
147
        s->mysqld_version = 50112;
 
148
 
 
149
        s->packet_len = PACKET_LEN_UNSET;
 
150
        
 
151
        return s;
 
152
}
 
153
 
 
154
void network_socket_free(network_socket *s) {
 
155
        if (!s) return;
 
156
 
 
157
        network_queue_free(s->send_queue);
 
158
        network_queue_free(s->recv_queue);
 
159
        network_queue_free(s->recv_raw_queue);
 
160
#if 0
 
161
        /* */
 
162
        if (s->addr.str) {
 
163
                g_free(s->addr.str);
 
164
        }
 
165
#endif
 
166
 
 
167
        if (s->fd != -1) {
 
168
                closesocket(s->fd);
 
169
        }
 
170
 
 
171
        g_free(s);
 
172
}
 
173
 
 
174
 
 
175
network_mysqld_con *network_mysqld_con_init(network_mysqld *srv) {
 
176
        network_mysqld_con *con;
 
177
 
 
178
        con = g_new0(network_mysqld_con, 1);
 
179
 
 
180
        con->default_db = g_string_new(NULL);
 
181
        con->srv = srv;
 
182
 
 
183
        g_ptr_array_add(srv->cons, con);
 
184
 
 
185
        return con;
 
186
}
 
187
 
 
188
void network_mysqld_con_free(network_mysqld_con *con) {
 
189
        if (!con) return;
 
190
 
 
191
        g_string_free(con->default_db, TRUE);
 
192
 
 
193
        if (con->filename) {
 
194
                g_free(con->filename);
 
195
        }
 
196
 
 
197
        if (con->server) network_socket_free(con->server);
 
198
        if (con->client) network_socket_free(con->client);
 
199
 
 
200
        /* we are still in the conns-array */
 
201
 
 
202
        g_ptr_array_remove_fast(con->srv->cons, con);
 
203
 
 
204
        g_free(con);
 
205
}
 
206
 
 
207
 
 
208
 
 
209
/**
 
210
 * the free functions used by g_hash_table_new_full()
 
211
 */
 
212
static void network_mysqld_index_status_free_void(void *s) {
 
213
        network_mysqld_index_status_free(s);
 
214
}
 
215
 
 
216
static void network_mysqld_tables_free_void(void *s) {
 
217
        network_mysqld_table_free(s);
 
218
}
 
219
 
 
220
network_mysqld *network_mysqld_init() {
 
221
        network_mysqld *m;
 
222
 
 
223
        m = g_new0(network_mysqld, 1);
 
224
 
 
225
        m->event_base  = event_init();
 
226
 
 
227
        m->index_usage = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, network_mysqld_index_status_free_void);
 
228
        m->tables      = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, network_mysqld_tables_free_void);
 
229
        
 
230
        m->cons        = g_ptr_array_new();
 
231
        
 
232
        return m;
 
233
}
 
234
 
 
235
void network_mysqld_free(network_mysqld *m) {
 
236
        guint i;
 
237
 
 
238
        if (!m) return;
 
239
 
 
240
        for (i = 0; i < m->cons->len; i++) {
 
241
                network_mysqld_con *con = m->cons->pdata[i];
 
242
 
 
243
                plugin_call_cleanup(m, con);
 
244
                network_mysqld_con_free(con);
 
245
        }
 
246
 
 
247
        g_ptr_array_free(m->cons, TRUE);
 
248
 
 
249
        g_hash_table_destroy(m->tables);
 
250
        g_hash_table_destroy(m->index_usage);
 
251
 
 
252
        if (m->config.proxy.backend_addresses) {
 
253
                for (i = 0; m->config.proxy.backend_addresses[i]; i++) {
 
254
                        g_free(m->config.proxy.backend_addresses[i]);
 
255
                }
 
256
                g_free(m->config.proxy.backend_addresses);
 
257
        }
 
258
 
 
259
        if (m->config.proxy.address) g_free(m->config.proxy.address);
 
260
        if (m->config.admin.address) g_free(m->config.admin.address);
 
261
#if 0
 
262
        /* only recent versions have this call */
 
263
        event_base_free(m->event_base);
 
264
#endif
 
265
        
 
266
        g_free(m);
 
267
}
 
268
 
 
269
 
 
270
 
 
271
/**
 
272
 * connect to the proxy backend */
 
273
int network_mysqld_con_set_address(network_address *addr, gchar *address) {
 
274
        gchar *s;
 
275
        uint   port;
 
276
 
 
277
        /* split the address:port */
 
278
        if (NULL != (s = strchr(address, ':'))) {
 
279
                port = strtoul(s + 1, NULL, 10);
 
280
 
 
281
                if (port == 0) {
 
282
                        g_critical("<ip>:<port>, port is invalid or 0, has to be > 0, got '%s'", address);
 
283
                        return -1;
 
284
                }
 
285
                if (port > 65535) {
 
286
                        g_critical("<ip>:<port>, port is too large, has to be < 65536, got '%s'", address);
 
287
 
 
288
                        return -1;
 
289
                }
 
290
 
 
291
                memset(&addr->addr.ipv4, 0, sizeof(struct sockaddr_in));
 
292
 
 
293
                if (address == s || 
 
294
                    0 == strcmp("0.0.0.0", address)) {
 
295
                        /* no ip */
 
296
                        addr->addr.ipv4.sin_addr.s_addr = htonl(INADDR_ANY);
 
297
                } else {
 
298
                        struct hostent *he;
 
299
 
 
300
                        *s = '\0';
 
301
                        he = gethostbyname(address);
 
302
                        *s = ':';
 
303
 
 
304
                        if (NULL == he)  {
 
305
                                g_error("resolving proxy-address '%s' failed: ", address);
 
306
                        }
 
307
 
 
308
                        g_assert(he->h_addrtype == AF_INET);
 
309
                        g_assert(he->h_length == sizeof(struct in_addr));
 
310
 
 
311
                        memcpy(&(addr->addr.ipv4.sin_addr.s_addr), he->h_addr_list[0], he->h_length);
 
312
                }
 
313
 
 
314
                addr->addr.ipv4.sin_family = AF_INET;
 
315
                addr->addr.ipv4.sin_port = htons(port);
 
316
                addr->len = sizeof(struct sockaddr_in);
 
317
 
 
318
                addr->str = g_strdup(address);
 
319
#ifdef HAVE_SYS_UN_H
 
320
        } else if (address[0] == '/') {
 
321
                if (strlen(address) >= sizeof(addr->addr.un.sun_path) - 1) {
 
322
                        g_critical("unix-path is too long: %s", address);
 
323
                        return -1;
 
324
                }
 
325
 
 
326
                addr->addr.un.sun_family = AF_UNIX;
 
327
                strcpy(addr->addr.un.sun_path, address);
 
328
                addr->len = sizeof(struct sockaddr_un);
 
329
                addr->str = g_strdup(address);
 
330
#endif
 
331
        } else {
 
332
                /* might be a unix socket */
 
333
                g_critical("address has to contain a <ip>:<port>, got '%s'", address);
 
334
                return -1;
 
335
        }
 
336
 
 
337
        return 0;
 
338
}
 
339
 
 
340
/**
 
341
 * connect to the address defined in con->addr
 
342
 *
 
343
 * @see network_mysqld_set_address 
 
344
 */
 
345
int network_mysqld_con_connect(network_mysqld *UNUSED_PARAM(srv), network_socket * con) {
 
346
        int val = 1;
 
347
 
 
348
        g_assert(con->addr.len);
 
349
 
 
350
        /**
 
351
         * con->addr.addr.ipv4.sin_family is always mapped to the same field 
 
352
         * even if it is not a IPv4 address as we use a union
 
353
         */
 
354
        if (-1 == (con->fd = socket(con->addr.addr.ipv4.sin_family, SOCK_STREAM, 0))) {
 
355
                g_critical("%s.%d: socket(%s) failed: %s", 
 
356
                                __FILE__, __LINE__,
 
357
                                con->addr.str, strerror(errno));
 
358
                return -1;
 
359
        }
 
360
 
 
361
        setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val) );
 
362
 
 
363
        if (-1 == connect(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {
 
364
                g_critical("%s.%d: connect(%s) failed: %s", 
 
365
                                __FILE__, __LINE__,
 
366
                                con->addr.str,
 
367
                                strerror(errno));
 
368
                return -1;
 
369
        }
 
370
 
 
371
        return 0;
 
372
}
 
373
 
 
374
int network_mysqld_con_bind(network_mysqld *UNUSED_PARAM(srv), network_socket * con) {
 
375
        int val = 1;
 
376
 
 
377
        g_assert(con->addr.len);
 
378
 
 
379
        if (-1 == (con->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))) {
 
380
                g_critical("socket() failed");
 
381
                return -1;
 
382
        }
 
383
 
 
384
        setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
 
385
        setsockopt(con->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
 
386
 
 
387
        if (-1 == bind(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {
 
388
                g_critical("%s.%d: bind(%s) failed: %s", 
 
389
                                __FILE__, __LINE__,
 
390
                                con->addr.str,
 
391
                                strerror(errno));
 
392
                return -1;
 
393
        }
 
394
 
 
395
        if (-1 == listen(con->fd, 8)) {
 
396
                g_critical("%s.%d: listen() failed: %s",
 
397
                                __FILE__, __LINE__,
 
398
                                strerror(errno));
 
399
                return -1;
 
400
        }
 
401
 
 
402
        return 0;
 
403
}
 
404
 
 
405
 
 
406
static void dump_str(const char *msg, const unsigned char *s, size_t len) {
 
407
        GString *hex;
 
408
        size_t i;
 
409
                
 
410
        hex = g_string_new(NULL);
 
411
 
 
412
        for (i = 0; i < len; i++) {
 
413
                g_string_append_printf(hex, "%02x", s[i]);
 
414
 
 
415
                if ((i + 1) % 16 == 0) {
 
416
                        g_string_append(hex, "\n");
 
417
                } else {
 
418
                        g_string_append_c(hex, ' ');
 
419
                }
 
420
 
 
421
        }
 
422
 
 
423
        g_message("(%s): %s", msg, hex->str);
 
424
 
 
425
        g_string_free(hex, TRUE);
 
426
 
 
427
}
 
428
 
 
429
int network_mysqld_packet_set_header(unsigned char *header, size_t len, unsigned char id) {
 
430
        g_assert(len <= PACKET_LEN_MAX);
 
431
 
 
432
        header[0] = (len >>  0) & 0xFF;
 
433
        header[1] = (len >>  8) & 0xFF;
 
434
        header[2] = (len >> 16) & 0xFF;
 
435
        header[3] = id;
 
436
 
 
437
        return 0;
 
438
}
 
439
 
 
440
size_t network_mysqld_packet_get_header(unsigned char *header) {
 
441
        return header[0] | header[1] << 8 | header[2] << 16;
 
442
}
 
443
 
 
444
int network_queue_append(network_queue *queue, const char *data, size_t len, int packet_id) {
 
445
        unsigned char header[4];
 
446
        GString *s;
 
447
 
 
448
        network_mysqld_packet_set_header(header, len, packet_id);
 
449
 
 
450
        s = g_string_sized_new(len + 4);
 
451
 
 
452
        g_string_append_len(s, (gchar *)header, 4);
 
453
        g_string_append_len(s, data, len);
 
454
 
 
455
        g_queue_push_tail(queue->chunks, s);
 
456
 
 
457
        return 0;
 
458
}
 
459
 
 
460
int network_queue_append_chunk(network_queue *queue, GString *chunk) {
 
461
        g_queue_push_tail(queue->chunks, chunk);
 
462
 
 
463
        return 0;
 
464
}
 
465
 
 
466
int network_mysqld_con_send_ok(network_socket *con) {
 
467
        const unsigned char packet_ok[] = 
 
468
                "\x00" /* field-count */
 
469
                "\x00" /* affected rows */
 
470
                "\x00" /* insert-id */
 
471
                "\x02\x00" /* server-status */
 
472
                "\x00\x00" /* warnings */
 
473
                ;
 
474
 
 
475
        network_queue_append(con->send_queue, (gchar *)packet_ok, (sizeof(packet_ok) - 1), con->packet_id);
 
476
 
 
477
        return 0;
 
478
}
 
479
 
 
480
int network_mysqld_con_send_error(network_socket *con, const char *errmsg, gsize errmsg_len) {
 
481
        GString *packet;
 
482
        
 
483
        packet = g_string_sized_new(10 + errmsg_len);
 
484
        
 
485
        g_string_append_len(packet, 
 
486
                        C("\xff"     /* type: error */
 
487
                          "\x00\x00" /* errno */
 
488
                          "#"        /* insert-id */
 
489
                          "00S00"    /* SQLSTATE */
 
490
                         ));
 
491
 
 
492
        if (errmsg_len < 250) {
 
493
                g_string_append_c(packet, (guchar)errmsg_len);
 
494
                g_string_append_len(packet, errmsg, errmsg_len);
 
495
        } else {
 
496
                g_string_append_c(packet, 0);
 
497
        }
 
498
 
 
499
        network_queue_append(con->send_queue, packet->str, packet->len, con->packet_id);
 
500
 
 
501
        g_string_free(packet, TRUE);
 
502
 
 
503
        return 0;
 
504
}
 
505
 
 
506
 
 
507
retval_t network_mysqld_read_raw(network_mysqld *UNUSED_PARAM(srv), network_socket *con, char *dest, size_t we_want) {
 
508
        GList *chunk;
 
509
        ssize_t len;
 
510
        network_queue *queue = con->recv_raw_queue;
 
511
        size_t we_have;
 
512
 
 
513
        /**
 
514
         * 1. we read all we can get into a local buffer,
 
515
         * 2. we split it into header + data
 
516
         * 
 
517
         * */
 
518
 
 
519
        if (con->to_read) {
 
520
                GString *s;
 
521
 
 
522
                s = g_string_sized_new(con->to_read + 1);
 
523
 
 
524
                if (-1 == (len = read(con->fd, s->str, s->allocated_len - 1))) {
 
525
                        g_string_free(s, TRUE);
 
526
                        switch (errno) {
 
527
                        case EAGAIN:
 
528
                                return RET_WAIT_FOR_EVENT;
 
529
                        default:
 
530
                                return RET_ERROR;
 
531
                        }
 
532
                } else if (len == 0) {
 
533
                        g_string_free(s, TRUE);
 
534
                        return RET_ERROR;
 
535
                }
 
536
 
 
537
                s->len = len;
 
538
                s->str[s->len] = '\0';
 
539
                if (len > con->to_read) {
 
540
                        /* between ioctl() and read() might be a cap and we might read more than we expected */
 
541
                        con->to_read = 0;
 
542
                } else {
 
543
                        con->to_read -= len;
 
544
                }
 
545
 
 
546
                network_queue_append_chunk(queue, s);
 
547
        }
 
548
 
 
549
 
 
550
        /* check if we have enough data */
 
551
        for (chunk = queue->chunks->head, we_have = 0; chunk; chunk = chunk->next) {
 
552
                GString *s = chunk->data;
 
553
 
 
554
                if (chunk == queue->chunks->head) {
 
555
                        g_assert(queue->offset < s->len);
 
556
                
 
557
                        we_have += (s->len - queue->offset);
 
558
                } else {
 
559
                        we_have += s->len;
 
560
                }
 
561
 
 
562
                if (we_have >= we_want) break;
 
563
        }
 
564
 
 
565
        if (we_have < we_want) {
 
566
                /* we don't have enough */
 
567
 
 
568
                return RET_WAIT_FOR_EVENT;
 
569
        }
 
570
 
 
571
        for (chunk = queue->chunks->head, we_have = 0; chunk && we_want; ) {
 
572
                GString *s = chunk->data;
 
573
 
 
574
                size_t chunk_has = s->len - queue->offset;
 
575
                size_t to_read   = we_want > chunk_has ? chunk_has : we_want;
 
576
                                
 
577
                memcpy(dest + we_have, s->str + queue->offset, to_read);
 
578
                we_have += to_read;
 
579
                we_want -= to_read;
 
580
 
 
581
                queue->offset += to_read;
 
582
 
 
583
                if (queue->offset == s->len) {
 
584
                        /* this chunk is empty now */
 
585
                        g_string_free(s, TRUE);
 
586
 
 
587
                        g_queue_delete_link(queue->chunks, chunk);
 
588
                        queue->offset = 0;
 
589
 
 
590
                        chunk = queue->chunks->head;
 
591
                }
 
592
        }
 
593
 
 
594
        return RET_SUCCESS;
 
595
}
 
596
 
 
597
 
 
598
retval_t network_mysqld_read(network_mysqld *srv, network_socket *con) {
 
599
        GString *packet = NULL;
 
600
 
 
601
        if (con->packet_len == PACKET_LEN_UNSET) {
 
602
                switch (network_mysqld_read_raw(srv, con, (gchar *)con->header, NET_HEADER_SIZE)) {
 
603
                case RET_WAIT_FOR_EVENT:
 
604
                        return RET_WAIT_FOR_EVENT;
 
605
                case RET_ERROR:
 
606
                        return RET_ERROR;
 
607
                case RET_SUCCESS:
 
608
                        break;
 
609
                case RET_ERROR_RETRY:
 
610
                        g_error("RET_ERROR_RETRY wasn't expected");
 
611
                        break;
 
612
                }
 
613
 
 
614
                con->packet_len = network_mysqld_packet_get_header(con->header);
 
615
                con->packet_id  = con->header[3]; /* packet-id if the next packet */
 
616
 
 
617
                packet = g_string_sized_new(con->packet_len + NET_HEADER_SIZE);
 
618
                g_string_append_len(packet, (gchar *)con->header, NET_HEADER_SIZE); /* copy the header */
 
619
 
 
620
                network_queue_append_chunk(con->recv_queue, packet);
 
621
        } else {
 
622
                packet = con->recv_queue->chunks->tail->data;
 
623
        }
 
624
 
 
625
        g_assert(packet->allocated_len >= con->packet_len + NET_HEADER_SIZE);
 
626
 
 
627
        switch (network_mysqld_read_raw(srv, con, packet->str + NET_HEADER_SIZE, con->packet_len)) {
 
628
        case RET_WAIT_FOR_EVENT:
 
629
                return RET_WAIT_FOR_EVENT;
 
630
        case RET_ERROR:
 
631
                return RET_ERROR;
 
632
        case RET_SUCCESS:
 
633
                break;
 
634
        case RET_ERROR_RETRY:
 
635
                g_error("RET_ERROR_RETRY wasn't expected");
 
636
                break;
 
637
 
 
638
        }
 
639
 
 
640
        packet->len += con->packet_len;
 
641
 
 
642
        return RET_SUCCESS;
 
643
}
 
644
 
 
645
retval_t network_mysqld_write_len(network_mysqld *UNUSED_PARAM(srv), network_socket *con, int send_chunks) {
 
646
        /* send the whole queue */
 
647
        GList *chunk;
 
648
 
 
649
        if (send_chunks == 0) return RET_SUCCESS;
 
650
 
 
651
        for (chunk = con->send_queue->chunks->head; chunk; ) {
 
652
                GString *s = chunk->data;
 
653
                ssize_t len;
 
654
 
 
655
                g_assert(con->send_queue->offset < s->len);
 
656
 
 
657
                if (-1 == (len = write(con->fd, s->str + con->send_queue->offset, s->len - con->send_queue->offset))) {
 
658
                        switch (errno) {
 
659
                        case EAGAIN:
 
660
                                return RET_WAIT_FOR_EVENT;
 
661
                        default:
 
662
                                return RET_ERROR;
 
663
                        }
 
664
                } else if (len == 0) {
 
665
                        return RET_ERROR;
 
666
                }
 
667
 
 
668
                con->send_queue->offset += len;
 
669
 
 
670
                if (con->send_queue->offset == s->len) {
 
671
                        g_string_free(s, TRUE);
 
672
                        
 
673
                        g_queue_delete_link(con->send_queue->chunks, chunk);
 
674
                        con->send_queue->offset = 0;
 
675
 
 
676
                        if (send_chunks > 0 && --send_chunks == 0) break;
 
677
 
 
678
                        chunk = con->send_queue->chunks->head;
 
679
                } else {
 
680
                        return RET_WAIT_FOR_EVENT;
 
681
                }
 
682
        }
 
683
 
 
684
        return RET_SUCCESS;
 
685
}
 
686
 
 
687
retval_t network_mysqld_write(network_mysqld *srv, network_socket *con) {
 
688
        retval_t ret;
 
689
        int corked;
 
690
 
 
691
#ifdef TCP_CORK
 
692
        corked = 1;
 
693
        setsockopt(con->fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
 
694
#endif
 
695
        ret = network_mysqld_write_len(srv, con, -1);
 
696
#ifdef TCP_CORK
 
697
        corked = 0;
 
698
        setsockopt(con->fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
 
699
#endif
 
700
 
 
701
        return ret;
 
702
}
 
703
 
 
704
int g_string_lenenc_append_len(GString *dest, const char *s, size_t len) {
 
705
        g_string_append_c(dest, len);
 
706
        if (len) g_string_append_len(dest, s, len);
 
707
 
 
708
        return 0;
 
709
}
 
710
 
 
711
int g_string_lenenc_append(GString *dest, const char *s) {
 
712
        return g_string_lenenc_append_len(dest, s, s ? strlen(s) : 0);
 
713
}
 
714
 
 
715
/**
 
716
 * call the hooks of the plugins for each state
 
717
 *
 
718
 * if the plugin doesn't implement a hook, we provide a default operation
 
719
 */
 
720
retval_t plugin_call(network_mysqld *srv, network_mysqld_con *con, int state) {
 
721
        NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;
 
722
 
 
723
        switch (state) {
 
724
        case CON_STATE_INIT:
 
725
                func = con->plugins.con_init;
 
726
 
 
727
                if (!func) { /* default implementation */
 
728
                        con->state = CON_STATE_CONNECT_SERVER;
 
729
                }
 
730
                break;
 
731
        case CON_STATE_CONNECT_SERVER:
 
732
                func = con->plugins.con_connect_server;
 
733
 
 
734
                if (!func) { /* default implementation */
 
735
                        con->state = CON_STATE_READ_HANDSHAKE;
 
736
                }
 
737
 
 
738
                break;
 
739
 
 
740
        case CON_STATE_SEND_HANDSHAKE:
 
741
                func = con->plugins.con_send_handshake;
 
742
 
 
743
                if (!func) { /* default implementation */
 
744
                        con->state = CON_STATE_READ_AUTH;
 
745
                }
 
746
 
 
747
                break;
 
748
        case CON_STATE_READ_HANDSHAKE:
 
749
                func = con->plugins.con_read_handshake;
 
750
 
 
751
                break;
 
752
        case CON_STATE_READ_AUTH:
 
753
                func = con->plugins.con_read_auth;
 
754
 
 
755
                break;
 
756
        case CON_STATE_SEND_AUTH:
 
757
                func = con->plugins.con_send_auth;
 
758
 
 
759
                if (!func) { /* default implementation */
 
760
                        con->state = CON_STATE_READ_AUTH_RESULT;
 
761
                }
 
762
                break;
 
763
        case CON_STATE_READ_AUTH_RESULT:
 
764
                func = con->plugins.con_read_auth_result;
 
765
                break;
 
766
        case CON_STATE_SEND_AUTH_RESULT:
 
767
                func = con->plugins.con_send_auth_result;
 
768
 
 
769
                if (!func) { /* default implementation */
 
770
                        switch (con->parse.state.auth_result.state) {
 
771
                        case MYSQLD_PACKET_OK:
 
772
                                con->state = CON_STATE_READ_QUERY;
 
773
                                break;
 
774
                        case MYSQLD_PACKET_ERR:
 
775
                                con->state = CON_STATE_ERROR;
 
776
                                break;
 
777
                        case MYSQLD_PACKET_EOF:
 
778
                                /**
 
779
                                 * the MySQL 4.0 hash in a MySQL 4.1+ connection
 
780
                                 */
 
781
                                con->state = CON_STATE_READ_AUTH_OLD_PASSWORD;
 
782
                                break;
 
783
                        default:
 
784
                                g_error("%s.%d: unexpected state for SEND_AUTH_RESULT: %02x", 
 
785
                                                __FILE__, __LINE__,
 
786
                                                con->parse.state.auth_result.state);
 
787
                        }
 
788
                }
 
789
                break;
 
790
        case CON_STATE_READ_AUTH_OLD_PASSWORD: {
 
791
                /** move the packet to the send queue */
 
792
                GString *packet;
 
793
                GList *chunk;
 
794
                network_socket *recv_sock, *send_sock;
 
795
 
 
796
                recv_sock = con->client;
 
797
                send_sock = con->server;
 
798
 
 
799
                chunk = recv_sock->recv_queue->chunks->head;
 
800
                packet = chunk->data;
 
801
 
 
802
                /* we aren't finished yet */
 
803
                if (packet->len != recv_sock->packet_len + NET_HEADER_SIZE) return RET_SUCCESS;
 
804
 
 
805
                network_queue_append_chunk(send_sock->send_queue, packet);
 
806
 
 
807
                recv_sock->packet_len = PACKET_LEN_UNSET;
 
808
                g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);
 
809
 
 
810
                /**
 
811
                 * send it out to the client 
 
812
                 */
 
813
                con->state = CON_STATE_SEND_AUTH_OLD_PASSWORD;
 
814
                break; }
 
815
        case CON_STATE_SEND_AUTH_OLD_PASSWORD:
 
816
                /**
 
817
                 * data is at the server, read the response next 
 
818
                 */
 
819
                con->state = CON_STATE_READ_AUTH_RESULT;
 
820
                break;
 
821
        case CON_STATE_READ_QUERY:
 
822
                func = con->plugins.con_read_query;
 
823
                break;
 
824
        case CON_STATE_READ_QUERY_RESULT:
 
825
                func = con->plugins.con_read_query_result;
 
826
                break;
 
827
        case CON_STATE_SEND_QUERY_RESULT:
 
828
                func = con->plugins.con_send_query_result;
 
829
 
 
830
                if (!func) { /* default implementation */
 
831
                        con->state = CON_STATE_READ_QUERY;
 
832
                }
 
833
                break;
 
834
        default:
 
835
                g_error("%s.%d: unhandled state: %d", 
 
836
                                __FILE__, __LINE__,
 
837
                                state);
 
838
        }
 
839
        if (!func) return RET_SUCCESS;
 
840
 
 
841
        return (*func)(srv, con);
 
842
}
 
843
 
 
844
/**
 
845
 * handle the different states of the MySQL protocol
 
846
 */
 
847
void network_mysqld_con_handle(int event_fd, short events, void *user_data) {
 
848
        guint ostate;
 
849
        network_mysqld_con *con = user_data;
 
850
        network_mysqld *srv = con->srv;
 
851
 
 
852
        g_assert(srv);
 
853
        g_assert(con);
 
854
 
 
855
        if (events == EV_READ) {
 
856
                int b = -1;
 
857
 
 
858
                if (ioctl(event_fd, FIONREAD, &b)) {
 
859
                        g_critical("ioctl(%d, FIONREAD, ...) failed: %s", event_fd, strerror(errno));
 
860
 
 
861
                        con->state = CON_STATE_ERROR;
 
862
                } else if (b != 0) {
 
863
                        if (con->client && event_fd == con->client->fd) {
 
864
                                con->client->to_read = b;
 
865
                        } else if (con->server && event_fd == con->server->fd) {
 
866
                                con->server->to_read = b;
 
867
                        } else {
 
868
                                g_error("%s.%d: neither nor", __FILE__, __LINE__);
 
869
                        }
 
870
                } else {
 
871
                        con->state = CON_STATE_ERROR;
 
872
                }
 
873
        }
 
874
 
 
875
#define WAIT_FOR_EVENT(ev_struct, ev_type, timeout) \
 
876
        event_set(&(ev_struct->event), ev_struct->fd, ev_type, network_mysqld_con_handle, user_data); \
 
877
        event_base_set(srv->event_base, &(ev_struct->event));\
 
878
        event_add(&(ev_struct->event), timeout);
 
879
 
 
880
        do {
 
881
                ostate = con->state;
 
882
                switch (con->state) {
 
883
                case CON_STATE_ERROR:
 
884
                        /* we can't go on, close the connection */
 
885
                        plugin_call_cleanup(srv, con);
 
886
                        network_mysqld_con_free(con);
 
887
 
 
888
                        con = NULL;
 
889
 
 
890
                        return;
 
891
                case CON_STATE_INIT:
 
892
                        /* if we are a proxy ask the remote server for the hand-shake packet 
 
893
                         * if not, we generate one */
 
894
 
 
895
                        switch (plugin_call(srv, con, con->state)) {
 
896
                        case RET_SUCCESS:
 
897
                                break;
 
898
                        default:
 
899
                                /**
 
900
                                 * no luck, let's close the connection
 
901
                                 */
 
902
                                g_critical("%s.%d: plugin_call(CON_STATE_INIT) != RET_SUCCESS", __FILE__, __LINE__);
 
903
 
 
904
                                con->state = CON_STATE_ERROR;
 
905
                                
 
906
                                break;
 
907
                        }
 
908
 
 
909
                        break;
 
910
                case CON_STATE_CONNECT_SERVER:
 
911
                        switch (plugin_call(srv, con, con->state)) {
 
912
                        case RET_SUCCESS:
 
913
                                break;
 
914
                        case RET_ERROR_RETRY:
 
915
                                /* hack to force a retry */
 
916
                                ostate = CON_STATE_INIT;
 
917
 
 
918
                                break;
 
919
                        case RET_ERROR:
 
920
                                g_error("%s.%d: plugin_call(CON_STATE_CONNECT_SERVER) returned an error", __FILE__, __LINE__);
 
921
                                break;
 
922
                        default:
 
923
                                g_error("%s.%d: ...", __FILE__, __LINE__);
 
924
                                break;
 
925
                        }
 
926
 
 
927
                        g_assert(con->server);
 
928
 
 
929
                        break;
 
930
                case CON_STATE_READ_HANDSHAKE: {
 
931
                        /**
 
932
                         * read auth data from the remote mysql-server 
 
933
                         */
 
934
                        network_socket *recv_sock;
 
935
 
 
936
                        recv_sock = con->server;
 
937
                        g_assert(events == 0 || event_fd == recv_sock->fd);
 
938
 
 
939
                        switch (network_mysqld_read(srv, recv_sock)) {
 
940
                        case RET_SUCCESS:
 
941
                                break;
 
942
                        case RET_WAIT_FOR_EVENT:
 
943
                                /* call us again when you have a event */
 
944
                                WAIT_FOR_EVENT(con->server, EV_READ, NULL);
 
945
 
 
946
                                return;
 
947
                        case RET_ERROR_RETRY:
 
948
                        case RET_ERROR:
 
949
                                g_error("%s.%d: plugin_call(CON_STATE_CONNECT_SERVER) returned an error", __FILE__, __LINE__);
 
950
                                break;
 
951
                        }
 
952
 
 
953
                        switch (plugin_call(srv, con, con->state)) {
 
954
                        case RET_SUCCESS:
 
955
                                break;
 
956
                        default:
 
957
                                g_error("%s.%d: ...", __FILE__, __LINE__);
 
958
                                break;
 
959
                        }
 
960
        
 
961
                        break; }
 
962
                case CON_STATE_SEND_HANDSHAKE: 
 
963
                        /* send the hand-shake to the client and wait for a response */
 
964
                        
 
965
                        switch (network_mysqld_write(srv, con->client)) {
 
966
                        case RET_SUCCESS:
 
967
                                break;
 
968
                        case RET_WAIT_FOR_EVENT:
 
969
                                WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
 
970
                                
 
971
                                return;
 
972
                        case RET_ERROR_RETRY:
 
973
                        case RET_ERROR:
 
974
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_HANDSHAKE) returned an error", __FILE__, __LINE__);
 
975
                                break;
 
976
                        }
 
977
 
 
978
                        switch (plugin_call(srv, con, con->state)) {
 
979
                        case RET_SUCCESS:
 
980
                                break;
 
981
                        default:
 
982
                                g_error("%s.%d: plugin_call(CON_STATE_SEND_HANDSHAKE) != RET_SUCCESS", __FILE__, __LINE__);
 
983
                                break;
 
984
                        }
 
985
 
 
986
                        break;
 
987
                case CON_STATE_READ_AUTH: {
 
988
                        /* read auth from client */
 
989
                        network_socket *recv_sock;
 
990
 
 
991
                        recv_sock = con->client;
 
992
 
 
993
                        g_assert(events == 0 || event_fd == recv_sock->fd);
 
994
 
 
995
                        switch (network_mysqld_read(srv, recv_sock)) {
 
996
                        case RET_SUCCESS:
 
997
                                break;
 
998
                        case RET_WAIT_FOR_EVENT:
 
999
                                WAIT_FOR_EVENT(con->client, EV_READ, NULL);
 
1000
 
 
1001
                                return;
 
1002
                        case RET_ERROR_RETRY:
 
1003
                        case RET_ERROR:
 
1004
                                g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH) returned an error", __FILE__, __LINE__);
 
1005
                                return;
 
1006
                        }
 
1007
 
 
1008
                        switch (plugin_call(srv, con, con->state)) {
 
1009
                        case RET_SUCCESS:
 
1010
                                break;
 
1011
                        default:
 
1012
                                g_error("%s.%d: plugin_call(CON_STATE_READ_AUTH) != RET_SUCCESS", __FILE__, __LINE__);
 
1013
                                break;
 
1014
                        }
 
1015
 
 
1016
                        break; }
 
1017
                case CON_STATE_SEND_AUTH:
 
1018
                        /* send the auth-response to the server */
 
1019
                        switch (network_mysqld_write(srv, con->server)) {
 
1020
                        case RET_SUCCESS:
 
1021
                                break;
 
1022
                        case RET_WAIT_FOR_EVENT:
 
1023
                                WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
 
1024
 
 
1025
                                return;
 
1026
                        case RET_ERROR_RETRY:
 
1027
                        case RET_ERROR:
 
1028
                                /* might be a connection close, we should just close the connection and be happy */
 
1029
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH) returned an error", __FILE__, __LINE__);
 
1030
                                return;
 
1031
                        }
 
1032
 
 
1033
                        switch (plugin_call(srv, con, con->state)) {
 
1034
                        case RET_SUCCESS:
 
1035
                                break;
 
1036
                        default:
 
1037
                                g_error("%s.%d: plugin_call(CON_STATE_SEND_AUTH) != RET_SUCCESS", __FILE__, __LINE__);
 
1038
                                break;
 
1039
                        }
 
1040
 
 
1041
                        break;
 
1042
                case CON_STATE_READ_AUTH_RESULT: {
 
1043
                        /* read the auth result from the server */
 
1044
                        network_socket *recv_sock;
 
1045
                        GList *chunk;
 
1046
                        GString *packet;
 
1047
 
 
1048
                        recv_sock = con->server;
 
1049
 
 
1050
                        g_assert(events == 0 || event_fd == recv_sock->fd);
 
1051
 
 
1052
                        switch (network_mysqld_read(srv, recv_sock)) {
 
1053
                        case RET_SUCCESS:
 
1054
                                break;
 
1055
                        case RET_WAIT_FOR_EVENT:
 
1056
                                WAIT_FOR_EVENT(con->server, EV_READ, NULL);
 
1057
                                return;
 
1058
                        case RET_ERROR_RETRY:
 
1059
                        case RET_ERROR:
 
1060
                                g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH_RESULT) returned an error", __FILE__, __LINE__);
 
1061
                                return;
 
1062
                        }
 
1063
 
 
1064
                        /**
 
1065
                         * depending on the result-set we have different exit-points
 
1066
                         * - OK  -> READ_QUERY
 
1067
                         * - EOF -> (read old password hash) 
 
1068
                         * - ERR -> ERROR
 
1069
                         */
 
1070
                        chunk = recv_sock->recv_queue->chunks->head;
 
1071
                        packet = chunk->data;
 
1072
                        g_assert(packet);
 
1073
                        g_assert(packet->len > NET_HEADER_SIZE);
 
1074
                        con->parse.state.auth_result.state = packet->str[NET_HEADER_SIZE];
 
1075
 
 
1076
                        switch (plugin_call(srv, con, con->state)) {
 
1077
                        case RET_SUCCESS:
 
1078
                                break;
 
1079
                        default:
 
1080
                                g_critical("%s.%d: plugin_call(CON_STATE_READ_AUTH_RESULT) != RET_SUCCESS", __FILE__, __LINE__);
 
1081
 
 
1082
                                con->state = CON_STATE_ERROR;
 
1083
                                break;
 
1084
                        }
 
1085
 
 
1086
                        break; }
 
1087
                case CON_STATE_SEND_AUTH_RESULT: {
 
1088
                        /* send the hand-shake to the client and wait for a response */
 
1089
 
 
1090
                        switch (network_mysqld_write(srv, con->client)) {
 
1091
                        case RET_SUCCESS:
 
1092
                                break;
 
1093
                        case RET_WAIT_FOR_EVENT:
 
1094
                                WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
 
1095
                                return;
 
1096
                        case RET_ERROR_RETRY:
 
1097
                        case RET_ERROR:
 
1098
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH_RESULT) returned an error", __FILE__, __LINE__);
 
1099
                                return;
 
1100
                        }
 
1101
 
 
1102
                        switch (plugin_call(srv, con, con->state)) {
 
1103
                        case RET_SUCCESS:
 
1104
                                break;
 
1105
                        default:
 
1106
                                g_error("%s.%d: ...", __FILE__, __LINE__);
 
1107
                                break;
 
1108
                        }
 
1109
                                
 
1110
                        break; }
 
1111
                case CON_STATE_READ_AUTH_OLD_PASSWORD: 
 
1112
                        /* read auth from client */
 
1113
 
 
1114
                        switch (network_mysqld_read(srv, con->client)) {
 
1115
                        case RET_SUCCESS:
 
1116
                                break;
 
1117
                        case RET_WAIT_FOR_EVENT:
 
1118
                                WAIT_FOR_EVENT(con->client, EV_READ, NULL);
 
1119
 
 
1120
                                return;
 
1121
                        case RET_ERROR_RETRY:
 
1122
                        case RET_ERROR:
 
1123
                                g_error("%s.%d: network_mysqld_read(CON_STATE_READ_AUTH_OLD_PASSWORD) returned an error", __FILE__, __LINE__);
 
1124
                                return;
 
1125
                        }
 
1126
 
 
1127
                        switch (plugin_call(srv, con, con->state)) {
 
1128
                        case RET_SUCCESS:
 
1129
                                break;
 
1130
                        default:
 
1131
                                g_error("%s.%d: plugin_call(CON_STATE_READ_AUTH_OLD_PASSWORD) != RET_SUCCESS", __FILE__, __LINE__);
 
1132
                                break;
 
1133
                        }
 
1134
 
 
1135
                        break; 
 
1136
                case CON_STATE_SEND_AUTH_OLD_PASSWORD:
 
1137
                        /* send the auth-response to the server */
 
1138
                        switch (network_mysqld_write(srv, con->server)) {
 
1139
                        case RET_SUCCESS:
 
1140
                                break;
 
1141
                        case RET_WAIT_FOR_EVENT:
 
1142
                                WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
 
1143
 
 
1144
                                return;
 
1145
                        case RET_ERROR_RETRY:
 
1146
                        case RET_ERROR:
 
1147
                                /* might be a connection close, we should just close the connection and be happy */
 
1148
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_AUTH_OLD_PASSWORD) returned an error", __FILE__, __LINE__);
 
1149
                                return;
 
1150
                        }
 
1151
 
 
1152
                        switch (plugin_call(srv, con, con->state)) {
 
1153
                        case RET_SUCCESS:
 
1154
                                break;
 
1155
                        default:
 
1156
                                g_error("%s.%d: plugin_call(CON_STATE_SEND_AUTH_OLD_PASSWORD) != RET_SUCCESS", __FILE__, __LINE__);
 
1157
                                break;
 
1158
                        }
 
1159
 
 
1160
                        break;
 
1161
 
 
1162
                case CON_STATE_READ_QUERY: {
 
1163
                        network_socket *recv_sock;
 
1164
 
 
1165
                        recv_sock = con->client;
 
1166
 
 
1167
                        g_assert(events == 0 || event_fd == recv_sock->fd);
 
1168
 
 
1169
                        switch (network_mysqld_read(srv, recv_sock)) {
 
1170
                        case RET_SUCCESS:
 
1171
                                break;
 
1172
                        case RET_WAIT_FOR_EVENT:
 
1173
                                WAIT_FOR_EVENT(con->client, EV_READ, NULL);
 
1174
                                return;
 
1175
                        case RET_ERROR_RETRY:
 
1176
                        case RET_ERROR:
 
1177
                                g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY) returned an error", __FILE__, __LINE__);
 
1178
                                return;
 
1179
                        }
 
1180
 
 
1181
                        switch (plugin_call(srv, con, con->state)) {
 
1182
                        case RET_SUCCESS:
 
1183
                                break;
 
1184
                        default:
 
1185
                                g_error("%s.%d: ...", __FILE__, __LINE__);
 
1186
                                break;
 
1187
                        }
 
1188
 
 
1189
                        break; }
 
1190
                case CON_STATE_SEND_QUERY: 
 
1191
                        /* send the query to the server */
 
1192
 
 
1193
                        if (con->server->send_queue->offset == 0) {
 
1194
                                /* only parse the packets once */
 
1195
                                GString *s;
 
1196
                                GList *chunk;
 
1197
 
 
1198
                                chunk = con->server->send_queue->chunks->head;
 
1199
                                s = chunk->data;
 
1200
 
 
1201
                                /* only parse once and don't care about the blocking read */
 
1202
                                if (con->parse.command == COM_QUERY &&
 
1203
                                    con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) {
 
1204
                                        /* is this a LOAD DATA INFILE ... extra round ? */
 
1205
                                        /* this isn't a command packet, but a LOAD DATA INFILE data-packet */
 
1206
                                        if (s->str[0] == 0 && s->str[1] == 0 && s->str[2] == 0) {
 
1207
                                                con->parse.state.query = PARSE_COM_QUERY_LOAD_DATA_END_DATA;
 
1208
                                        }
 
1209
                                } else if (con->is_overlong_packet) {
 
1210
                                        /* the last packet was a over-long packet
 
1211
                                         * this is the same command, just more data */
 
1212
        
 
1213
                                        if (con->parse.len != PACKET_LEN_MAX) {
 
1214
                                                con->is_overlong_packet = 0;
 
1215
                                        }
 
1216
        
 
1217
                                } else {
 
1218
                                        con->parse.command = s->str[4];
 
1219
        
 
1220
                                        if (con->parse.len == PACKET_LEN_MAX) {
 
1221
                                                con->is_overlong_packet = 1;
 
1222
                                        }
 
1223
                
 
1224
                                        /* init the parser for the commands */
 
1225
                                        switch (con->parse.command) {
 
1226
                                        case COM_QUERY:
 
1227
                                        case COM_STMT_EXECUTE:
 
1228
                                                con->parse.state.query = PARSE_COM_QUERY_INIT;
 
1229
                                                break;
 
1230
                                        case COM_STMT_PREPARE:
 
1231
                                                con->parse.state.prepare.first_packet = 1;
 
1232
                                                break;
 
1233
                                        default:
 
1234
                                                break;
 
1235
                                        }
 
1236
                                }
 
1237
                        }
 
1238
        
 
1239
                        switch (network_mysqld_write_len(srv, con->server, 1)) {
 
1240
                        case RET_SUCCESS:
 
1241
                                break;
 
1242
                        case RET_WAIT_FOR_EVENT:
 
1243
                                WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);
 
1244
                                return;
 
1245
                        case RET_ERROR_RETRY:
 
1246
                        case RET_ERROR:
 
1247
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY) returned an error", __FILE__, __LINE__);
 
1248
                                return;
 
1249
                        }
 
1250
 
 
1251
                        if (con->is_overlong_packet) {
 
1252
                                con->state = CON_STATE_READ_QUERY;
 
1253
                                break;
 
1254
                        }
 
1255
 
 
1256
                        /* some statements don't have a server response */
 
1257
                        switch (con->parse.command) {
 
1258
                        case COM_STMT_SEND_LONG_DATA: /* not acked */
 
1259
                        case COM_STMT_CLOSE:
 
1260
                                con->state = CON_STATE_READ_QUERY;
 
1261
                                break;
 
1262
                        case COM_QUERY:
 
1263
                                if (con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) {
 
1264
                                        con->state = CON_STATE_READ_QUERY;
 
1265
                                } else {
 
1266
                                        con->state = CON_STATE_READ_QUERY_RESULT;
 
1267
                                }
 
1268
                                break;
 
1269
                        default:
 
1270
                                con->state = CON_STATE_READ_QUERY_RESULT;
 
1271
                                break;
 
1272
                        }
 
1273
                                
 
1274
                        break; 
 
1275
                case CON_STATE_READ_QUERY_RESULT: 
 
1276
                        do {
 
1277
                                network_socket *recv_sock;
 
1278
 
 
1279
                                recv_sock = con->server;
 
1280
 
 
1281
                                g_assert(events == 0 || event_fd == recv_sock->fd);
 
1282
 
 
1283
                                switch (network_mysqld_read(srv, recv_sock)) {
 
1284
                                case RET_SUCCESS:
 
1285
                                        break;
 
1286
                                case RET_WAIT_FOR_EVENT:
 
1287
                                        WAIT_FOR_EVENT(con->server, EV_READ, NULL);
 
1288
                                        return;
 
1289
                                case RET_ERROR_RETRY:
 
1290
                                case RET_ERROR:
 
1291
                                        g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY_RESULT) returned an error", __FILE__, __LINE__);
 
1292
                                        return;
 
1293
                                }
 
1294
 
 
1295
                                switch (plugin_call(srv, con, con->state)) {
 
1296
                                case RET_SUCCESS:
 
1297
                                        break;
 
1298
                                default:
 
1299
                                        g_error("%s.%d: ...", __FILE__, __LINE__);
 
1300
                                        break;
 
1301
                                }
 
1302
 
 
1303
                        } while (con->state == CON_STATE_READ_QUERY_RESULT);
 
1304
        
 
1305
                        break; 
 
1306
                case CON_STATE_SEND_QUERY_RESULT:
 
1307
                        /**
 
1308
                         * send the query result-set to the client */
 
1309
 
 
1310
                        switch (network_mysqld_write(srv, con->client)) {
 
1311
                        case RET_SUCCESS:
 
1312
                                break;
 
1313
                        case RET_WAIT_FOR_EVENT:
 
1314
                                WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);
 
1315
                                return;
 
1316
                        case RET_ERROR_RETRY:
 
1317
                        case RET_ERROR:
 
1318
                                g_error("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY_RESULT) returned an error", __FILE__, __LINE__);
 
1319
                                return;
 
1320
                        }
 
1321
 
 
1322
                        switch (plugin_call(srv, con, con->state)) {
 
1323
                        case RET_SUCCESS:
 
1324
                                break;
 
1325
                        default:
 
1326
                                g_error("%s.%d: ...", __FILE__, __LINE__);
 
1327
                                break;
 
1328
                        }
 
1329
 
 
1330
                        break;
 
1331
                }
 
1332
 
 
1333
                event_fd = -1;
 
1334
                events   = 0;
 
1335
        } while (ostate != con->state);
 
1336
 
 
1337
        return;
 
1338
}
 
1339
 
 
1340
/**
 
1341
 * we will be called by the event handler 
 
1342
 *
 
1343
 *
 
1344
 */
 
1345
void network_mysqld_con_accept(int event_fd, short events, void *user_data) {
 
1346
        network_mysqld_con *con = user_data;
 
1347
        network_mysqld_con *client_con;
 
1348
        socklen_t addr_len;
 
1349
        struct sockaddr_in ipv4;
 
1350
        int fd;
 
1351
 
 
1352
 
 
1353
        g_assert(events == EV_READ);
 
1354
        g_assert(con->server);
 
1355
 
 
1356
        addr_len = sizeof(struct sockaddr_in);
 
1357
 
 
1358
        if (-1 == (fd = accept(event_fd, (struct sockaddr *)&ipv4, &addr_len))) {
 
1359
                return ;
 
1360
        }
 
1361
 
 
1362
        fcntl(fd, F_SETFL, O_NONBLOCK | O_RDWR);
 
1363
 
 
1364
        /* looks like we open a client connection */
 
1365
        client_con = network_mysqld_con_init(con->srv);
 
1366
        client_con->client = network_socket_init();
 
1367
        client_con->client->addr.addr.ipv4 = ipv4;
 
1368
        client_con->client->addr.len = addr_len;
 
1369
        client_con->client->fd   = fd;
 
1370
 
 
1371
        /* copy the config */
 
1372
        client_con->config = con->config;
 
1373
        client_con->config.network_type = con->config.network_type;
 
1374
 
 
1375
        switch (con->config.network_type) {
 
1376
        case NETWORK_TYPE_SERVER:
 
1377
                network_mysqld_server_connection_init(NULL, client_con);
 
1378
                break;
 
1379
        case NETWORK_TYPE_PROXY:
 
1380
                network_mysqld_proxy_connection_init(NULL, client_con);
 
1381
                break;
 
1382
        default:
 
1383
                g_error("%s.%d", __FILE__, __LINE__);
 
1384
                break;
 
1385
        }
 
1386
 
 
1387
        network_mysqld_con_handle(-1, 0, client_con);
 
1388
 
 
1389
        return;
 
1390
}
 
1391
 
 
1392
 
 
1393
void handle_timeout() {
 
1394
        if (!agent_shutdown) return;
 
1395
 
 
1396
        /* we have to shutdown, disable all events to leave the dispatch */
 
1397
}
 
1398
 
 
1399
void *network_mysqld_thread(void *_srv) {
 
1400
        network_mysqld *srv = _srv;
 
1401
 
 
1402
        /* create the connection array */
 
1403
 
 
1404
 
 
1405
        /* setup the different handlers */
 
1406
 
 
1407
        if (srv->config.admin.address) {
 
1408
                network_mysqld_con *con = NULL;
 
1409
 
 
1410
                con = network_mysqld_con_init(srv);
 
1411
                con->config = srv->config;
 
1412
                con->config.network_type = NETWORK_TYPE_SERVER;
 
1413
                
 
1414
                con->server = network_socket_init();
 
1415
 
 
1416
                if (0 != network_mysqld_server_init(srv, con->server)) {
 
1417
                        g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);
 
1418
 
 
1419
                        return NULL;
 
1420
                }
 
1421
 
 
1422
                /* keep the listen socket alive */
 
1423
                event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
 
1424
                event_base_set(srv->event_base, &(con->server->event));
 
1425
                event_add(&(con->server->event), NULL);
 
1426
        }
 
1427
 
 
1428
        if (srv->config.proxy.address) {
 
1429
                network_mysqld_con *con = NULL;
 
1430
 
 
1431
                con = network_mysqld_con_init(srv);
 
1432
                con->config = srv->config;
 
1433
                con->config.network_type = NETWORK_TYPE_PROXY;
 
1434
                
 
1435
                con->server = network_socket_init();
 
1436
 
 
1437
                if (0 != network_mysqld_proxy_init(srv, con->server)) {
 
1438
                        g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);
 
1439
                        return NULL;
 
1440
                }
 
1441
        
 
1442
                /* keep the listen socket alive */
 
1443
                event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);
 
1444
                event_base_set(srv->event_base, &(con->server->event));
 
1445
                event_add(&(con->server->event), NULL);
 
1446
        }
 
1447
 
 
1448
        while (!agent_shutdown) {
 
1449
                struct timeval timeout;
 
1450
                int r;
 
1451
 
 
1452
                timeout.tv_sec = 1;
 
1453
                timeout.tv_usec = 0;
 
1454
 
 
1455
                g_assert(event_base_loopexit(srv->event_base, &timeout) == 0);
 
1456
 
 
1457
                r = event_base_dispatch(srv->event_base);
 
1458
 
 
1459
                if (r == -1) {
 
1460
                        if (errno == EINTR) continue;
 
1461
 
 
1462
                        break;
 
1463
                }
 
1464
        }
 
1465
 
 
1466
        return NULL;
 
1467
}
 
1468
 
 
1469
int network_mysqld_con_send_resultset(network_socket *con, GPtrArray *fields, GPtrArray *rows) {
 
1470
        GString *s;
 
1471
        gsize i, j;
 
1472
 
 
1473
        g_assert(fields->len > 0 && fields->len < 251);
 
1474
 
 
1475
        s = g_string_new(NULL);
 
1476
 
 
1477
        /* - len = 99
 
1478
         *  \1\0\0\1 
 
1479
         *    \1 - one field
 
1480
         *  \'\0\0\2 
 
1481
         *    \3def 
 
1482
         *    \0 
 
1483
         *    \0 
 
1484
         *    \0 
 
1485
         *    \21@@version_comment 
 
1486
         *    \0            - org-name
 
1487
         *    \f            - filler
 
1488
         *    \10\0         - charset
 
1489
         *    \34\0\0\0     - length
 
1490
         *    \375          - type 
 
1491
         *    \1\0          - flags
 
1492
         *    \37           - decimals
 
1493
         *    \0\0          - filler 
 
1494
         *  \5\0\0\3 
 
1495
         *    \376\0\0\2\0
 
1496
         *  \35\0\0\4
 
1497
         *    \34MySQL Community Server (GPL)
 
1498
         *  \5\0\0\5
 
1499
         *    \376\0\0\2\0
 
1500
         */
 
1501
 
 
1502
        g_string_append_c(s, fields->len); /* the field-count */
 
1503
        network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
 
1504
 
 
1505
        for (i = 0; i < fields->len; i++) {
 
1506
                MYSQL_FIELD *field = fields->pdata[i];
 
1507
                
 
1508
                g_string_truncate(s, 0);
 
1509
 
 
1510
                g_string_lenenc_append(s, field->catalog ? field->catalog : "def");        /* catalog */
 
1511
                g_string_lenenc_append(s, field->db);          /* database */
 
1512
                g_string_lenenc_append(s, field->table);       /* table */
 
1513
                g_string_lenenc_append(s, field->org_table);   /* org_table */
 
1514
                g_string_lenenc_append(s, field->name);        /* name */
 
1515
                g_string_lenenc_append(s, field->org_name);    /* org_name */
 
1516
 
 
1517
                g_string_append_c(s, '\x0c');                  /* length of the following block, 12 byte */
 
1518
                g_string_append_len(s, "\x08\x00", 2);         /* charset */
 
1519
                g_string_append_c(s, (field->length >> 0) & 0xff); /* len */
 
1520
                g_string_append_c(s, (field->length >> 8) & 0xff); /* len */
 
1521
                g_string_append_c(s, (field->length >> 16) & 0xff); /* len */
 
1522
                g_string_append_c(s, (field->length >> 24) & 0xff); /* len */
 
1523
                g_string_append_c(s, field->type);             /* type */
 
1524
                g_string_append_c(s, field->flags & 0xff);     /* flags */
 
1525
                g_string_append_c(s, (field->flags >> 8) & 0xff); /* flags */
 
1526
                g_string_append_c(s, 0);                       /* decimals */
 
1527
                g_string_append_len(s, "\x00\x00", 2);         /* filler */
 
1528
#if 0
 
1529
                /* this is in the docs, but not on the network */
 
1530
                g_string_lenenc_append(s, field->def);         /* default-value */
 
1531
#endif
 
1532
                network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
 
1533
        }
 
1534
 
 
1535
        g_string_truncate(s, 0);
 
1536
        
 
1537
        /* EOF */       
 
1538
        g_string_append_len(s, "\xfe", 1); /* EOF */
 
1539
        g_string_append_len(s, "\x00\x00", 2); /* warning count */
 
1540
        g_string_append_len(s, "\x02\x00", 2); /* flags */
 
1541
        
 
1542
        network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
 
1543
 
 
1544
        for (i = 0; i < rows->len; i++) {
 
1545
                GPtrArray *row = rows->pdata[i];
 
1546
 
 
1547
                g_string_truncate(s, 0);
 
1548
 
 
1549
                for (j = 0; j < row->len; j++) {
 
1550
                        g_string_lenenc_append(s, row->pdata[j]);
 
1551
                }
 
1552
                network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
 
1553
        }
 
1554
 
 
1555
        g_string_truncate(s, 0);
 
1556
 
 
1557
        /* EOF */       
 
1558
        g_string_append_len(s, "\xfe", 1); /* EOF */
 
1559
        g_string_append_len(s, "\x00\x00", 2); /* warning count */
 
1560
        g_string_append_len(s, "\x02\x00", 2); /* flags */
 
1561
 
 
1562
        network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);
 
1563
 
 
1564
        g_string_free(s, TRUE);
 
1565
 
 
1566
        return 0;
 
1567
}
 
1568
 
 
1569