~percona-toolkit-dev/percona-toolkit/release-2.2.2

« back to all changes in this revision

Viewing changes to lib/LogSplitter.pm

  • Committer: Daniel Nichter
  • Date: 2011-06-24 17:22:06 UTC
  • Revision ID: daniel@percona.com-20110624172206-c7q4s4ad6r260zz6
Add lib/, t/lib/, and sandbox/.  All modules are updated and passing on MySQL 5.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# This program is copyright 2008-2011 Percona Inc.
 
2
# Feedback and improvements are welcome.
 
3
#
 
4
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
 
5
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
 
6
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
 
7
#
 
8
# This program is free software; you can redistribute it and/or modify it under
 
9
# the terms of the GNU General Public License as published by the Free Software
 
10
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
 
11
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
 
12
# licenses.
 
13
#
 
14
# You should have received a copy of the GNU General Public License along with
 
15
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 
16
# Place, Suite 330, Boston, MA  02111-1307  USA.
 
17
# ###########################################################################
 
18
# LogSplitter package $Revision: 7177 $
 
19
# ###########################################################################
 
20
 
 
21
# Package: LogSplitter
 
22
# LogSplitter splits MySQL query logs by sessions.
 
23
{
 
24
package LogSplitter;
 
25
 
 
26
use strict;
 
27
use warnings FATAL => 'all';
 
28
use English qw(-no_match_vars);
 
29
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
 
30
 
 
31
use Data::Dumper;
 
32
$Data::Dumper::Indent    = 1;
 
33
$Data::Dumper::Sortkeys  = 1;
 
34
$Data::Dumper::Quotekeys = 0;
 
35
 
 
36
my $oktorun = 1;
 
37
 
 
38
sub new {
 
39
   my ( $class, %args ) = @_;
 
40
   foreach my $arg ( qw(attribute base_dir parser session_files) ) {
 
41
      die "I need a $arg argument" unless $args{$arg};
 
42
   }
 
43
 
 
44
   # TODO: this is probably problematic on Windows
 
45
   $args{base_dir} .= '/' if substr($args{base_dir}, -1, 1) ne '/';
 
46
 
 
47
   if ( $args{split_random} ) {
 
48
      MKDEBUG && _d('Split random');
 
49
      $args{attribute} = '_sessionno';  # set round-robin 1..session_files
 
50
   }
 
51
 
 
52
   my $self = {
 
53
      # %args will override these default args if given explicitly.
 
54
      base_file_name    => 'session',
 
55
      max_dirs          => 1_000,
 
56
      max_files_per_dir => 5_000,
 
57
      max_sessions      => 5_000_000,  # max_dirs * max_files_per_dir
 
58
      merge_sessions    => 1,
 
59
      session_files     => 64,
 
60
      quiet             => 0,
 
61
      verbose           => 0,
 
62
      max_open_files    => 1_000,
 
63
      close_lru_files   => 100,
 
64
      # Override default args above.
 
65
      %args,
 
66
      # These args cannot be overridden.
 
67
      n_dirs_total       => 0,  # total number of dirs created
 
68
      n_files_total      => 0,  # total number of session files created
 
69
      n_files_this_dir   => -1, # number of session files in current dir
 
70
      session_fhs        => [], # filehandles for each session
 
71
      n_open_fhs         => 0,  # current number of open session filehandles
 
72
      n_events_total     => 0,  # total number of events in log
 
73
      n_events_saved     => 0,  # total number of events saved
 
74
      n_sessions_skipped => 0,  # total number of sessions skipped
 
75
      n_sessions_saved   => 0,  # number of sessions saved
 
76
      sessions           => {}, # sessions data store
 
77
      created_dirs       => [],
 
78
   };
 
79
 
 
80
   MKDEBUG && _d('new LogSplitter final args:', Dumper($self));
 
81
   return bless $self, $class;
 
82
}
 
83
 
 
84
sub split {
 
85
   my ( $self, @logs ) = @_;
 
86
   $oktorun = 1; # True as long as we haven't created too many
 
87
                 # session files or too many dirs and files
 
88
 
 
89
   my $callbacks = $self->{callbacks};
 
90
 
 
91
   my $next_sessionno;
 
92
   if ( $self->{split_random} ) {
 
93
      # round-robin iterator
 
94
      $next_sessionno = make_rr_iter(1, $self->{session_files});
 
95
   }
 
96
 
 
97
   if ( @logs == 0 ) {
 
98
      MKDEBUG && _d('Implicitly reading STDIN because no logs were given');
 
99
      push @logs, '-';
 
100
   }
 
101
 
 
102
   # Split all the log files.
 
103
   my $lp = $self->{parser};
 
104
   LOG:
 
105
   foreach my $log ( @logs ) {
 
106
      last unless $oktorun;
 
107
      next unless defined $log;
 
108
 
 
109
      if ( !-f $log && $log ne '-' ) {
 
110
         warn "Skipping $log because it is not a file";
 
111
         next LOG;
 
112
      }
 
113
      my $fh;
 
114
      if ( $log eq '-' ) {
 
115
         $fh = *STDIN;
 
116
      }
 
117
      else {
 
118
         if ( !open $fh, "<", $log ) {
 
119
            warn "Cannot open $log: $OS_ERROR\n";
 
120
            next LOG;
 
121
         }
 
122
      }
 
123
 
 
124
      MKDEBUG && _d('Splitting', $log);
 
125
      my $event           = {};
 
126
      my $more_events     = 1;
 
127
      my $more_events_sub = sub { $more_events = $_[0]; };
 
128
      EVENT:
 
129
      while ( $oktorun ) {
 
130
         $event = $lp->parse_event(
 
131
            next_event => sub { return <$fh>;    },
 
132
            tell       => sub { return tell $fh; },
 
133
            oktorun => $more_events_sub,
 
134
         );
 
135
         if ( $event ) {
 
136
            $self->{n_events_total}++;
 
137
            if ( $self->{split_random} ) {
 
138
               $event->{_sessionno} = $next_sessionno->();
 
139
            }
 
140
            if ( $callbacks ) {
 
141
               foreach my $callback ( @$callbacks ) {
 
142
                  $event = $callback->($event);
 
143
                  last unless $event;
 
144
               }
 
145
            }
 
146
            $self->_save_event($event) if $event;
 
147
         }
 
148
         if ( !$more_events ) {
 
149
            MKDEBUG && _d('Done parsing', $log);
 
150
            close $fh;
 
151
            next LOG;
 
152
         }
 
153
         last LOG unless $oktorun;
 
154
      }
 
155
   }
 
156
 
 
157
   # Close session filehandles.
 
158
   while ( my $fh = pop @{ $self->{session_fhs} } ) {
 
159
      close $fh->{fh};
 
160
   }
 
161
   $self->{n_open_fhs}  = 0;
 
162
 
 
163
   $self->_merge_session_files() if $self->{merge_sessions};
 
164
   $self->print_split_summary() unless $self->{quiet};
 
165
 
 
166
   return;
 
167
}
 
168
 
 
169
sub _save_event {
 
170
   my ( $self, $event ) = @_; 
 
171
   my ($session, $session_id) = $self->_get_session_ds($event);
 
172
   return unless $session;
 
173
 
 
174
   if ( !defined $session->{fh} ) {
 
175
      $self->{n_sessions_saved}++;
 
176
      MKDEBUG && _d('New session:', $session_id, ',',
 
177
         $self->{n_sessions_saved}, 'of', $self->{max_sessions});
 
178
 
 
179
      my $session_file = $self->_get_next_session_file();
 
180
      if ( !$session_file ) {
 
181
         $oktorun = 0;
 
182
         MKDEBUG && _d('Not oktorun because no _get_next_session_file');
 
183
         return;
 
184
      }
 
185
 
 
186
      # Close Last Recently Used session fhs if opening if this new
 
187
      # session fh will cause us to have too many open files.
 
188
      if ( $self->{n_open_fhs} >= $self->{max_open_files} ) {
 
189
         $self->_close_lru_session()
 
190
      }
 
191
 
 
192
      # Open a fh for this session file.
 
193
      open my $fh, '>', $session_file
 
194
         or die "Cannot open session file $session_file: $OS_ERROR";
 
195
      $session->{fh} = $fh;
 
196
      $self->{n_open_fhs}++;
 
197
 
 
198
      # Save fh and session file in case we need to open/close it later.
 
199
      $session->{active}       = 1;
 
200
      $session->{session_file} = $session_file;
 
201
 
 
202
      push @{$self->{session_fhs}}, { fh => $fh, session_id => $session_id };
 
203
 
 
204
      MKDEBUG && _d('Created', $session_file, 'for session',
 
205
         $self->{attribute}, '=', $session_id);
 
206
 
 
207
      # This special comment lets mk-log-player know when a session begins.
 
208
      print $fh "-- START SESSION $session_id\n\n";
 
209
   }
 
210
   elsif ( !$session->{active} ) {
 
211
      # Reopen the existing but inactive session. This happens when
 
212
      # a new session (above) had to close LRU session fhs.
 
213
 
 
214
      # Again, close Last Recently Used session fhs if reopening if this
 
215
      # session's fh will cause us to have too many open files.
 
216
      if ( $self->{n_open_fhs} >= $self->{max_open_files} ) {
 
217
         $self->_close_lru_session();
 
218
      }
 
219
 
 
220
       # Reopen this session's fh.
 
221
       open $session->{fh}, '>>', $session->{session_file}
 
222
          or die "Cannot reopen session file "
 
223
            . "$session->{session_file}: $OS_ERROR";
 
224
 
 
225
       # Mark this session as active again.
 
226
       $session->{active} = 1;
 
227
       $self->{n_open_fhs}++;
 
228
 
 
229
       MKDEBUG && _d('Reopend', $session->{session_file}, 'for session',
 
230
         $self->{attribute}, '=', $session_id);
 
231
   }
 
232
   else {
 
233
      MKDEBUG && _d('Event belongs to active session', $session_id);
 
234
   }
 
235
 
 
236
   my $session_fh = $session->{fh};
 
237
 
 
238
   # Print USE db if 1) we haven't done so yet or 2) the db has changed.
 
239
   my $db = $event->{db} || $event->{Schema};
 
240
   if ( $db && ( !defined $session->{db} || $session->{db} ne $db ) ) {
 
241
      print $session_fh "use $db\n\n";
 
242
      $session->{db} = $db;
 
243
   }
 
244
 
 
245
   print $session_fh $self->flatten($event->{arg}), "\n\n";
 
246
   $self->{n_events_saved}++;
 
247
 
 
248
   return;
 
249
}
 
250
 
 
251
# Returns shortcut to session data store and id for the given event.
 
252
# The returned session will be undef if no more sessions are allowed.
 
253
sub _get_session_ds {
 
254
   my ( $self, $event ) = @_;
 
255
 
 
256
   my $attrib = $self->{attribute};
 
257
   if ( !$event->{ $attrib } ) {
 
258
      MKDEBUG && _d('No attribute', $attrib, 'in event:', Dumper($event));
 
259
      return;
 
260
   }
 
261
 
 
262
   # This could indicate a problem in parser not parsing
 
263
   # a log event correctly thereby leaving $event->{arg} undefined.
 
264
   # Or, it could simply be an event like:
 
265
   #   use db;
 
266
   #   SET NAMES utf8;
 
267
   return unless $event->{arg};
 
268
 
 
269
   # Don't print admin commands like quit or ping because these
 
270
   # cannot be played.
 
271
   return if ($event->{cmd} || '') eq 'Admin';
 
272
 
 
273
   my $session;
 
274
   my $session_id = $event->{ $attrib };
 
275
 
 
276
   # The following is necessary to prevent Perl from auto-vivifying
 
277
   # a lot of empty hashes for new sessions that are ignored due to
 
278
   # already having max_sessions.
 
279
   if ( $self->{n_sessions_saved} < $self->{max_sessions} ) {
 
280
      # Will auto-vivify if necessary.
 
281
      $session = $self->{sessions}->{ $session_id } ||= {};
 
282
   }
 
283
   elsif ( exists $self->{sessions}->{ $session_id } ) {
 
284
      # Use only existing sessions.
 
285
      $session = $self->{sessions}->{ $session_id };
 
286
   }
 
287
   else {
 
288
      $self->{n_sessions_skipped} += 1;
 
289
      MKDEBUG && _d('Skipping new session', $session_id,
 
290
         'because max_sessions is reached');
 
291
   }
 
292
 
 
293
   return $session, $session_id;
 
294
}
 
295
 
 
296
sub _close_lru_session {
 
297
   my ( $self ) = @_;
 
298
   my $session_fhs = $self->{session_fhs};
 
299
   my $lru_n       = $self->{n_sessions_saved} - $self->{max_open_files} - 1;
 
300
   my $close_to_n  = $lru_n + $self->{close_lru_files} - 1;
 
301
 
 
302
   MKDEBUG && _d('Closing session fhs', $lru_n, '..', $close_to_n,
 
303
      '(',$self->{n_sessions}, 'sessions', $self->{n_open_fhs}, 'open fhs)');
 
304
 
 
305
   foreach my $session ( @$session_fhs[ $lru_n..$close_to_n ] ) {
 
306
      close $session->{fh};
 
307
      $self->{n_open_fhs}--;
 
308
      $self->{sessions}->{ $session->{session_id} }->{active} = 0;
 
309
   }
 
310
 
 
311
   return;
 
312
}
 
313
 
 
314
# Returns an empty string on failure, or the next session file name on success.
 
315
# This will fail if we have opened maxdirs and maxfiles.
 
316
sub _get_next_session_file {
 
317
   my ( $self, $n ) = @_;
 
318
   return if $self->{n_dirs_total} >= $self->{max_dirs};
 
319
 
 
320
   # n_files_this_dir will only be < 0 for the first dir and file
 
321
   # because n_file is set to -1 in new(). This is a hack
 
322
   # to cause the first dir and file to be created automatically.
 
323
   if ( ($self->{n_files_this_dir} >= $self->{max_files_per_dir})
 
324
        || $self->{n_files_this_dir} < 0 ) {
 
325
      $self->{n_dirs_total}++;
 
326
      $self->{n_files_this_dir} = 0;
 
327
      my $new_dir = "$self->{base_dir}$self->{n_dirs_total}";
 
328
      if ( !-d $new_dir ) {
 
329
         my $retval = system("mkdir $new_dir");
 
330
         if ( ($retval >> 8) != 0 ) {
 
331
            die "Cannot create new directory $new_dir: $OS_ERROR";
 
332
         }
 
333
         MKDEBUG && _d('Created new base_dir', $new_dir);
 
334
         push @{$self->{created_dirs}}, $new_dir;
 
335
      }
 
336
      elsif ( MKDEBUG ) {
 
337
         _d($new_dir, 'already exists');
 
338
      }
 
339
   }
 
340
   else {
 
341
      MKDEBUG && _d('No dir created; n_files_this_dir:',
 
342
         $self->{n_files_this_dir}, 'n_files_total:',
 
343
         $self->{n_files_total});
 
344
   }
 
345
 
 
346
   $self->{n_files_total}++;
 
347
   $self->{n_files_this_dir}++;
 
348
   my $dir_n        = $self->{n_dirs_total} . '/';
 
349
   my $session_n    = sprintf '%d', $n || $self->{n_sessions_saved};
 
350
   my $session_file = $self->{base_dir}
 
351
                    . $dir_n
 
352
                    . $self->{base_file_name}."-$session_n.txt";
 
353
   MKDEBUG && _d('Next session file', $session_file);
 
354
   return $session_file;
 
355
}
 
356
 
 
357
# Flattens multiple new-line and spaces to single new-lines and spaces
 
358
# and remove /* comment */ blocks.
 
359
sub flatten {
 
360
   my ( $self, $query ) = @_;
 
361
   return unless $query;
 
362
   $query =~ s!/\*.*?\*/! !g;
 
363
   $query =~ s/^\s+//;
 
364
   $query =~ s/\s{2,}/ /g;
 
365
   return $query;
 
366
}
 
367
 
 
368
sub _merge_session_files {
 
369
   my ( $self ) = @_;
 
370
 
 
371
   print "Merging session files...\n" unless $self->{quiet};
 
372
 
 
373
   my @multi_session_files;
 
374
   for my $i ( 1..$self->{session_files} ) {
 
375
      push @multi_session_files, $self->{base_dir} ."sessions-$i.txt";
 
376
   }
 
377
 
 
378
   my @single_session_files = map {
 
379
      $_->{session_file};
 
380
   } values %{$self->{sessions}};
 
381
 
 
382
   my $i = make_rr_iter(0, $#multi_session_files);  # round-robin iterator
 
383
   foreach my $single_session_file ( @single_session_files ) {
 
384
      my $multi_session_file = $multi_session_files[ $i->() ];
 
385
      my $cmd;
 
386
      if ( $self->{split_random} ) {
 
387
         $cmd = "mv $single_session_file $multi_session_file";
 
388
      }
 
389
      else {
 
390
         $cmd = "cat $single_session_file >> $multi_session_file";
 
391
      }
 
392
      eval { `$cmd`; };
 
393
      if ( $EVAL_ERROR ) {
 
394
         warn "Failed to `$cmd`: $OS_ERROR";
 
395
      }
 
396
   }
 
397
 
 
398
   foreach my $created_dir ( @{$self->{created_dirs}} ) {
 
399
      my $cmd = "rm -rf $created_dir";
 
400
      eval { `$cmd`; };
 
401
      if ( $EVAL_ERROR ) {
 
402
         warn "Failed to `$cmd`: $OS_ERROR";
 
403
      }
 
404
   }
 
405
 
 
406
   return;
 
407
}
 
408
 
 
409
sub make_rr_iter {
 
410
   my ( $start, $end ) = @_;
 
411
   my $current = $start;
 
412
   return sub {
 
413
      $current = $start if $current > $end ;
 
414
      $current++;  # For next iteration.
 
415
      return $current - 1;
 
416
   };
 
417
}
 
418
 
 
419
sub print_split_summary {
 
420
   my ( $self ) = @_;
 
421
   print "Split summary:\n";
 
422
   my $fmt = "%-20s %-10s\n";
 
423
   printf $fmt, 'Total sessions',
 
424
      $self->{n_sessions_saved} + $self->{n_sessions_skipped};
 
425
   printf $fmt, 'Sessions saved',
 
426
      $self->{n_sessions_saved};
 
427
   printf $fmt, 'Total events', $self->{n_events_total};
 
428
   printf $fmt, 'Events saved', $self->{n_events_saved};
 
429
   return;
 
430
}
 
431
 
 
432
sub _d {
 
433
   my ($package, undef, $line) = caller 0;
 
434
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
 
435
        map { defined $_ ? $_ : 'undef' }
 
436
        @_;
 
437
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
 
438
}
 
439
 
 
440
1;
 
441
}
 
442
# ###########################################################################
 
443
# End LogSplitter package
 
444
# ###########################################################################