~stewart/percona-playback/cassert-header

« back to all changes in this revision

Viewing changes to src/slow_query_log_parser.h

  • Committer: Oleg Tsarev
  • Date: 2011-05-04 21:38:59 UTC
  • mfrom: (108.1.18 sbt)
  • Revision ID: oleg.tsarev@percona.com-20110504213859-pw1mgjmuj9gz44vb
merge split_by_transaction

Show diffs side-by-side

added added

removed removed

Lines of Context:
9
9
#include "options.h"
10
10
#include "queue.h"
11
11
#include "query.h"
12
 
#include "processor.h"
 
12
#include "sheduler.h"
13
13
#include "options.h"
14
14
 
15
15
class Slow_Query_Log_Processor
16
16
{
17
17
public:
18
 
  Slow_Query_Log_Processor(Query_Processor* processor) :
19
 
    m_processor(processor)
 
18
Slow_Query_Log_Processor(Sheduler_Pointer const &a_sheduler) :
 
19
  m_sheduler(a_sheduler)
20
20
  {
21
21
    m_current_query.query_id= 0;
22
22
    m_current_query.database= ::database();
24
24
    m_current_query.insert_id= 0;
25
25
    m_current_query.last_insert_id= 0;
26
26
  }
 
27
  ~Slow_Query_Log_Processor()
 
28
  {
 
29
    //std::cout << "SLOW_QUERY_LOG_PROCESSOR DTOR\n" << std::flush;
 
30
  }
27
31
  void start()
28
32
  {
29
 
    m_processor->start();
 
33
    m_sheduler->start();
30
34
  }
31
 
  void run(Query const &query)
 
35
  bool run(Query const &query)
32
36
  {
33
 
    m_processor->run(query);
 
37
    return m_sheduler->run(query);
34
38
  }
35
39
  void flush()
36
40
  {
37
 
    m_processor->flush();
 
41
    m_sheduler->flush();
38
42
  }
39
 
  void wait()
 
43
  void join()
40
44
  {
41
 
    m_processor->wait();
 
45
    m_sheduler->join();
42
46
  }
43
47
  void stop()
44
48
  {
45
 
    m_processor->stop();
 
49
    m_sheduler->stop();
46
50
  }
47
51
  void database(Database const &a)
48
52
  {
77
81
    m_current_query.thread_id= a;
78
82
  }
79
83
private:
80
 
  Query_Processor *m_processor;
 
84
  Sheduler_Pointer m_sheduler;
81
85
  Query            m_current_query;
82
86
};
83
87
 
118
122
};
119
123
 
120
124
template<typename Result= Slow_Query_Log_Processor>
121
 
class Parser : public IParser
122
 
{
123
 
public:
124
 
  Parser(String_Queue &a_string_queue
125
 
         , std::size_t a_page_size
126
 
         , Result *a_result) :
127
 
    m_thread(this, &Parser<Result>::work, "Slow Query Log Parser")
128
 
    , m_page(a_page_size)
129
 
    , m_current(0)
130
 
    , m_end(0)
131
 
    , m_eof(false)
132
 
    , m_queue(a_string_queue)
133
 
    , m_result(*a_result)
134
 
  {
135
 
  }
136
 
  virtual ~Parser()
137
 
  {
138
 
  }
139
 
  void start()
140
 
  {
141
 
    m_thread.start();
142
 
  }
143
 
  void stop()
144
 
  {
145
 
    m_queue.stop();
146
 
    m_result.stop();
147
 
  }
148
 
  void flush()
149
 
  {
150
 
    m_queue.flush();
151
 
  }
152
 
  void wait()
153
 
  {
154
 
    m_thread.join();
155
 
  }
156
 
public:
157
 
  class Transaction
158
 
  {
159
 
  public:
160
 
    Transaction(Parser* a_parser) :
161
 
      m_parser(a_parser)
162
 
      , m_current(m_parser->begin_transaction())
163
 
      , m_complete(false)
164
 
    {
165
 
    }
166
 
    ~Transaction()
167
 
    {
168
 
      if (false == m_complete)
169
 
        {
170
 
          rollback();
171
 
        }
172
 
    }
173
 
    void complete()
174
 
    {
175
 
      if (m_complete)
176
 
        {
177
 
          std::ostringstream message;
178
 
          message << "Transaction " << m_current << " already complete";
179
 
          throw Error(message.str());
180
 
        }
181
 
      else
182
 
        {
183
 
          m_complete= true;
184
 
        }
185
 
    }
186
 
    bool commit()
187
 
    {
188
 
      if (m_complete)
189
 
        {
190
 
        complete();
191
 
        return false;
192
 
        }
193
 
      else
194
 
        {
195
 
          m_parser->commit_transaction(m_current);
196
 
          complete();
197
 
          return true;
198
 
        }
199
 
    }
200
 
    bool rollback()
201
 
    {
202
 
      if (m_complete)
203
 
        {
204
 
          complete();
205
 
          return false;
206
 
        }
207
 
      else
208
 
        {
209
 
          m_parser->rollback_transaction(m_current);
210
 
          complete();
211
 
          return false;
212
 
        }
213
 
    }
214
 
  private:
215
 
    Transaction& operator=(Transaction const &other) { return *this; }
216
 
  private:
217
 
    Parser *m_parser;
218
 
    unsigned int m_current;
219
 
    bool m_complete;
220
 
  };
221
 
  friend class Transaction;
222
 
private:
223
 
  unsigned int begin_transaction()
224
 
  {
225
 
    /*unsigned int result= m_next_transaction_id++;
 
125
  class Parser : public IParser
 
126
  {
 
127
public:
 
128
Parser(String_Queue &a_string_queue
 
129
       , std::size_t a_page_size
 
130
       , Result *a_result) :
 
131
m_thread(this, &Parser<Result>::work, "Slow Query Log Parser")
 
132
, m_page(a_page_size)
 
133
, m_current(0)
 
134
, m_end(0)
 
135
, m_eof(false)
 
136
, m_queue(a_string_queue)
 
137
, m_result(*a_result)
 
138
{
 
139
}
 
140
virtual ~Parser()
 
141
{
 
142
}
 
143
void start()
 
144
{
 
145
  m_thread.start();
 
146
}
 
147
void stop()
 
148
{
 
149
  m_queue.stop();
 
150
  m_result.stop();
 
151
}
 
152
void flush()
 
153
{
 
154
  m_queue.flush();
 
155
}
 
156
void join()
 
157
{
 
158
  m_thread.join();
 
159
}
 
160
public:
 
161
class Transaction
 
162
{
 
163
public:
 
164
Transaction(Parser* a_parser) :
 
165
m_parser(a_parser)
 
166
, m_current(m_parser->begin_transaction())
 
167
, m_complete(false)
 
168
{
 
169
}
 
170
~Transaction()
 
171
{
 
172
  if (false == m_complete)
 
173
  {
 
174
    rollback();
 
175
  }
 
176
}
 
177
void complete()
 
178
{
 
179
  if (m_complete)
 
180
  {
 
181
    std::ostringstream message;
 
182
    message << "Transaction " << m_current << " already complete";
 
183
    throw Error(message.str());
 
184
  }
 
185
  else
 
186
  {
 
187
    m_complete= true;
 
188
  }
 
189
}
 
190
bool commit()
 
191
{
 
192
  if (m_complete)
 
193
  {
 
194
    complete();
 
195
    return false;
 
196
  }
 
197
  else
 
198
  {
 
199
    m_parser->commit_transaction(m_current);
 
200
    complete();
 
201
    return true;
 
202
  }
 
203
}
 
204
bool rollback()
 
205
{
 
206
  if (m_complete)
 
207
  {
 
208
    complete();
 
209
    return false;
 
210
  }
 
211
  else
 
212
  {
 
213
    m_parser->rollback_transaction(m_current);
 
214
    complete();
 
215
    return false;
 
216
  }
 
217
}
 
218
private:
 
219
Transaction& operator=(Transaction const &other) { return *this; }
 
220
private:
 
221
Parser *m_parser;
 
222
unsigned int m_current;
 
223
bool m_complete;
 
224
};
 
225
friend class Transaction;
 
226
private:
 
227
unsigned int begin_transaction()
 
228
{
 
229
  /*unsigned int result= m_next_transaction_id++;
226
230
    m_stack.push_back(std::make_pair(result, m_current));
227
231
    return result;*/
228
 
    return m_current;
229
 
  }
230
 
  std::ostream& trace(std::ostream& s) const
231
 
  {
232
 
    //s << "Parser buffer: " << m_buffer << "\n";
233
 
    std::string line;
234
 
    line= m_buffer.data() + m_current;
235
 
    line.resize(m_end - m_current);
236
 
    s << "Parser line: '" << line << "'\n";
237
 
    s << "Parser current: " << m_current;
238
 
    s << " end: " << m_end;
239
 
    s << " eof: " << (m_eof ? "true" : "false");
 
232
  return m_current;
 
233
}
 
234
std::ostream& trace(std::ostream& s) const
 
235
{
 
236
  //s << "Parser buffer: " << m_buffer << "\n";
 
237
  std::string line;
 
238
  line= m_buffer.data() + m_current;
 
239
  line.resize(m_end - m_current);
 
240
  s << "Parser line: '" << line << "'\n";
 
241
  s << "Parser current: " << m_current;
 
242
  s << " end: " << m_end;
 
243
  s << " eof: " << (m_eof ? "true" : "false");
240
244
/*    s << "\nParser transaction stack:[";
241
 
    for(Stack::const_iterator i= m_stack.begin(), end= m_stack.end(); end != i; ++i)
 
245
      for(Stack::const_iterator i= m_stack.begin(), end= m_stack.end(); end != i; ++i)
242
246
      s << " (" << i->first << "," << i->second << ")";
243
247
      s << " ]";*/
244
 
    return s;
245
 
  }
246
 
  void commit_transaction(unsigned int)
247
 
  {
 
248
  return s;
 
249
}
 
250
void commit_transaction(unsigned int)
 
251
{
248
252
/*    if (m_stack.empty()
249
 
        || (false == m_stack.empty() && id != m_stack.back().first))
 
253
      || (false == m_stack.empty() && id != m_stack.back().first))
250
254
      {
251
 
        std::ostringstream message;
252
 
        trace(message) << "\n[ERROR] Can't commit transaction: " << id;
253
 
        throw Error(message.str());
 
255
      std::ostringstream message;
 
256
      trace(message) << "\n[ERROR] Can't commit transaction: " << id;
 
257
      throw Error(message.str());
254
258
      }
255
259
      m_stack.pop_back();*/
256
 
  }
 
260
}
257
261
void rollback_transaction(unsigned int to)
258
 
  {
 
262
{
259
263
/*     if (m_stack.empty()
260
 
        || (false == m_stack.empty() && id != m_stack.back().first))
261
 
      {
262
 
        std::ostringstream message;
263
 
        trace(message) << "\n[ERROR] Can't rollback transaction: " << id;
264
 
        throw Error(message.str());
265
 
        }
266
 
    m_current= m_stack.back().second;
267
 
    m_stack.pop_back();*/
268
 
    m_current= to;
269
 
  }
 
264
       || (false == m_stack.empty() && id != m_stack.back().first))
 
265
       {
 
266
       std::ostringstream message;
 
267
       trace(message) << "\n[ERROR] Can't rollback transaction: " << id;
 
268
       throw Error(message.str());
 
269
       }
 
270
       m_current= m_stack.back().second;
 
271
       m_stack.pop_back();*/
 
272
  m_current= to;
 
273
}
270
274
#define has_count(count) (m_current + count < m_end + 1)
271
275
#define has_char() has_count(sizeof(char))
272
276
#define has_word(word) has_count(sizeof(word) - 1)
273
 
  char current()
 
277
char current()
 
278
{
 
279
  if (!has_char())
274
280
  {
275
 
    if (!has_char())
276
 
      {
277
 
        std::ostringstream message;
278
 
        trace(message) << "\nCan't call 'current()'";
279
 
        throw Error(message.str());
280
 
      }
281
 
    return m_buffer.data()[m_current];
 
281
    std::ostringstream message;
 
282
    trace(message) << "\nCan't call 'current()'";
 
283
    throw Error(message.str());
282
284
  }
 
285
  return m_buffer.data()[m_current];
 
286
}
283
287
#define is_char(c) (has_char() && (current() == c))
284
288
/*const char*  current_word(const char* word, std::size_t word_strlen, std::size_t word_sizeof, std::size_t line)
285
 
{
 
289
  {
286
290
  char buffer[1024];
287
291
  std::size_t buffer_length= std::min(sizeof(buffer) - 1, m_buffer.length());
288
292
  memcpy(buffer, m_buffer.data(), buffer_length);
293
297
  current[current_length]= 0;
294
298
  char output[4096];
295
299
  snprintf(output, sizeof(output), "\nword: '%s' strlen: %u sizeof: %u line: %u\nbuffer: '%s'...\ncurrent: '%s'\n"
296
 
           , word, (unsigned int)word_strlen, (unsigned int)word_sizeof, (unsigned int)line, buffer, current);
 
300
  , word, (unsigned int)word_strlen, (unsigned int)word_sizeof, (unsigned int)line, buffer, current);
297
301
  std::cout << (const char*)output << std::flush;
298
302
  return m_buffer.data() + m_current;
299
303
  }
302
306
#define is_digit() (has_char() && ('0' <= current()) && (current() <= '9'))
303
307
#define is_alpha() (has_char() && (('a' <= current() && (current() <= 'z')) \
304
308
                                   || (('A' <= current()) && (current() <= 'Z'))))
305
 
  bool do_skip(std::size_t count)
306
 
  {
307
 
    ASSERT(has_count(count));
308
 
    m_current += count;
309
 
    return true;
310
 
  }
311
 
  bool can_not_do_skip(std::size_t count) const
312
 
  {
313
 
    std::ostringstream message;
314
 
    trace(message) << "\nCan't skip<" << count << ">'";
315
 
    throw Error(message.str());
316
 
  }
 
309
bool do_skip(std::size_t count)
 
310
{
 
311
  ASSERT(has_count(count));
 
312
  m_current += count;
 
313
  return true;
 
314
}
 
315
bool can_not_do_skip(std::size_t count) const
 
316
{
 
317
  std::ostringstream message;
 
318
  trace(message) << "\nCan't skip<" << count << ">'";
 
319
  throw Error(message.str());
 
320
}
317
321
#define skip_char()       ((has_char()       && do_skip(sizeof(char)  )) || can_not_do_skip(sizeof(char)  ))
318
322
#define skip_word(word)   ((has_word(word)   && do_skip(sizeof(word)-1)) || can_not_do_skip(sizeof(word)-1))
319
323
#define skip_count(count) ((has_count(count) && do_skip(count         )) || can_not_do_skip(count         ))
320
324
#define try_char(c)    (is_char(c)    && skip_char())
321
325
#define try_word(word) (is_word(word) && skip_word(word))
322
326
#define is_space() (is_char(' ') || is_char('\t') || is_char('\f') || is_char('\r'))
323
 
  bool try_space()
 
327
bool try_space()
 
328
{
 
329
  if (!is_space())
 
330
    return false;
 
331
  do
324
332
  {
325
 
    if (!is_space())
326
 
      return false;
327
 
    do
328
 
      {
329
 
        skip_char();
330
 
      }
331
 
    while(is_space());
332
 
    return true;
 
333
    skip_char();
333
334
  }
 
335
  while(is_space());
 
336
  return true;
 
337
}
334
338
#define try_set_word() (try_word("set ") || try_word("SET "))
335
339
#define try_use_word() (try_word("use ") || try_word("USE "))
336
 
  bool eof()
337
 
  {
338
 
    return m_eof && (m_end + 1 >= m_buffer.length());
339
 
  }
340
 
  bool add_line()
341
 
  {
342
 
    if (eof())
343
 
      return false;
344
 
    std::size_t new_end;
345
 
    if (m_buffer.length() > m_end && m_buffer.data()[m_end] == '\n')
346
 
      new_end= m_buffer.index('\n', m_end + 1);
 
340
bool eof()
 
341
{
 
342
  return m_eof && (m_end + 1 >= m_buffer.length());
 
343
}
 
344
bool add_line()
 
345
{
 
346
  if (eof())
 
347
    return false;
 
348
  std::size_t new_end;
 
349
  if (m_buffer.length() > m_end && m_buffer.data()[m_end] == '\n')
 
350
    new_end= m_buffer.index('\n', m_end + 1);
 
351
  else
 
352
    new_end= m_buffer.index('\n', 0);
 
353
  while(new_end == m_buffer.length())
 
354
  {
 
355
    m_page.clear();
 
356
    if (m_queue.pop(m_page))
 
357
    {
 
358
      if( m_page.length())
 
359
      {
 
360
        m_buffer.add_last(m_page);
 
361
        new_end= m_buffer.index('\n', new_end);
 
362
      }
 
363
    }
347
364
    else
348
 
      new_end= m_buffer.index('\n', 0);
349
 
    while(new_end == m_buffer.length())
350
 
      {
351
 
        m_page.clear();
352
 
        if (m_queue.pop(m_page))
353
 
          {
354
 
            if( m_page.length())
355
 
              {
356
 
                m_buffer.add_last(m_page);
357
 
                new_end= m_buffer.index('\n', new_end);
358
 
              }
359
 
          }
360
 
        else
361
 
          {
362
 
            m_eof= true;
363
 
            if ((m_buffer.length() > 0))
364
 
              {
365
 
                if ((m_buffer.data()[new_end -1] != '\n'))
366
 
                  m_buffer.add_last('\n');
367
 
                new_end= m_buffer.index('\n', new_end);
368
 
                break;
369
 
              }
370
 
            else
371
 
              return false;
372
 
          }
373
 
      }
374
 
    m_end= new_end;
375
 
    return true;
376
 
  }
377
 
  bool skip_line()
378
 
  {
379
 
    m_buffer.remove_first(m_end+1);
380
 
    m_current= m_end= 0;
381
 
    return add_line();
382
 
  }
383
 
  bool parse_query(bool is_admin)
384
 
  {
385
 
    Query_Parse_State state=  query_parse_state_begin(eQuery_Detect_Correct);
386
 
    std::size_t query_length= 0;
387
 
    do
388
 
      {
389
 
        const char* from= m_buffer.data() + m_current + query_length;
390
 
        query_length += query_parse(state, from, m_end - (m_current + query_length));
391
 
        if (end_of_query(state) || is_admin)
392
 
          {
393
 
            sql.set(m_buffer.data() + m_current, query_length);
394
 
            if (sql.data()[sql.length() - 1] != ';')
395
 
              sql.add_last(';');
396
 
            if (is_admin)
397
 
              m_result.admin(sql);
398
 
            else
399
 
              m_result.query(sql);
400
 
            skip_count(query_length);
401
 
            return true;
402
 
          }
403
 
        if (query_length != m_end)
404
 
          {
405
 
            std::ostringstream message;
406
 
            trace(message) << "\nBug in query parser";
407
 
            throw Error(message.str());
408
 
          }
409
 
        if (!add_line())
410
 
          {
411
 
            // EOF
412
 
            return true;
413
 
          }
414
 
      }
415
 
    while(true);
416
 
  }
417
 
  bool try_number()
418
 
  {
419
 
    if (!is_digit())
420
 
      return false;
421
 
    do
422
 
      {
423
 
        skip_char();
424
 
      }
425
 
    while(is_digit());
426
 
    return true;
427
 
  }
428
 
  bool try_float()
429
 
  {
430
 
    Transaction t(this);
431
 
    return try_number() && try_char('.') && try_number() && t.commit();
432
 
  }
433
 
  bool parse(long long int &value)
434
 
  {
435
 
    if (!is_digit())
436
 
      return false;
437
 
    value= 0;
438
 
    do
439
 
    {
440
 
      value= value * 10 + (current() - '0');
441
 
      skip_char();
442
 
    }
443
 
    while (is_digit());
444
 
    return true;
445
 
  }
446
 
  bool parse(Thread_Id& value)
447
 
  {
448
 
    long long int lli;
449
 
    if (!parse(lli))
450
 
      return false;
451
 
    value= static_cast<Thread_Id>(lli);
452
 
    return true;
453
 
  }
 
365
    {
 
366
      m_eof= true;
 
367
      if ((m_buffer.length() > 0))
 
368
      {
 
369
        if ((m_buffer.data()[new_end -1] != '\n'))
 
370
          m_buffer.add_last('\n');
 
371
        new_end= m_buffer.index('\n', new_end);
 
372
        break;
 
373
      }
 
374
      else
 
375
        return false;
 
376
    }
 
377
  }
 
378
  m_end= new_end;
 
379
  return true;
 
380
}
 
381
bool skip_line()
 
382
{
 
383
  m_buffer.remove_first(m_end+1);
 
384
  m_current= m_end= 0;
 
385
  return add_line();
 
386
}
 
387
bool parse_query(bool is_admin)
 
388
{
 
389
  Query_Parse_State state=  query_parse_state_begin(eQuery_Detect_Correct);
 
390
  std::size_t query_length= 0;
 
391
  do
 
392
  {
 
393
    const char* from= m_buffer.data() + m_current + query_length;
 
394
    query_length += query_parse(state, from, m_end - (m_current + query_length));
 
395
    if (end_of_query(state) || is_admin)
 
396
    {
 
397
      sql.set(m_buffer.data() + m_current, query_length);
 
398
      if (sql.data()[sql.length() - 1] != ';')
 
399
        sql.add_last(';');
 
400
      if (is_admin)
 
401
        m_result.admin(sql);
 
402
      else
 
403
        m_result.query(sql);
 
404
      skip_count(query_length);
 
405
      return true;
 
406
    }
 
407
    if (query_length != m_end)
 
408
    {
 
409
      std::ostringstream message;
 
410
      trace(message) << "\nBug in query parser";
 
411
      throw Error(message.str());
 
412
    }
 
413
    if (!add_line())
 
414
    {
 
415
      // EOF
 
416
      return true;
 
417
    }
 
418
  }
 
419
  while(true);
 
420
}
 
421
bool try_number()
 
422
{
 
423
  if (!is_digit())
 
424
    return false;
 
425
  do
 
426
  {
 
427
    skip_char();
 
428
  }
 
429
  while(is_digit());
 
430
  return true;
 
431
}
 
432
bool try_float()
 
433
{
 
434
  Transaction t(this);
 
435
  return try_number() && try_char('.') && try_number() && t.commit();
 
436
}
 
437
bool parse(long long int &value)
 
438
{
 
439
  if (!is_digit())
 
440
    return false;
 
441
  value= 0;
 
442
  do
 
443
  {
 
444
    value= value * 10 + (current() - '0');
 
445
    skip_char();
 
446
  }
 
447
  while (is_digit());
 
448
  return true;
 
449
}
 
450
bool parse(Thread_Id& value)
 
451
{
 
452
  long long int lli;
 
453
  if (!parse(lli))
 
454
    return false;
 
455
  value= static_cast<Thread_Id>(lli);
 
456
  return true;
 
457
}
454
458
#define declare_assignment(key)                                         \
455
459
  bool try_assignment_##key (long long int &value)                      \
456
 
  {                                                                     \
457
 
    Transaction t(this);                                                \
458
 
    return try_word( #key ) && try_char('=') && parse(value) && t.commit(); \
459
 
  }
 
460
{                                                                       \
 
461
  Transaction t(this);                                                  \
 
462
  return try_word( #key ) && try_char('=') && parse(value) && t.commit(); \
 
463
}
460
464
#define declare_single_assignment(key)                                  \
461
465
  bool try_single_assignment_##key (long long int &value)               \
462
 
  {                                                                     \
463
 
    Transaction t(this);                                                \
464
 
    return try_space() && try_set_word() && try_assignment_##key (value) && try_char(';') && t.commit(); \
465
 
  }
466
 
  declare_assignment(timestamp)
467
 
  declare_assignment(insert_id)
468
 
  declare_assignment(last_insert_id)
469
 
  declare_single_assignment(timestamp)
470
 
  declare_single_assignment(insert_id)
471
 
  declare_single_assignment(last_insert_id)
 
466
{                                                                       \
 
467
  Transaction t(this);                                                  \
 
468
  return try_space() && try_set_word() && try_assignment_##key (value) && try_char(';') && t.commit(); \
 
469
}
 
470
declare_assignment(timestamp)
 
471
declare_assignment(insert_id)
 
472
declare_assignment(last_insert_id)
 
473
declare_single_assignment(timestamp)
 
474
declare_single_assignment(insert_id)
 
475
declare_single_assignment(last_insert_id)
472
476
  
473
 
  bool try_semicolon_set_list()
474
 
  {
475
 
    //CHECK("try_semicolon_set_list()");
476
 
    Transaction t(this);
477
 
    bool result;
478
 
    if (try_single_assignment_last_insert_id(last_insert_id))
479
 
      {
480
 
        result= true;
481
 
        m_result.last_insert_id(last_insert_id);
482
 
      }
483
 
    if (try_single_assignment_insert_id(insert_id))
484
 
      {
485
 
        result= true;
486
 
        m_result.insert_id(insert_id);
487
 
      }
488
 
    if (try_single_assignment_timestamp(timestamp))
489
 
      {
490
 
        result= true;
491
 
        m_result.timestamp(timestamp);
492
 
      }
493
 
    return result && t.commit();
494
 
  }
495
 
  bool try_comma_set_list()
496
 
  {
497
 
    //CHECK("try_comma_set_list()");
498
 
    Transaction t(this);
499
 
    bool result= false;
500
 
    bool comma_need= false;
501
 
    if (!try_set_word())
502
 
      return false;
503
 
    if (try_assignment_last_insert_id(last_insert_id))
504
 
      {
505
 
        result= comma_need= true;
506
 
        m_result.last_insert_id(last_insert_id);
507
 
      }
508
 
    if (comma_need)
509
 
      {
510
 
        if (try_char(','))
511
 
          comma_need= false;
512
 
      }
513
 
    if (try_assignment_insert_id(insert_id))
514
 
     { 
515
 
        result= comma_need= true;
516
 
        m_result.insert_id(insert_id);
517
 
      }
518
 
    if (comma_need)
519
 
      {
520
 
        if (try_char(','))
521
 
          comma_need= false;
522
 
      }
523
 
    if (try_assignment_timestamp(timestamp))
524
 
      {
525
 
        result= true;
526
 
        m_result.timestamp(timestamp);
527
 
      }
528
 
    return result && try_char(';') && t.commit();
529
 
  }
530
 
  bool try_use()
531
 
  {
532
 
    Transaction t(this);
533
 
    if (!try_use_word())
534
 
      return false;
535
 
    database.clear();
536
 
    do
537
 
      {
538
 
        database.add_last(current());
539
 
        skip_char();
540
 
      }
541
 
    while (is_alpha());
542
 
    if (database.length() == 0)
543
 
      return false;
544
 
    if (!try_char(';'))
545
 
      return false;
546
 
    m_result.database(database);
547
 
    return t.commit();
548
 
  }
 
477
bool try_semicolon_set_list()
 
478
{
 
479
  //CHECK("try_semicolon_set_list()");
 
480
  Transaction t(this);
 
481
  bool result;
 
482
  if (try_single_assignment_last_insert_id(last_insert_id))
 
483
  {
 
484
    result= true;
 
485
    m_result.last_insert_id(last_insert_id);
 
486
  }
 
487
  if (try_single_assignment_insert_id(insert_id))
 
488
  {
 
489
    result= true;
 
490
    m_result.insert_id(insert_id);
 
491
  }
 
492
  if (try_single_assignment_timestamp(timestamp))
 
493
  {
 
494
    result= true;
 
495
    m_result.timestamp(timestamp);
 
496
  }
 
497
  return result && t.commit();
 
498
}
 
499
bool try_comma_set_list()
 
500
{
 
501
  //CHECK("try_comma_set_list()");
 
502
  Transaction t(this);
 
503
  bool result= false;
 
504
  bool comma_need= false;
 
505
  if (!try_set_word())
 
506
    return false;
 
507
  if (try_assignment_last_insert_id(last_insert_id))
 
508
  {
 
509
    result= comma_need= true;
 
510
    m_result.last_insert_id(last_insert_id);
 
511
  }
 
512
  if (comma_need)
 
513
  {
 
514
    if (try_char(','))
 
515
      comma_need= false;
 
516
  }
 
517
  if (try_assignment_insert_id(insert_id))
 
518
  { 
 
519
    result= comma_need= true;
 
520
    m_result.insert_id(insert_id);
 
521
  }
 
522
  if (comma_need)
 
523
  {
 
524
    if (try_char(','))
 
525
      comma_need= false;
 
526
  }
 
527
  if (try_assignment_timestamp(timestamp))
 
528
  {
 
529
    result= true;
 
530
    m_result.timestamp(timestamp);
 
531
  }
 
532
  return result && try_char(';') && t.commit();
 
533
}
 
534
bool try_use()
 
535
{
 
536
  Transaction t(this);
 
537
  if (!try_use_word())
 
538
    return false;
 
539
  database.clear();
 
540
  do
 
541
  {
 
542
    database.add_last(current());
 
543
    skip_char();
 
544
  }
 
545
  while (is_alpha());
 
546
  if (database.length() == 0)
 
547
    return false;
 
548
  if (!try_char(';'))
 
549
    return false;
 
550
  m_result.database(database);
 
551
  return t.commit();
 
552
}
549
553
#define try_time()      try_word("# Time:")
550
554
#define try_user_host() try_word("# User@Host")
551
 
  bool try_query_time_and_thread_id()
552
 
  {
553
 
    //CHECK("try_query_time()");
554
 
    Transaction t(this);
555
 
    if (try_word("# Query_time:") && try_space() && try_float()
556
 
        && try_space() && try_word("Lock_time:")     && try_space() && try_float()
557
 
        && try_space() && try_word("Rows_sent:")     && try_space() && try_number()
558
 
        && try_space() && try_word("Rows_examined:") && try_space() && try_number()
559
 
        && try_space() && try_word("Thread_id:")     && try_space() && parse(thread_id))
560
 
      {
561
 
        m_result.thread_id(thread_id);
562
 
        return t.commit();
563
 
      }
564
 
    else
565
 
      return false;
566
 
  }
567
 
  bool try_query_time()
568
 
  {
569
 
    //CHECK("try_query_time()");
570
 
    return is_word("# Query_time:");
571
 
  }
572
 
  bool try_thread_id()
573
 
  {
574
 
    Transaction t(this);
575
 
    //    if (try_char('#') && try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
576
 
    if (try_word("# Thread_id:") && try_space() && parse(thread_id))
577
 
      {
578
 
        m_result.thread_id(thread_id);
579
 
        return t.commit();
580
 
      }
581
 
    else
582
 
      return false;
583
 
  }
584
 
  bool try_admin()
585
 
  {
586
 
    //CHECK("try_admin()");
587
 
    Transaction t(this);
588
 
    return try_word("# administrator command: ") && parse_query(true) && t.commit();
589
 
  }
590
 
  bool try_query()
591
 
  {
592
 
    //CHECK("try_query()");
593
 
    return parse_query(false);
594
 
  }
595
 
  void work()
 
555
bool try_query_time_and_thread_id()
 
556
{
 
557
  //CHECK("try_query_time()");
 
558
  Transaction t(this);
 
559
  if (try_word("# Query_time:") && try_space() && try_float()
 
560
      && try_space() && try_word("Lock_time:")     && try_space() && try_float()
 
561
      && try_space() && try_word("Rows_sent:")     && try_space() && try_number()
 
562
      && try_space() && try_word("Rows_examined:") && try_space() && try_number()
 
563
      && try_space() && try_word("Thread_id:")     && try_space() && parse(thread_id))
 
564
  {
 
565
    m_result.thread_id(thread_id);
 
566
    return t.commit();
 
567
  }
 
568
  else
 
569
    return false;
 
570
}
 
571
bool try_query_time()
 
572
{
 
573
  //CHECK("try_query_time()");
 
574
  return is_word("# Query_time:");
 
575
}
 
576
bool try_thread_id()
 
577
{
 
578
  Transaction t(this);
 
579
  //    if (try_char('#') && try_space() && try_word("Thread_id:") && try_space() && parse(thread_id))
 
580
  if (try_word("# Thread_id:") && try_space() && parse(thread_id))
 
581
  {
 
582
    m_result.thread_id(thread_id);
 
583
    return t.commit();
 
584
  }
 
585
  else
 
586
    return false;
 
587
}
 
588
bool try_admin()
 
589
{
 
590
  //CHECK("try_admin()");
 
591
  Transaction t(this);
 
592
  return try_word("# administrator command: ") && parse_query(true) && t.commit();
 
593
}
 
594
bool try_query()
 
595
{
 
596
  //CHECK("try_query()");
 
597
  return parse_query(false);
 
598
}
 
599
void work()
 
600
{
 
601
  try
596
602
  {
597
603
    try
598
 
      {
599
 
        try
600
 
          {
601
 
            m_result.start();
602
 
            parse_all();
603
 
            m_result.flush();
604
 
          }
605
 
        catch(std::exception const &e)
606
 
          {
607
 
            std::cerr<< "Parser: " << e.what() << std::endl;
608
 
            m_result.stop();
609
 
          }
610
 
        catch(...)
611
 
          {
612
 
            std::cerr << "Parser: unhanled exception" << std::endl;
613
 
            m_result.stop();
614
 
          }
615
 
      }
 
604
    {
 
605
      m_result.start();
 
606
      parse_all();
 
607
      m_result.flush();
 
608
    }
 
609
    catch(std::exception const &e)
 
610
    {
 
611
      std::cerr<< "Parser: " << e.what() << std::endl;
 
612
      m_result.stop();
 
613
    }
616
614
    catch(...)
617
 
      {
618
 
        m_result.wait();
619
 
        throw;
620
 
      }
621
 
    m_result.wait();
 
615
    {
 
616
      std::cerr << "Parser: unhanled exception" << std::endl;
 
617
      m_result.stop();
 
618
    }
622
619
  }
623
 
  void parse_all()
 
620
  catch(...)
624
621
  {
625
 
    if (!add_line())
626
 
        return;
 
622
    m_result.join();
 
623
    throw;
 
624
  }
 
625
  m_result.join();
 
626
}
 
627
void parse_all()
 
628
{
 
629
  if (!add_line())
 
630
    return;
627
631
#define DO_SKIP() do { if (eof() || !skip_line()) { return; }; } while(false)
628
 
    while (!is_char('#'))
629
 
      {
630
 
        DO_SKIP();
631
 
      }
632
 
    while(true)
633
 
      {
634
 
        while (!is_char('#'))
635
 
          {
636
 
            if (!skip_line())
637
 
              {
638
 
                return;
639
 
              }
640
 
          }
641
 
        while (is_char('#'))
642
 
          {
643
 
            if (try_time())
644
 
              { DO_SKIP(); }
645
 
            if (try_user_host())
646
 
              { DO_SKIP(); }
647
 
            if (try_thread_id())
648
 
              {
649
 
                DO_SKIP();
650
 
                break;
651
 
              }
652
 
            else if (try_query_time_and_thread_id())
653
 
              {
654
 
                DO_SKIP();
655
 
                break;
656
 
              }
657
 
            else if (try_query_time())
658
 
              {
659
 
                DO_SKIP();
660
 
                break;
661
 
              }
662
 
            else
663
 
              {
664
 
                DO_SKIP();
665
 
              }
666
 
          }
667
 
        bool admin_found= false;
668
 
        while(is_char('#'))
669
 
          {
670
 
            admin_found= try_admin();
671
 
            DO_SKIP();
672
 
            if (admin_found)
673
 
              {
674
 
                break;
675
 
              }
676
 
          }
677
 
        if (!admin_found)
678
 
          {
679
 
            bool set_found= false;
680
 
            if (try_use())
681
 
              {
682
 
                if (is_space())
683
 
                  set_found=try_semicolon_set_list();
684
 
                DO_SKIP();
685
 
              }
686
 
            if (!set_found && try_comma_set_list())
687
 
              {
688
 
                DO_SKIP();
689
 
              }
690
 
            if (try_admin())
691
 
              {
692
 
                DO_SKIP();
693
 
              }
694
 
            else if (try_query())
695
 
              {
696
 
                DO_SKIP();
697
 
              }
698
 
          }
699
 
        while (!is_char('#'))
700
 
          {
701
 
            DO_SKIP();
702
 
          }
 
632
  while (!is_char('#'))
 
633
  {
 
634
    DO_SKIP();
 
635
  }
 
636
  while(true)
 
637
  {
 
638
    while (!is_char('#'))
 
639
    {
 
640
      if (!skip_line())
 
641
      {
 
642
        return;
 
643
      }
 
644
    }
 
645
    while (is_char('#'))
 
646
    {
 
647
      if (try_time())
 
648
      { DO_SKIP(); }
 
649
      if (try_user_host())
 
650
      { DO_SKIP(); }
 
651
      if (try_thread_id())
 
652
      {
 
653
        DO_SKIP();
 
654
        break;
 
655
      }
 
656
      else if (try_query_time_and_thread_id())
 
657
      {
 
658
        DO_SKIP();
 
659
        break;
 
660
      }
 
661
      else if (try_query_time())
 
662
      {
 
663
        DO_SKIP();
 
664
        break;
 
665
      }
 
666
      else
 
667
      {
 
668
        DO_SKIP();
 
669
      }
 
670
    }
 
671
    bool admin_found= false;
 
672
    while(is_char('#'))
 
673
    {
 
674
      admin_found= try_admin();
 
675
      DO_SKIP();
 
676
      if (admin_found)
 
677
      {
 
678
        break;
 
679
      }
 
680
    }
 
681
    if (!admin_found)
 
682
    {
 
683
      bool set_found= false;
 
684
      if (try_use())
 
685
      {
 
686
        if (is_space())
 
687
          set_found=try_semicolon_set_list();
 
688
        DO_SKIP();
 
689
      }
 
690
      if (!set_found && try_comma_set_list())
 
691
      {
 
692
        DO_SKIP();
 
693
      }
 
694
      if (try_admin())
 
695
      {
 
696
        DO_SKIP();
 
697
      }
 
698
      else if (try_query())
 
699
      {
 
700
        DO_SKIP();
 
701
      }
 
702
    }
 
703
    while (!is_char('#'))
 
704
    {
 
705
      DO_SKIP();
 
706
    }
703
707
#undef DO_SKIP
704
 
      }
705
708
  }
 
709
}
706
710
private:
707
 
  Thread      m_thread;
708
 
  String      m_page;
709
 
  String      m_buffer;
710
 
  std::size_t m_current;
711
 
  std::size_t m_end;
712
 
  bool m_eof;
 
711
Thread      m_thread;
 
712
String      m_page;
 
713
String      m_buffer;
 
714
std::size_t m_current;
 
715
std::size_t m_end;
 
716
bool m_eof;
713
717
private:
714
718
//unsigned int m_next_transaction_id;
715
719
//  typedef std::vector< std::pair< unsigned int, std::size_t > > Stack;
716
720
//Stack m_stack;
717
721
private:
718
 
  String_Queue &m_queue;
719
 
  Result       &m_result;
 
722
String_Queue &m_queue;
 
723
Result       &m_result;
720
724
private:
721
 
  Database database;
722
 
  String   sql;
723
 
  long long int timestamp;
724
 
  long long int insert_id;
725
 
  long long int last_insert_id;
726
 
  unsigned int  thread_id;
727
 
};
 
725
Database database;
 
726
String   sql;
 
727
long long int timestamp;
 
728
long long int insert_id;
 
729
long long int last_insert_id;
 
730
unsigned int  thread_id;
 
731
  };
728
732
 
729
733
#endif //  __PERCONA_PLAYBACK__SLOW_LOG_PARSER__H__