120
124
template<typename Result= Slow_Query_Log_Processor>
121
class Parser : public IParser
124
Parser(String_Queue &a_string_queue
125
, std::size_t a_page_size
126
, Result *a_result) :
127
m_thread(this, &Parser<Result>::work, "Slow Query Log Parser")
128
, m_page(a_page_size)
132
, m_queue(a_string_queue)
133
, m_result(*a_result)
160
Transaction(Parser* a_parser) :
162
, m_current(m_parser->begin_transaction())
168
if (false == m_complete)
177
std::ostringstream message;
178
message << "Transaction " << m_current << " already complete";
179
throw Error(message.str());
195
m_parser->commit_transaction(m_current);
209
m_parser->rollback_transaction(m_current);
215
// Assignment is a no-op: nothing is copied from `other`, and the
// left-hand object is returned unchanged.
Transaction& operator=(Transaction const &other)
{
  (void)other;  // the argument is never read
  return *this;
}
218
unsigned int m_current;
221
friend class Transaction;
223
unsigned int begin_transaction()
225
/*unsigned int result= m_next_transaction_id++;
125
class Parser : public IParser
128
Parser(String_Queue &a_string_queue
129
, std::size_t a_page_size
130
, Result *a_result) :
131
m_thread(this, &Parser<Result>::work, "Slow Query Log Parser")
132
, m_page(a_page_size)
136
, m_queue(a_string_queue)
137
, m_result(*a_result)
164
Transaction(Parser* a_parser) :
166
, m_current(m_parser->begin_transaction())
172
if (false == m_complete)
181
std::ostringstream message;
182
message << "Transaction " << m_current << " already complete";
183
throw Error(message.str());
199
m_parser->commit_transaction(m_current);
213
m_parser->rollback_transaction(m_current);
219
// Assignment is a no-op: nothing is copied from `other`, and the
// left-hand object is returned unchanged.
Transaction& operator=(Transaction const &other)
{
  (void)other;  // the argument is never read
  return *this;
}
222
unsigned int m_current;
225
friend class Transaction;
227
unsigned int begin_transaction()
229
/*unsigned int result= m_next_transaction_id++;
226
230
m_stack.push_back(std::make_pair(result, m_current));
230
std::ostream& trace(std::ostream& s) const
232
//s << "Parser buffer: " << m_buffer << "\n";
234
line= m_buffer.data() + m_current;
235
line.resize(m_end - m_current);
236
s << "Parser line: '" << line << "'\n";
237
s << "Parser current: " << m_current;
238
s << " end: " << m_end;
239
s << " eof: " << (m_eof ? "true" : "false");
234
std::ostream& trace(std::ostream& s) const
236
//s << "Parser buffer: " << m_buffer << "\n";
238
line= m_buffer.data() + m_current;
239
line.resize(m_end - m_current);
240
s << "Parser line: '" << line << "'\n";
241
s << "Parser current: " << m_current;
242
s << " end: " << m_end;
243
s << " eof: " << (m_eof ? "true" : "false");
240
244
/* s << "\nParser transaction stack:[";
241
for(Stack::const_iterator i= m_stack.begin(), end= m_stack.end(); end != i; ++i)
245
for(Stack::const_iterator i= m_stack.begin(), end= m_stack.end(); end != i; ++i)
242
246
s << " (" << i->first << "," << i->second << ")";
246
void commit_transaction(unsigned int)
250
void commit_transaction(unsigned int)
248
252
/* if (m_stack.empty()
249
|| (false == m_stack.empty() && id != m_stack.back().first))
253
|| (false == m_stack.empty() && id != m_stack.back().first))
251
std::ostringstream message;
252
trace(message) << "\n[ERROR] Can't commit transaction: " << id;
253
throw Error(message.str());
255
std::ostringstream message;
256
trace(message) << "\n[ERROR] Can't commit transaction: " << id;
257
throw Error(message.str());
255
259
m_stack.pop_back();*/
257
261
void rollback_transaction(unsigned int to)
259
263
/* if (m_stack.empty()
260
|| (false == m_stack.empty() && id != m_stack.back().first))
262
std::ostringstream message;
263
trace(message) << "\n[ERROR] Can't rollback transaction: " << id;
264
throw Error(message.str());
266
m_current= m_stack.back().second;
267
m_stack.pop_back();*/
264
|| (false == m_stack.empty() && id != m_stack.back().first))
266
std::ostringstream message;
267
trace(message) << "\n[ERROR] Can't rollback transaction: " << id;
268
throw Error(message.str());
270
m_current= m_stack.back().second;
271
m_stack.pop_back();*/
270
274
// True when `m_current + count <= m_end`, i.e. advancing the cursor by
// `count` does not move it past `m_end`.
// `count` is parenthesised so that arguments containing low-precedence
// operators (e.g. `cond ? a : b`) are added as a single value instead of
// being re-associated by the macro expansion.
#define has_count(count) (m_current + (count) < m_end + 1)
271
275
// True when at least one more character is available at the cursor.
// sizeof(char) is 1 by definition, so this is has_count(1).
#define has_char() has_count(sizeof(char))
272
276
// True when there is room for `word` at the cursor.
// `sizeof(word) - 1` is the length of a string literal / char array without
// its terminating NUL, so `word` must not be a plain `const char*`
// (sizeof would then yield the pointer size).
#define has_word(word) has_count(sizeof(word) - 1)
277
std::ostringstream message;
278
trace(message) << "\nCan't call 'current()'";
279
throw Error(message.str());
281
return m_buffer.data()[m_current];
281
std::ostringstream message;
282
trace(message) << "\nCan't call 'current()'";
283
throw Error(message.str());
285
return m_buffer.data()[m_current];
283
287
// True when a character is available and it equals `c`. Does not advance
// the cursor. has_char() is tested first, so current() is only called when
// a character is available.
// `c` is parenthesised so that an argument such as `flag ? 'a' : 'b'` is
// compared as a whole instead of being parsed as
// `(current() == flag) ? 'a' : 'b'`.
#define is_char(c) (has_char() && (current() == (c)))
284
288
/*const char* current_word(const char* word, std::size_t word_strlen, std::size_t word_sizeof, std::size_t line)
286
290
char buffer[1024];
287
291
std::size_t buffer_length= std::min(sizeof(buffer) - 1, m_buffer.length());
288
292
memcpy(buffer, m_buffer.data(), buffer_length);
302
306
// True when a character is available and it lies in the range '0'..'9'.
// Does not advance the cursor.
#define is_digit() (has_char() && (current() >= '0') && (current() <= '9'))
303
307
// True when a character is available and it lies in 'a'..'z' or 'A'..'Z'.
// Does not advance the cursor.
#define is_alpha() (has_char() && (('a' <= current() && (current() <= 'z')) \
304
308
|| (('A' <= current()) && (current() <= 'Z'))))
305
bool do_skip(std::size_t count)
307
ASSERT(has_count(count));
311
bool can_not_do_skip(std::size_t count) const
313
std::ostringstream message;
314
trace(message) << "\nCan't skip<" << count << ">'";
315
throw Error(message.str());
309
bool do_skip(std::size_t count)
311
ASSERT(has_count(count));
315
bool can_not_do_skip(std::size_t count) const
317
std::ostringstream message;
318
trace(message) << "\nCan't skip<" << count << ">'";
319
throw Error(message.str());
317
321
// Advances past one character: calls do_skip(1) when has_char() holds,
// otherwise falls through to can_not_do_skip(1), whose visible body builds a
// trace message and throws Error.
#define skip_char() ((has_char() && do_skip(sizeof(char) )) || can_not_do_skip(sizeof(char) ))
318
322
// Advances past `word` (sizeof(word)-1 characters, so `word` must be a
// string literal / char array): calls do_skip when has_word(word) holds,
// otherwise falls through to can_not_do_skip, whose visible body throws Error.
// The text of `word` is not compared here; only the available length is checked.
#define skip_word(word) ((has_word(word) && do_skip(sizeof(word)-1)) || can_not_do_skip(sizeof(word)-1))
319
323
// Advances past `count` characters: calls do_skip(count) when
// has_count(count) holds, otherwise falls through to can_not_do_skip(count),
// whose visible body throws Error.
// NOTE: `count` is expanded more than once, so pass an expression without
// side effects.
#define skip_count(count) ((has_count(count) && do_skip(count )) || can_not_do_skip(count ))
320
324
// If the current character equals `c`, skips it via skip_char(); otherwise
// evaluates to false without calling skip_char().
#define try_char(c) (is_char(c) && skip_char())
321
325
// If is_word(word) holds, skips `word` via skip_word(word); otherwise
// evaluates to false without calling skip_word().
// NOTE(review): is_word is defined outside this view -- presumably it
// compares the text at the cursor with `word`; confirm.
#define try_word(word) (is_word(word) && skip_word(word))
322
326
// True when the current character is a space, tab, form feed or carriage
// return. '\n' and '\v' are not treated as space here. Does not advance the
// cursor.
#define is_space() (is_char(' ') || is_char('\t') || is_char('\f') || is_char('\r'))
334
338
// Tries to consume the keyword "set " or "SET " (including the trailing
// space). Only the all-lowercase and all-uppercase spellings are attempted;
// mixed case such as "Set " is not.
#define try_set_word() (try_word("set ") || try_word("SET "))
335
339
// Tries to consume the keyword "use " or "USE " (including the trailing
// space). Only the all-lowercase and all-uppercase spellings are attempted;
// mixed case such as "Use " is not.
#define try_use_word() (try_word("use ") || try_word("USE "))
338
return m_eof && (m_end + 1 >= m_buffer.length());
345
if (m_buffer.length() > m_end && m_buffer.data()[m_end] == '\n')
346
new_end= m_buffer.index('\n', m_end + 1);
342
return m_eof && (m_end + 1 >= m_buffer.length());
349
if (m_buffer.length() > m_end && m_buffer.data()[m_end] == '\n')
350
new_end= m_buffer.index('\n', m_end + 1);
352
new_end= m_buffer.index('\n', 0);
353
while(new_end == m_buffer.length())
356
if (m_queue.pop(m_page))
360
m_buffer.add_last(m_page);
361
new_end= m_buffer.index('\n', new_end);
348
new_end= m_buffer.index('\n', 0);
349
while(new_end == m_buffer.length())
352
if (m_queue.pop(m_page))
356
m_buffer.add_last(m_page);
357
new_end= m_buffer.index('\n', new_end);
363
if ((m_buffer.length() > 0))
365
if ((m_buffer.data()[new_end -1] != '\n'))
366
m_buffer.add_last('\n');
367
new_end= m_buffer.index('\n', new_end);
379
m_buffer.remove_first(m_end+1);
383
bool parse_query(bool is_admin)
385
Query_Parse_State state= query_parse_state_begin(eQuery_Detect_Correct);
386
std::size_t query_length= 0;
389
const char* from= m_buffer.data() + m_current + query_length;
390
query_length += query_parse(state, from, m_end - (m_current + query_length));
391
if (end_of_query(state) || is_admin)
393
sql.set(m_buffer.data() + m_current, query_length);
394
if (sql.data()[sql.length() - 1] != ';')
400
skip_count(query_length);
403
if (query_length != m_end)
405
std::ostringstream message;
406
trace(message) << "\nBug in query parser";
407
throw Error(message.str());
431
return try_number() && try_char('.') && try_number() && t.commit();
433
bool parse(long long int &value)
440
value= value * 10 + (current() - '0');
446
bool parse(Thread_Id& value)
451
value= static_cast<Thread_Id>(lli);
367
if ((m_buffer.length() > 0))
369
if ((m_buffer.data()[new_end -1] != '\n'))
370
m_buffer.add_last('\n');
371
new_end= m_buffer.index('\n', new_end);
383
m_buffer.remove_first(m_end+1);
387
bool parse_query(bool is_admin)
389
Query_Parse_State state= query_parse_state_begin(eQuery_Detect_Correct);
390
std::size_t query_length= 0;
393
const char* from= m_buffer.data() + m_current + query_length;
394
query_length += query_parse(state, from, m_end - (m_current + query_length));
395
if (end_of_query(state) || is_admin)
397
sql.set(m_buffer.data() + m_current, query_length);
398
if (sql.data()[sql.length() - 1] != ';')
404
skip_count(query_length);
407
if (query_length != m_end)
409
std::ostringstream message;
410
trace(message) << "\nBug in query parser";
411
throw Error(message.str());
435
return try_number() && try_char('.') && try_number() && t.commit();
437
bool parse(long long int &value)
444
value= value * 10 + (current() - '0');
450
bool parse(Thread_Id& value)
455
value= static_cast<Thread_Id>(lli);
454
458
#define declare_assignment(key) \
455
459
bool try_assignment_##key (long long int &value) \
457
Transaction t(this); \
458
return try_word( #key ) && try_char('=') && parse(value) && t.commit(); \
461
Transaction t(this); \
462
return try_word( #key ) && try_char('=') && parse(value) && t.commit(); \
460
464
#define declare_single_assignment(key) \
461
465
bool try_single_assignment_##key (long long int &value) \
463
Transaction t(this); \
464
return try_space() && try_set_word() && try_assignment_##key (value) && try_char(';') && t.commit(); \
466
declare_assignment(timestamp)
467
declare_assignment(insert_id)
468
declare_assignment(last_insert_id)
469
declare_single_assignment(timestamp)
470
declare_single_assignment(insert_id)
471
declare_single_assignment(last_insert_id)
467
Transaction t(this); \
468
return try_space() && try_set_word() && try_assignment_##key (value) && try_char(';') && t.commit(); \
470
declare_assignment(timestamp)
471
declare_assignment(insert_id)
472
declare_assignment(last_insert_id)
473
declare_single_assignment(timestamp)
474
declare_single_assignment(insert_id)
475
declare_single_assignment(last_insert_id)
473
bool try_semicolon_set_list()
475
//CHECK("try_semicolon_set_list()");
478
if (try_single_assignment_last_insert_id(last_insert_id))
481
m_result.last_insert_id(last_insert_id);
483
if (try_single_assignment_insert_id(insert_id))
486
m_result.insert_id(insert_id);
488
if (try_single_assignment_timestamp(timestamp))
491
m_result.timestamp(timestamp);
493
return result && t.commit();
495
bool try_comma_set_list()
497
//CHECK("try_comma_set_list()");
500
bool comma_need= false;
503
if (try_assignment_last_insert_id(last_insert_id))
505
result= comma_need= true;
506
m_result.last_insert_id(last_insert_id);
513
if (try_assignment_insert_id(insert_id))
515
result= comma_need= true;
516
m_result.insert_id(insert_id);
523
if (try_assignment_timestamp(timestamp))
526
m_result.timestamp(timestamp);
528
return result && try_char(';') && t.commit();
538
database.add_last(current());
542
if (database.length() == 0)
546
m_result.database(database);
477
bool try_semicolon_set_list()
479
//CHECK("try_semicolon_set_list()");
482
if (try_single_assignment_last_insert_id(last_insert_id))
485
m_result.last_insert_id(last_insert_id);
487
if (try_single_assignment_insert_id(insert_id))
490
m_result.insert_id(insert_id);
492
if (try_single_assignment_timestamp(timestamp))
495
m_result.timestamp(timestamp);
497
return result && t.commit();
499
bool try_comma_set_list()
501
//CHECK("try_comma_set_list()");
504
bool comma_need= false;
507
if (try_assignment_last_insert_id(last_insert_id))
509
result= comma_need= true;
510
m_result.last_insert_id(last_insert_id);
517
if (try_assignment_insert_id(insert_id))
519
result= comma_need= true;
520
m_result.insert_id(insert_id);
527
if (try_assignment_timestamp(timestamp))
530
m_result.timestamp(timestamp);
532
return result && try_char(';') && t.commit();
542
database.add_last(current());
546
if (database.length() == 0)
550
m_result.database(database);
549
553
// Tries to consume the literal prefix "# Time:" at the cursor.
#define try_time() try_word("# Time:")
550
554
// Tries to consume the literal prefix "# User@Host" at the cursor.
#define try_user_host() try_word("# User@Host")
551
bool try_query_time_and_thread_id()
553
//CHECK("try_query_time()");
555
if (try_word("# Query_time:") && try_space() && try_float()
556
&& try_space() && try_word("Lock_time:") && try_space() && try_float()
557
&& try_space() && try_word("Rows_sent:") && try_space() && try_number()
558
&& try_space() && try_word("Rows_examined:") && try_space() && try_number()
559
&& try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
561
m_result.thread_id(thread_id);
567
bool try_query_time()
569
//CHECK("try_query_time()");
570
return is_word("# Query_time:");
575
// if (try_char('#') && try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
576
if (try_word("# Thread_id:") && try_space() && parse(thread_id))
578
m_result.thread_id(thread_id);
586
//CHECK("try_admin()");
588
return try_word("# administrator command: ") && parse_query(true) && t.commit();
592
//CHECK("try_query()");
593
return parse_query(false);
555
bool try_query_time_and_thread_id()
557
//CHECK("try_query_time()");
559
if (try_word("# Query_time:") && try_space() && try_float()
560
&& try_space() && try_word("Lock_time:") && try_space() && try_float()
561
&& try_space() && try_word("Rows_sent:") && try_space() && try_number()
562
&& try_space() && try_word("Rows_examined:") && try_space() && try_number()
563
&& try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
565
m_result.thread_id(thread_id);
571
bool try_query_time()
573
//CHECK("try_query_time()");
574
return is_word("# Query_time:");
579
// if (try_char('#') && try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
580
if (try_word("# Thread_id:") && try_space() && parse(thread_id))
582
m_result.thread_id(thread_id);
590
//CHECK("try_admin()");
592
return try_word("# administrator command: ") && parse_query(true) && t.commit();
596
//CHECK("try_query()");
597
return parse_query(false);
605
catch(std::exception const &e)
607
std::cerr<< "Parser: " << e.what() << std::endl;
612
std::cerr << "Parser: unhanled exception" << std::endl;
609
catch(std::exception const &e)
611
std::cerr<< "Parser: " << e.what() << std::endl;
616
std::cerr << "Parser: unhanled exception" << std::endl;
627
631
// Returns from the enclosing (void) function when eof() is true or when
// skip_line() reports failure; skip_line() is not called once eof() is true.
// Wrapped in do/while(false) so the macro behaves as a single statement.
#define DO_SKIP() do { if (eof() || !skip_line()) return; } while (false)
628
while (!is_char('#'))
634
while (!is_char('#'))
652
else if (try_query_time_and_thread_id())
657
else if (try_query_time())
667
bool admin_found= false;
670
admin_found= try_admin();
679
bool set_found= false;
683
set_found=try_semicolon_set_list();
686
if (!set_found && try_comma_set_list())
694
else if (try_query())
699
while (!is_char('#'))
632
while (!is_char('#'))
638
while (!is_char('#'))
656
else if (try_query_time_and_thread_id())
661
else if (try_query_time())
671
bool admin_found= false;
674
admin_found= try_admin();
683
bool set_found= false;
687
set_found=try_semicolon_set_list();
690
if (!set_found && try_comma_set_list())
698
else if (try_query())
703
while (!is_char('#'))
710
std::size_t m_current;
714
std::size_t m_current;
714
718
//unsigned int m_next_transaction_id;
715
719
// typedef std::vector< std::pair< unsigned int, std::size_t > > Stack;
718
String_Queue &m_queue;
722
String_Queue &m_queue;
723
long long int timestamp;
724
long long int insert_id;
725
long long int last_insert_id;
726
unsigned int thread_id;
727
long long int timestamp;
728
long long int insert_id;
729
long long int last_insert_id;
730
unsigned int thread_id;
729
733
#endif // __PERCONA_PLAYBACK__SLOW_LOG_PARSER__H__