~ubuntu-branches/ubuntu/utopic/spamassassin/utopic-updates

« back to all changes in this revision

Viewing changes to lib/Mail/SpamAssassin/BayesStore/Redis.pm

  • Committer: Package Import Robot
  • Author(s): Noah Meyerhans
  • Date: 2014-02-14 22:45:15 UTC
  • mfrom: (0.8.1) (0.6.2) (5.1.22 sid)
  • Revision ID: package-import@ubuntu.com-20140214224515-z1es2twos8xh7n2y
Tags: 3.4.0-1
* New upstream version! (Closes: 738963, 738872, 738867)
* Scrub the environment when switching to the debian-spamd user in
  postinst and cron.daily. (Closes: 738951)
* Enhancements to postinst to better manage ownership of
  /var/lib/spamassassin, via Iain Lane <iain.lane@canonical.com>
  (Closes: 738974)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# <@LICENSE>
 
2
# Licensed to the Apache Software Foundation (ASF) under one or more
 
3
# contributor license agreements.  See the NOTICE file distributed with
 
4
# this work for additional information regarding copyright ownership.
 
5
# The ASF licenses this file to you under the Apache License, Version 2.0
 
6
# (the "License"); you may not use this file except in compliance with
 
7
# the License.  You may obtain a copy of the License at:
 
8
#
 
9
#     http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS,
 
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
# </@LICENSE>
 
17
 
 
18
=head1 NAME
 
19
 
 
20
Mail::SpamAssassin::BayesStore::Redis - Redis Bayesian Storage Module Implementation
 
21
 
 
22
=head1 SYNOPSIS
 
23
 
 
24
=head1 DESCRIPTION
 
25
 
 
26
This module implementes a Redis based bayesian storage module.
 
27
 
 
28
Apache SpamAssassin v3.4.0 introduces support for keeping
 
29
a Bayes database on a Redis server, either running locally, or accessed
 
30
over network. Similar to SQL backends, the database may be concurrently
 
31
used by several hosts running SpamAssassin.
 
32
 
 
33
The current implementation only supports a global Bayes database, i.e.
 
34
per-recipient sub-databases are not supported. The Redis 2.6.* server
 
35
supports access over IPv4 or over a Unix socket, starting with Redis 
 
36
version 2.8.0 also IPv6 is supported. Bear in mind that Redis server only 
 
37
offers limited access controls, so it is advisable to let the Redis server 
 
38
bind to a loopback interface only, or to use other mechanisms to limit 
 
39
access, such as local firewall rules.
 
40
 
 
41
The Redis backend for Bayes can put a Lua scripting support in a Redis
 
42
server to good use, improving performance. The Lua support is available
 
43
in Redis server since version 2.6.  In absence of a Lua support, the Redis
 
44
backend uses batched (pipelined) traditional Redis commands, so it should
 
45
work with a Redis server version 2.4 (untested), although this is not
 
46
recommended for busy sites.
 
47
 
 
48
Expiration of token and 'seen' message id entries is left to the Redis
 
49
server. There is no provision for manually expiring a database, so it is
 
50
highly recommended to leave the setting bayes_auto_expire to its default
 
51
value 1 (i.e. enabled).
 
52
 
 
53
Example configuration:
 
54
 
 
55
  bayes_store_module  Mail::SpamAssassin::BayesStore::Redis
 
56
  bayes_sql_dsn       server=127.0.0.1:6379;password=foo;database=2
 
57
  bayes_token_ttl 21d
 
58
  bayes_seen_ttl   8d
 
59
  bayes_auto_expire 1
 
60
 
 
61
A redis server with a Lua support (2.6 or higher) is recommended
 
62
for performance reasons.
 
63
 
 
64
The bayes_sql_dsn config variable has been hijacked for our purposes:
 
65
 
 
66
  bayes_sql_dsn
 
67
 
 
68
    Optional config parameters affecting a connection to a redis server.
 
69
 
 
70
    This is a semicolon-separated list of option=value pairs, where an option
 
71
    can be: server, password, database. Unrecognized options are silently
 
72
    ignored.
 
73
 
 
74
    The 'server' option specifies a socket on which a redis server is
 
75
    listening. It can be an absolute path of a Unix socket, or a host:port
 
76
    pair, where a host can be an IPv4 or IPv6 address or a host name.
 
77
    An IPv6 address must be enclosed in brackets, e.g. [::1]:6379
 
78
    (IPv6 support in a redis server is available since version 2.8.0).
 
79
    A default is to connect to an INET socket at 127.0.0.1, port 6379.
 
80
 
 
81
    The value of a 'password' option is sent in an AUTH command to a redis
 
82
    server on connecting if a server requests authentication. A password is
 
83
    sent in plain text and a redis server only offers an optional rudimentary
 
84
    authentication. To limit access to a redis server use its 'bind' option
 
85
    to bind to a specific interface (typically to a loopback interface),
 
86
    or use a host-based firewall.
 
87
 
 
88
    The value of a 'database' option can be an non-negative (small) integer,
 
89
    which is passed to a redis server with a SELECT command on connecting,
 
90
    and chooses a sub-database index. A default database index is 0.
 
91
 
 
92
    Example: server=localhost:6379;password=foo;database=2
 
93
 
 
94
  bayes_token_ttl
 
95
 
 
96
    Controls token expiry (ttl value in SECONDS, sent as-is to Redis)
 
97
    when bayes_auto_expire is true. Default value is 3 weeks (but check
 
98
    Mail::SpamAssassin::Conf.pm to make sure).
 
99
 
 
100
  bayes_seen_ttl
 
101
 
 
102
    Controls 'seen' expiry (ttl value in SECONDS, sent as-is to Redis)
 
103
    when bayes_auto_expire is true. Default value is 8 days (but check
 
104
    Mail::SpamAssassin::Conf.pm to make sure).
 
105
 
 
106
Expiry is done internally in Redis using *_ttl settings mentioned above,
 
107
but only if bayes_auto_expire is true (which is a default).  This is
 
108
why --force-expire etc does nothing, and token counts and atime values
 
109
are shown as zero in statistics.
 
110
 
 
111
LIMITATIONS: Only global bayes storage is implemented, per-user bayes is
 
112
not currently available. Dumping (sa-learn --backup, or --dump) of a huge
 
113
database may not be possible if all keys do not fit into process memory.
 
114
 
 
115
=cut
 
116
 
 
117
package Mail::SpamAssassin::BayesStore::Redis;
 
118
 
 
119
use strict;
 
120
use warnings;
 
121
use bytes;
 
122
use re 'taint';
 
123
use Errno qw(EBADF);
 
124
use Mail::SpamAssassin::Util qw(untaint_var);
 
125
use Mail::SpamAssassin::Timeout;
 
126
 
 
127
BEGIN {
 
128
  eval { require Digest::SHA; import Digest::SHA qw(sha1); 1 }
 
129
  or do { require Digest::SHA1; import Digest::SHA1 qw(sha1) }
 
130
}
 
131
 
 
132
use Mail::SpamAssassin::Logger;
 
133
use Mail::SpamAssassin::BayesStore;
 
134
use Mail::SpamAssassin::Util::TinyRedis;
 
135
 
 
136
use vars qw( @ISA $VERSION );
 
137
 
 
138
BEGIN {
 
139
  $VERSION = 0.09;
 
140
  @ISA = qw( Mail::SpamAssassin::BayesStore );
 
141
}
 
142
 
 
143
=head1 METHODS
 
144
 
 
145
=head2 new
 
146
 
 
147
public class (Mail::SpamAssassin::BayesStore::Redis) new (Mail::Spamassassin::Plugin::Bayes $bayes)
 
148
 
 
149
Description:
 
150
This methods creates a new instance of the Mail::SpamAssassin::BayesStore::Redis
 
151
object.  It expects to be passed an instance of the Mail::SpamAssassin:Bayes
 
152
object which is passed into the Mail::SpamAssassin::BayesStore parent object.
 
153
 
 
154
=cut
 
155
 
 
156
sub new {
 
157
  my $class = shift;
 
158
  $class = ref($class) || $class;
 
159
  my $self = $class->SUPER::new(@_);
 
160
 
 
161
  my $bconf = $self->{bayes}->{conf};
 
162
 
 
163
  foreach (split(';', $bconf->{bayes_sql_dsn})) {
 
164
    my ($a, $b) = split('=');
 
165
    if (!defined $b) {
 
166
      warn("bayes: invalid bayes_sql_dsn config\n");
 
167
      return;
 
168
    } elsif ($a eq 'database') {
 
169
      $self->{db_id} = $b;
 
170
    } elsif ($a eq 'password') {
 
171
      $self->{password} = $b;
 
172
    } else {
 
173
      push @{$self->{redis_conf}}, $a => $b eq 'undef' ?
 
174
        undef : untaint_var($b);
 
175
    }
 
176
  }
 
177
 
 
178
  if (!$bconf->{bayes_auto_expire}) {
 
179
    $self->{expire_token} = $self->{expire_seen} = undef;
 
180
    warn("bayes: the setting bayes_auto_expire is off, this is ".
 
181
         "not a recommended setting for the Redis bayes backend");
 
182
  } else {
 
183
    $self->{expire_token} = $bconf->{bayes_token_ttl};
 
184
    undef $self->{expire_token}  if $self->{expire_token} &&
 
185
                                    $self->{expire_token} < 0;
 
186
    $self->{expire_seen}  = $bconf->{bayes_seen_ttl};
 
187
    undef $self->{expire_seen}   if $self->{expire_seen} &&
 
188
                                    $self->{expire_seen} < 0;
 
189
  }
 
190
 
 
191
  $self->{supported_db_version} = 3;
 
192
  $self->{connected} = 0;
 
193
  $self->{is_officially_open} = 0;
 
194
  $self->{is_writable} = 0;
 
195
 
 
196
  $self->{timer} = Mail::SpamAssassin::Timeout->new({
 
197
    secs => $self->{conf}->{redis_timeout} || 10
 
198
  });
 
199
 
 
200
  return $self;
 
201
}
 
202
 
 
203
sub disconnect {
 
204
  my($self) = @_;
 
205
  if ($self->{connected}) {
 
206
    local($@, $!);
 
207
    dbg("bayes: Redis disconnect");
 
208
    $self->{connected} = 0; undef $self->{redis};
 
209
  }
 
210
}
 
211
 
 
212
sub DESTROY {
 
213
  my($self) = @_;
 
214
  local($@, $!, $_);
 
215
  $self->{connected} = 0; undef $self->{redis};
 
216
}
 
217
 
 
218
# Called from a Redis module on Redis->new and on automatic re-connect.
 
219
# The on_connect() callback must not use batched calls!
 
220
sub on_connect {
 
221
  my($self, $r) = @_;
 
222
  my $db_id = $self->{db_id} || 0;
 
223
  dbg("bayes: Redis on-connect, db_id %d", $db_id);
 
224
  eval {
 
225
    $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
 
226
  } or do {
 
227
    if ($@ =~ /\bNOAUTH\b/) {
 
228
      defined $self->{password}
 
229
        or die "Redis server requires authentication, no password provided";
 
230
      $r->call('AUTH', $self->{password});
 
231
      $r->call('SELECT', $db_id);
 
232
    } else {
 
233
      chomp $@; die "Redis error: $@";
 
234
    }
 
235
  };
 
236
  eval {
 
237
    $r->call('CLIENT', 'SETNAME', 'sa['.$$.']');
 
238
  } or do {
 
239
    dbg("bayes: CLIENT SETNAME command failed, don't worry, ".
 
240
        "possibly an old redis version: %s", $@);
 
241
  };
 
242
  1;
 
243
}
 
244
 
 
245
sub connect {
 
246
  my($self) = @_;
 
247
 
 
248
  $self->disconnect if $self->{connected};
 
249
  undef $self->{redis};  # just in case
 
250
 
 
251
  my $err = $self->{timer}->run_and_catch(sub {
 
252
    $self->{opened_from_pid} = $$;
 
253
    # will keep a persistent session open to a redis server
 
254
    $self->{redis} = Mail::SpamAssassin::Util::TinyRedis->new(
 
255
                       @{$self->{redis_conf}},
 
256
                       on_connect => sub { $self->on_connect(@_) },
 
257
                     );
 
258
    $self->{redis} or die "Error: $!";
 
259
  });
 
260
  if ($self->{timer}->timed_out()) {
 
261
    undef $self->{redis};
 
262
    die "bayes: Redis connection timed out!";
 
263
  } elsif ($err) {
 
264
    undef $self->{redis};
 
265
    die "bayes: Redis failed: $err";
 
266
  }
 
267
  $self->{connected} = 1;
 
268
}
 
269
 
 
270
=head2 prefork_init
 
271
 
 
272
public instance (Boolean) prefork_init ();
 
273
 
 
274
Description:
 
275
This optional method is called in the parent process shortly before
 
276
forking off child processes.
 
277
 
 
278
=cut
 
279
 
 
280
sub prefork_init {
 
281
  my ($self) = @_;
 
282
 
 
283
  # Each child process must establish its own connection with a Redis server,
 
284
  # re-using a common forked socket leads to serious trouble (garbled data).
 
285
  #
 
286
  # Parent process may have established its connection during startup, but
 
287
  # it is no longer of any use by now, so we shut it down here in the master
 
288
  # process, letting a spawned child process re-establish it later.
 
289
 
 
290
  if ($self->{connected}) {
 
291
    dbg("bayes: prefork_init, closing a session ".
 
292
        "with a Redis server in a parent process");
 
293
    $self->untie_db;
 
294
    $self->disconnect;
 
295
  }
 
296
}
 
297
 
 
298
=head2 spamd_child_init
 
299
 
 
300
public instance (Boolean) spamd_child_init ();
 
301
 
 
302
Description:
 
303
This optional method is called in a child process shortly after being spawned.
 
304
 
 
305
=cut
 
306
 
 
307
sub spamd_child_init {
 
308
  my ($self) = @_;
 
309
 
 
310
  # Each child process must establish its own connection with a Redis server,
 
311
  # re-using a common forked socket leads to serious trouble (garbled data).
 
312
  #
 
313
  # Just in case the parent master process did not call prefork_init() above,
 
314
  # we try to silently renounce the use of existing cloned connection here.
 
315
  # As the prefork_init plugin callback has only been introduced in
 
316
  # SpamAssassin 3.4.0, this situation can arrise in case of some third party
 
317
  # software (or a pre-3.4.0 version of spamd) is somehow using this plugin.
 
318
  # Better safe than sorry...
 
319
 
 
320
  if ($self->{connected}) {
 
321
    dbg("bayes: spamd_child_init, closing a parent's session ".
 
322
        "to a Redis server in a child process");
 
323
    $self->untie_db;
 
324
    $self->disconnect;  # just drop it, don't shut down parent's session
 
325
  }
 
326
}
 
327
 
 
328
=head2 tie_db_readonly
 
329
 
 
330
public instance (Boolean) tie_db_readonly ();
 
331
 
 
332
Description:
 
333
This method ensures that the database connection is properly setup and working.
 
334
 
 
335
=cut
 
336
 
 
337
sub tie_db_readonly {
 
338
  my($self) = @_;
 
339
 
 
340
  $self->{is_writable} = 0;
 
341
  my $success;
 
342
  if ($self->{connected}) {
 
343
    $success = $self->{is_officially_open} = 1;
 
344
  } else {
 
345
    $success = $self->_open_db();
 
346
  }
 
347
 
 
348
  return $success;
 
349
}
 
350
 
 
351
=head2 tie_db_writable
 
352
 
 
353
public instance (Boolean) tie_db_writable ()
 
354
 
 
355
Description:
 
356
This method ensures that the database connection is properly setup and
 
357
working. If necessary it will initialize the database so that they can
 
358
begin using the database immediately.
 
359
 
 
360
=cut
 
361
 
 
362
sub tie_db_writable {
 
363
  my($self) = @_;
 
364
 
 
365
  $self->{is_writable} = 0;
 
366
  my $success;
 
367
  if ($self->{connected}) {
 
368
    $success = $self->{is_officially_open} = 1;
 
369
  } else {
 
370
    $success = $self->_open_db();
 
371
  }
 
372
 
 
373
  $self->{is_writable} = 1 if $success;
 
374
 
 
375
  return $success;
 
376
}
 
377
 
 
378
=head2 _open_db
 
379
 
 
380
private instance (Boolean) _open_db (Boolean $writable)
 
381
 
 
382
Description:
 
383
This method ensures that the database connection is properly setup and
 
384
working.  It will initialize bayes variables so that they can begin using
 
385
the database immediately.
 
386
 
 
387
=cut
 
388
 
 
389
sub _open_db {
 
390
  my($self) = @_;
 
391
 
 
392
  dbg("bayes: _open_db(%s)",
 
393
      $self->{connected} ? 'already connected' : 'not yet connected');
 
394
 
 
395
  if ($self->{connected}) {
 
396
    $self->{is_officially_open} = 1;
 
397
    return 1;
 
398
  }
 
399
 
 
400
  $self->read_db_configs();
 
401
  $self->connect;
 
402
 
 
403
  if (!defined $self->{redis_server_version}) {
 
404
    my $info = $self->{info} = $self->{redis}->call("INFO");
 
405
    if (defined $info) {
 
406
      my $redis_mem; local $1;
 
407
      $self->{redis_server_version} =
 
408
                          $info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
 
409
      $self->{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
 
410
      $redis_mem = $1  if $info =~ /^used_memory:\s*(.*?)\r?$/m;
 
411
      dbg("bayes: redis server version %s, memory used %.1f MiB, Lua %s",
 
412
          $self->{redis_server_version}, $redis_mem/1024/1024,
 
413
          $self->{have_lua} ? 'is available' : 'is not available');
 
414
    }
 
415
  }
 
416
 
 
417
  $self->{db_version} = $self->{redis}->call('GET', 'v:DB_VERSION');
 
418
 
 
419
  if (!$self->{db_version}) {
 
420
    $self->{db_version} = $self->DB_VERSION;
 
421
    my $ret = $self->{redis}->call('MSET',
 
422
                                   'v:DB_VERSION', $self->{db_version},
 
423
                                   'v:NSPAM', 0,
 
424
                                   'v:NHAM', 0,
 
425
                                   'v:TOKEN_FORMAT', 2 );
 
426
    unless ($ret) {
 
427
      warn("bayes: failed to initialize database");
 
428
      return 0;
 
429
    }
 
430
    dbg("bayes: initialized empty database, version $self->{db_version}");
 
431
  }
 
432
  else {
 
433
    dbg("bayes: found bayes db version %s", $self->{db_version});
 
434
    if ($self->{db_version} ne $self->DB_VERSION) {
 
435
      warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
 
436
      return 0;
 
437
    }
 
438
    my $token_format = $self->{redis}->call('GET', 'v:TOKEN_FORMAT') || 0;
 
439
    if ($token_format < 2) {
 
440
      warn("bayes: bayes old token format $token_format not supported, ".
 
441
           "consider backup/restore or initialize a database\n");
 
442
      return 0;
 
443
    }
 
444
  }
 
445
 
 
446
  if ($self->{have_lua} && !defined $self->{multi_hmget_script}) {
 
447
    $self->_define_lua_scripts;
 
448
  }
 
449
 
 
450
  $self->{is_officially_open} = 1;
 
451
 
 
452
  return 1;
 
453
}
 
454
 
 
455
=head2 untie_db
 
456
 
 
457
public instance () untie_db ()
 
458
 
 
459
Description:
 
460
Closes any open db handles.  You can safely call this at any time.
 
461
 
 
462
=cut
 
463
 
 
464
sub untie_db {
 
465
  my $self = shift;
 
466
 
 
467
  $self->{is_officially_open} = $self->{is_writable} = 0;
 
468
  return;
 
469
}
 
470
 
 
471
=head2 sync_due
 
472
 
 
473
public instance (Boolean) sync_due ()
 
474
 
 
475
Description:
 
476
This method determines if a database sync is currently required.
 
477
 
 
478
Unused for Redis implementation.
 
479
 
 
480
=cut
 
481
 
 
482
sub sync_due {
 
483
  return 0;
 
484
}
 
485
 
 
486
=head2 expiry_due
 
487
 
 
488
public instance (Boolean) expiry_due ()
 
489
 
 
490
Description:
 
491
This methods determines if an expire is due.
 
492
 
 
493
Unused for Redis implementation.
 
494
 
 
495
=cut
 
496
 
 
497
sub expiry_due {
 
498
  return 0;
 
499
}
 
500
 
 
501
=head2 seen_get
 
502
 
 
503
public instance (String) seen_get (string $msgid)
 
504
 
 
505
Description:
 
506
This method retrieves the stored value, if any, for C<$msgid>.  The return
 
507
value is the stored string ('s' for spam and 'h' for ham) or undef if C<$msgid>
 
508
is not found.
 
509
 
 
510
=cut
 
511
 
 
512
sub seen_get {
 
513
  my($self, $msgid) = @_;
 
514
 
 
515
  return $self->{redis}->call('GET', "s:$msgid");
 
516
}
 
517
 
 
518
=head2 seen_put
 
519
 
 
520
public (Boolean) seen_put (string $msgid, char $flag)
 
521
 
 
522
Description:
 
523
This method records C<$msgid> as the type given by C<$flag>.  C<$flag> is one
 
524
of two values 's' for spam and 'h' for ham.
 
525
 
 
526
=cut
 
527
 
 
528
sub seen_put {
 
529
  my($self, $msgid, $flag) = @_;
 
530
 
 
531
  my $r = $self->{redis};
 
532
  if ($self->{expire_seen}) {
 
533
    $r->call('SETEX', "s:$msgid", $self->{expire_seen}, $flag);
 
534
  } else {
 
535
    $r->call('SET',   "s:$msgid", $flag);
 
536
  }
 
537
 
 
538
  return 1;
 
539
}
 
540
 
 
541
=head2 seen_delete
 
542
 
 
543
public instance (Boolean) seen_delete (string $msgid)
 
544
 
 
545
Description:
 
546
This method removes C<$msgid> from the database.
 
547
 
 
548
=cut
 
549
 
 
550
sub seen_delete {
 
551
  my($self, $msgid) = @_;
 
552
 
 
553
  $self->{redis}->call('DEL', "s:$msgid");
 
554
  return 1;
 
555
}
 
556
 
 
557
=head2 get_storage_variables
 
558
 
 
559
public instance (@) get_storage_variables ()
 
560
 
 
561
Description:
 
562
This method retrieves the various administrative variables used by
 
563
the Bayes process and database.
 
564
 
 
565
The values returned in the array are in the following order:
 
566
 
 
567
0: scan count base
 
568
1: number of spam
 
569
2: number of ham
 
570
3: number of tokens in db
 
571
4: last expire atime
 
572
5: oldest token in db atime
 
573
6: db version value
 
574
7: last journal sync
 
575
8: last atime delta
 
576
9: last expire reduction count
 
577
10: newest token in db atime
 
578
 
 
579
Only 1,2,6 are used with Redis, others return zero always.
 
580
 
 
581
=cut
 
582
 
 
583
sub get_storage_variables {
 
584
  my($self, @varnames) = @_;
 
585
 
 
586
  @varnames = qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
 
587
                 OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
 
588
                 LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
 
589
                 TOKEN_FORMAT}  if !@varnames;
 
590
  my $values = $self->{redis}->call('MGET', map('v:'.$_, @varnames));
 
591
  return if !$values;
 
592
  return map(defined $_ ? $_ : 0, @$values);
 
593
}
 
594
 
 
595
=head2 get_running_expire_tok
 
596
 
 
597
public instance (String $time) get_running_expire_tok ()
 
598
 
 
599
Description:
 
600
This method determines if an expire is currently running and returns
 
601
the last time set.
 
602
 
 
603
=cut
 
604
 
 
605
sub get_running_expire_tok {
 
606
  return 0;
 
607
}
 
608
 
 
609
=head2 set_running_expire_tok
 
610
 
 
611
public instance (String $time) set_running_expire_tok ()
 
612
 
 
613
Description:
 
614
This method sets the time that an expire starts running.
 
615
 
 
616
=cut
 
617
 
 
618
sub set_running_expire_tok {
 
619
  return 0;
 
620
}
 
621
 
 
622
=head2 remove_running_expire_tok
 
623
 
 
624
public instance (Boolean) remove_running_expire_tok ()
 
625
 
 
626
Description:
 
627
This method removes the row in the database that indicates that
 
628
and expire is currently running.
 
629
 
 
630
=cut
 
631
 
 
632
sub remove_running_expire_tok {
 
633
  return 1;
 
634
}
 
635
 
 
636
=head2 tok_get
 
637
 
 
638
public instance (Integer, Integer, Integer) tok_get (String $token)
 
639
 
 
640
Description:
 
641
This method retrieves a specificed token (C<$token>) from the database
 
642
and returns its spam_count, ham_count and last access time.
 
643
 
 
644
=cut
 
645
 
 
646
sub tok_get {
 
647
  my($self, $token) = @_;
 
648
 
 
649
  my $array = $self->tok_get_all($token);
 
650
  return if !$array || !@$array;
 
651
  return (@{$array->[0]})[1,2,3];
 
652
}
 
653
 
 
654
=head2 tok_get_all
 
655
 
 
656
public instance (\@) tok_get (@ $tokens)
 
657
 
 
658
Description:
 
659
This method retrieves the specified tokens (C<$tokens>) from storage and
 
660
returns a ref to arrays spam count, ham count and last access time.
 
661
 
 
662
=cut
 
663
 
 
664
sub tok_get_all {
 
665
  my $self = shift;
 
666
# my @keys = @_;  # avoid copying strings unnecessarily
 
667
 
 
668
  my @values;
 
669
  $self->connect if !$self->{connected};
 
670
  my $r = $self->{redis};
 
671
 
 
672
  if (! $self->{have_lua} ) {
 
673
 
 
674
    $r->b_call('HMGET', 'w:'.$_, 's', 'h')  for @_;
 
675
    my $results = $r->b_results;
 
676
 
 
677
    if (@$results != @_) {
 
678
      $self->disconnect;
 
679
      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
 
680
                  scalar @$results, scalar @_);
 
681
    }
 
682
    for my $j (0 .. $#$results) {
 
683
      my($s,$h) = @{$results->[$j]};
 
684
      push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0])  if $s || $h;
 
685
    }
 
686
 
 
687
  } else {  # have Lua
 
688
 
 
689
    # no need for cryptographical strength, just checking for protocol errors
 
690
    my $nonce = sprintf("%06x", rand(0xffffff));
 
691
 
 
692
    my $result;
 
693
    eval {
 
694
      $result = $r->call('EVALSHA', $self->{multi_hmget_script},
 
695
                         scalar @_, map('w:'.$_, @_), $nonce);
 
696
      1;
 
697
    } or do {  # Lua script probably not cached, define again and re-try
 
698
      if ($@ !~ /^NOSCRIPT/) {
 
699
        $self->disconnect;
 
700
        die "bayes: Redis LUA error: $@\n";
 
701
      }
 
702
      $self->_define_lua_scripts;
 
703
      $result = $r->call('EVALSHA', $self->{multi_hmget_script},
 
704
                         scalar @_, map('w:'.$_, @_), $nonce);
 
705
    };
 
706
    my @items = split(' ', $result);
 
707
    my $r_nonce = pop(@items);
 
708
    if ($r_nonce ne $nonce) {
 
709
      # redis protocol error?
 
710
      $self->disconnect;
 
711
      die sprintf("bayes: tok_get_all nonce mismatch, expected %s, got %s\n",
 
712
                  $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
 
713
    } elsif (@items != @_) {
 
714
      $self->disconnect;
 
715
      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
 
716
                  scalar @items, scalar @_);
 
717
    } else {
 
718
      for my $j (0 .. $#items) {
 
719
        my($s,$h) = split(m{/}, $items[$j], 2);
 
720
        push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0])  if $s || $h;
 
721
      }
 
722
    }
 
723
  }
 
724
 
 
725
  dbg("bayes: tok_get_all found %d tokens out of %d",
 
726
      scalar @values, scalar @_);
 
727
 
 
728
  return \@values;
 
729
}
 
730
 
 
731
=head2 tok_count_change
 
732
 
 
733
public instance (Boolean) tok_count_change (
 
734
  Integer $dspam, Integer $dham, String $token, String $newatime)
 
735
 
 
736
Description:
 
737
This method takes a C<$spam_count> and C<$ham_count> and adds it to
 
738
C<$tok> along with updating C<$tok>s atime with C<$atime>.
 
739
 
 
740
=cut
 
741
 
 
742
sub tok_count_change {
 
743
  my($self, $dspam, $dham, $token, $newatime) = @_;
 
744
 
 
745
  $self->multi_tok_count_change($dspam, $dham, {$token => 1}, $newatime);
 
746
}
 
747
 
 
748
=head2 multi_tok_count_change
 
749
 
 
750
public instance (Boolean) multi_tok_count_change (
 
751
  Integer $dspam, Integer $dham, \% $tokens, String $newatime)
 
752
 
 
753
Description:
 
754
This method takes a C<$dspam> and C<$dham> and adds it to all of the
 
755
tokens in the C<$tokens> hash ref along with updating each token's
 
756
atime with C<$atime>.
 
757
 
 
758
=cut
 
759
 
 
760
sub multi_tok_count_change {
 
761
  my($self, $dspam, $dham, $tokens, $newatime) = @_;
 
762
 
 
763
  # turn undef or an empty string into a 0
 
764
  $dspam ||= 0;
 
765
  $dham  ||= 0;
 
766
  # the increment must be an integer, otherwise redis returns an error
 
767
 
 
768
  dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
 
769
      $dspam, $dham);
 
770
 
 
771
  my $ttl = $self->{expire_token};  # time-to-live, in seconds
 
772
 
 
773
  $self->connect if !$self->{connected};
 
774
  my $r = $self->{redis};
 
775
 
 
776
  if ($dspam > 0 || $dham > 0) {  # learning
 
777
    while (my($token,$v) = each(%$tokens)) {
 
778
      my $key = 'w:'.$token;
 
779
      $r->b_call('HINCRBY', $key, 's', int $dspam) if $dspam > 0;
 
780
      $r->b_call('HINCRBY', $key, 'h', int $dham)  if $dham  > 0;
 
781
      $r->b_call('EXPIRE',  $key, $ttl)  if $ttl;
 
782
    }
 
783
    $r->b_results;  # collect response, ignoring results
 
784
  }
 
785
 
 
786
  if ($dspam < 0 || $dham < 0) {  # unlearning - rare, not as efficient
 
787
    while (my($token,$v) = each(%$tokens)) {
 
788
      my $key = 'w:'.$token;
 
789
      if ($dspam < 0) {
 
790
        my $result = $r->call('HINCRBY', $key, 's', int $dspam);
 
791
        if (!$result || $result <= 0) {
 
792
          $r->call('HDEL',   $key, 's');
 
793
        } elsif ($ttl) {
 
794
          $r->call('EXPIRE', $key, $ttl);
 
795
        }
 
796
      }
 
797
      if ($dham < 0) {
 
798
        my $result = $r->call('HINCRBY', $key, 'h', int $dham);
 
799
        if (!$result || $result <= 0) {
 
800
          $r->call('HDEL',   $key, 'h');
 
801
        } elsif ($ttl) {
 
802
          $r->call('EXPIRE', $key, $ttl);
 
803
        }
 
804
      }
 
805
    }
 
806
  }
 
807
 
 
808
  return 1;
 
809
}
 
810
 
 
811
=head2 nspam_nham_get
 
812
 
 
813
public instance ($spam_count, $ham_count) nspam_nham_get ()
 
814
 
 
815
Description:
 
816
This method retrieves the total number of spam and the total number of
 
817
ham learned.
 
818
 
 
819
=cut
 
820
 
 
821
sub nspam_nham_get {
 
822
  my($self) = @_;
 
823
 
 
824
  my @vars = $self->get_storage_variables('NSPAM', 'NHAM');
 
825
  dbg("bayes: nspam_nham_get nspam=%s, nham=%s", @vars);
 
826
  @vars;
 
827
}
 
828
 
 
829
=head2 nspam_nham_change
 
830
 
 
831
public instance (Boolean) nspam_nham_change (Integer $num_spam,
 
832
                                             Integer $num_ham)
 
833
 
 
834
Description:
 
835
This method updates the number of spam and the number of ham in the database.
 
836
 
 
837
=cut
 
838
 
 
839
sub nspam_nham_change {
 
840
  my($self, $ds, $dh) = @_;
 
841
 
 
842
  return 1 unless $ds || $dh;
 
843
 
 
844
  $self->connect if !$self->{connected};
 
845
  my $r = $self->{redis};
 
846
 
 
847
  my $err = $self->{timer}->run_and_catch(sub {
 
848
    $r->b_call('INCRBY', "v:NSPAM", $ds) if $ds;
 
849
    $r->b_call('INCRBY', "v:NHAM",  $dh) if $dh;
 
850
    $r->b_results;  # collect response, ignoring results
 
851
  });
 
852
 
 
853
  if ($self->{timer}->timed_out()) {
 
854
    $self->disconnect;
 
855
    die("bayes: Redis connection timed out!");
 
856
  }
 
857
  elsif ($err) {
 
858
    $self->disconnect;
 
859
    die("bayes: failed to increment nspam $ds nham $dh: $err");
 
860
  }
 
861
 
 
862
  return 1;
 
863
}
 
864
 
 
865
=head2 tok_touch
 
866
 
 
867
public instance (Boolean) tok_touch (String $token,
 
868
                                     String $atime)
 
869
 
 
870
Description:
 
871
This method updates the given tokens (C<$token>) atime.
 
872
 
 
873
The assumption is that the token already exists in the database.
 
874
 
 
875
We will never update to an older atime
 
876
 
 
877
=cut
 
878
 
 
879
sub tok_touch {
 
880
  my($self, $token, $atime) = @_;
 
881
 
 
882
  return $self->tok_touch_all([$token], $atime);
 
883
}
 
884
 
 
885
=head2 tok_touch_all
 
886
 
 
887
public instance (Boolean) tok_touch (\@ $tokens
 
888
                                     String $atime)
 
889
 
 
890
Description:
 
891
This method does a mass update of the given list of tokens C<$tokens>,
 
892
if the existing token atime is < C<$atime>.
 
893
 
 
894
=cut
 
895
 
 
896
sub tok_touch_all {
 
897
  my($self, $tokens, $newatime) = @_;
 
898
 
 
899
  my $ttl = $self->{expire_token};  # time-to-live, in seconds
 
900
  return 1  unless $ttl && $tokens && @$tokens;
 
901
 
 
902
  dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
 
903
      $ttl, scalar @$tokens);
 
904
 
 
905
  $self->connect if !$self->{connected};
 
906
  my $r = $self->{redis};
 
907
 
 
908
  # Benchmarks for a 'with-Lua' vs. a 'batched non-Lua' case show same speed,
 
909
  # so for simplicity we only kept a batched non-Lua code. Note that this
 
910
  # only applies to our own implementation of the Redis client protocol
 
911
  # which offers efficient command batching (pipelining) - with the Redis
 
912
  # CPAN module the batched case would be worse by about 33% on the average.
 
913
 
 
914
  # We just refresh TTL on all
 
915
 
 
916
  $r->b_call('EXPIRE', 'w:'.$_, $ttl) for @$tokens;
 
917
  $r->b_results;  # collect response, ignoring results
 
918
 
 
919
  return 1;
 
920
}
 
921
 
 
922
=head2 cleanup
 
923
 
 
924
public instance (Boolean) cleanup ()
 
925
 
 
926
Description:
 
927
This method perfoms any cleanup necessary before moving onto the next
 
928
operation.
 
929
 
 
930
=cut
 
931
 
 
932
sub cleanup {
 
933
  return 1;
 
934
}
 
935
 
 
936
=head2 get_magic_re
 
937
 
 
938
public instance (String) get_magic_re ()
 
939
 
 
940
Description:
 
941
This method returns a regexp which indicates a magic token.
 
942
 
 
943
=cut
 
944
 
 
945
use constant get_magic_re => undef;
 
946
 
 
947
=head2 sync
 
948
 
 
949
public instance (Boolean) sync (\% $opts)
 
950
 
 
951
Description:
 
952
This method performs a sync of the database
 
953
 
 
954
=cut
 
955
 
 
956
sub sync {
 
957
  return 1;
 
958
}
 
959
 
 
960
=head2 perform_upgrade
 
961
 
 
962
public instance (Boolean) perform_upgrade (\% $opts);
 
963
 
 
964
Description:
 
965
Performs an upgrade of the database from one version to another, not
 
966
currently used in this implementation.
 
967
 
 
968
=cut
 
969
 
 
970
sub perform_upgrade {
 
971
  return 1;
 
972
}
 
973
 
 
974
=head2 clear_database
 
975
 
 
976
public instance (Boolean) clear_database ()
 
977
 
 
978
Description:
 
979
This method deletes all records for a particular user.
 
980
 
 
981
Callers should be aware that any errors returned by this method
 
982
could causes the database to be inconsistent for the given user.
 
983
 
 
984
=cut
 
985
 
 
986
sub clear_database {
 
987
  my($self) = @_;
 
988
 
 
989
  # TODO
 
990
  warn("bayes: note: assuming the database is empty; ".
 
991
       "to manually clear a database: redis-cli -n <db-ind> FLUSHDB\n");
 
992
 
 
993
  return 1;
 
994
}
 
995
 
 
996
=head2 dump_db_toks
 
997
 
 
998
public instance () dump_db_toks (String $template, String $regex, Array @vars)
 
999
 
 
1000
Description:
 
1001
This method loops over all tokens, computing the probability for the token
 
1002
and then printing it out according to the passed in token.
 
1003
 
 
1004
=cut
 
1005
 
 
1006
sub dump_db_toks {
 
1007
  my ($self, $template, $regex, @vars) = @_;
 
1008
 
 
1009
  return 0 unless $self->tie_db_readonly;
 
1010
  $self->connect if !$self->{connected};
 
1011
  my $r = $self->{redis};
 
1012
 
 
1013
  my $atime = time;  # fake
 
1014
 
 
1015
  # let's get past this terrible command as fast as possible
 
1016
  # (ignoring $regex which makes no sense with SHA digests)
 
1017
  my $keys = $r->call('KEYS', 'w:*');
 
1018
  dbg("bayes: fetched %d token keys", scalar @$keys);
 
1019
 
 
1020
  # process tokens in chunks of 1000
 
1021
  for (my $i = 0; $i <= $#$keys; $i += 1000) {
 
1022
    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
 
1023
 
 
1024
    my @tokensdata;
 
1025
    if (! $self->{have_lua}) {  # no Lua, 3-times slower
 
1026
 
 
1027
      for (my $j = $i; $j <= $end; $j++) {
 
1028
        $r->b_call('HMGET', $keys->[$j], 's', 'h');
 
1029
      }
 
1030
      my $j = $i;
 
1031
      my $itemslist_ref = $r->b_results;
 
1032
      foreach my $item ( @$itemslist_ref ) {
 
1033
        my($s,$h) = @$item;
 
1034
        push(@tokensdata,
 
1035
             [ substr($keys->[$j],2), ($s||0)+0, ($h||0)+0 ])  if $s || $h;
 
1036
        $j++;
 
1037
      }
 
1038
 
 
1039
    } else {  # have_lua
 
1040
 
 
1041
      my $nonce = sprintf("%06x", rand(0xffffff));
 
1042
      my @tokens = @{$keys}[$i .. $end];
 
1043
      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
 
1044
                            scalar @tokens, @tokens, $nonce);
 
1045
      my @items = split(' ', $result);
 
1046
      my $r_nonce = pop(@items);
 
1047
      if (!defined $r_nonce) {
 
1048
        $self->disconnect;
 
1049
        die "bayes: dump_db_toks received no results\n";
 
1050
      } elsif ($r_nonce ne $nonce) {
 
1051
        # redis protocol error?
 
1052
        $self->disconnect;
 
1053
        die sprintf("bayes: dump_db_toks nonce mismatch, ".
 
1054
                    "expected %s, got %s\n",
 
1055
                    $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
 
1056
      } elsif (@items != @tokens) {
 
1057
        $self->disconnect;
 
1058
        die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
 
1059
                       scalar @items, scalar @tokens);
 
1060
      }
 
1061
      # stripping a leading "w:"
 
1062
      @tokensdata = map { my($s,$h) = split(m{/}, shift @items, 2);
 
1063
                          [ substr($_,2), ($s||0)+0, ($h||0)+0 ] } @tokens;
 
1064
    }
 
1065
 
 
1066
    my $probabilities_ref =
 
1067
      $self->{bayes}->_compute_prob_for_all_tokens(\@tokensdata,
 
1068
                                                   $vars[1], $vars[2]);
 
1069
    foreach my $tokendata (@tokensdata) {
 
1070
      my $prob = shift(@$probabilities_ref);
 
1071
      my($token, $s, $h) = @$tokendata;
 
1072
      next if !$s && !$h;
 
1073
      $prob = 0.5  if !defined $prob;
 
1074
      my $encoded = unpack("H*", $token);
 
1075
      printf($template, $prob, $s, $h, $atime, $encoded)
 
1076
        or die "Error writing tokens: $!";
 
1077
    }
 
1078
  }
 
1079
  dbg("bayes: written token keys");
 
1080
 
 
1081
  $self->untie_db();
 
1082
 
 
1083
  return;
 
1084
}
 
1085
 
 
1086
=head2 backup_database
 
1087
 
 
1088
public instance (Boolean) backup_database ()
 
1089
 
 
1090
Description:
 
1091
This method will dump the users database in a machine readable format.
 
1092
 
 
1093
=cut
 
1094
 
 
1095
sub backup_database {
 
1096
  my($self) = @_;
 
1097
 
 
1098
  return 0 unless $self->tie_db_readonly;
 
1099
  $self->connect if !$self->{connected};
 
1100
  my $r = $self->{redis};
 
1101
 
 
1102
  my $atime = time;  # fake
 
1103
  my @vars = $self->get_storage_variables(qw(DB_VERSION NSPAM NHAM));
 
1104
  print "v\t$vars[0]\tdb_version # this must be the first line!!!\n";
 
1105
  print "v\t$vars[1]\tnum_spam\n";
 
1106
  print "v\t$vars[2]\tnum_nonspam\n";
 
1107
 
 
1108
  # let's get past this terrible command as fast as possible
 
1109
  my $keys = $r->call('KEYS', 'w:*');
 
1110
  dbg("bayes: fetched %d token keys", scalar @$keys);
 
1111
 
 
1112
  # process tokens in chunks of 1000
 
1113
  for (my $i = 0; $i <= $#$keys; $i += 1000) {
 
1114
    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
 
1115
 
 
1116
    if (! $self->{have_lua}) {  # no Lua, slower
 
1117
 
 
1118
      for (my $j = $i; $j <= $end; $j++) {
 
1119
        $r->b_call('HMGET', $keys->[$j], 's', 'h');
 
1120
      }
 
1121
      my $j = $i;
 
1122
      my $itemslist_ref = $r->b_results;
 
1123
      foreach my $item ( @$itemslist_ref ) {
 
1124
        my $encoded = unpack("H*", substr($keys->[$j++], 2));
 
1125
        my($s,$h) = @$item;
 
1126
        printf("t\t%d\t%d\t%s\t%s\n",
 
1127
               $s||0, $h||0, $atime, $encoded)  if $s || $h;
 
1128
      }
 
1129
 
 
1130
    } else {  # have_lua
 
1131
 
 
1132
      my $nonce = sprintf("%06x", rand(0xffffff));
 
1133
      my @tokens = @{$keys}[$i .. $end];
 
1134
      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
 
1135
                            scalar @tokens, @tokens, $nonce);
 
1136
      my @items = split(' ', $result);
 
1137
      my $r_nonce = pop(@items);
 
1138
      if (!defined $r_nonce) {
 
1139
        $self->disconnect;
 
1140
        die "bayes: backup_database received no results\n";
 
1141
      } elsif ($r_nonce ne $nonce) {
 
1142
        # redis protocol error?
 
1143
        $self->disconnect;
 
1144
        die sprintf("bayes: backup_database nonce mismatch, ".
 
1145
                    "expected %s, got %s\n",
 
1146
                    $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
 
1147
      } elsif (@items != @tokens) {
 
1148
        $self->disconnect;
 
1149
        die sprintf("bayes: backup_database got %d entries, expected %d\n",
 
1150
                       scalar @items, scalar @tokens);
 
1151
      }
 
1152
      foreach my $token (@tokens) {
 
1153
        my($s,$h) = split(m{/}, shift @items, 2);
 
1154
        next if !$s && !$h;
 
1155
        my $encoded = unpack("H*", substr($token,2));  # strip leading "w:"
 
1156
        printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
 
1157
      }
 
1158
    }
 
1159
  }
 
1160
  dbg("bayes: written token keys");
 
1161
 
 
1162
  $keys = $r->call('KEYS', 's:*');
 
1163
  dbg("bayes: fetched %d seen keys", scalar @$keys);
 
1164
 
 
1165
  for (my $i = 0; $i <= $#$keys; $i += 1000) {
 
1166
    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
 
1167
    my @t = @{$keys}[$i .. $end];
 
1168
    my $v = $r->call('MGET', @t);
 
1169
    for (my $i = 0; $i < @$v; $i++) {
 
1170
      next unless defined $v->[$i];
 
1171
      printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
 
1172
    }
 
1173
  }
 
1174
  dbg("bayes: written seen keys");
 
1175
 
 
1176
  $self->untie_db();
 
1177
 
 
1178
  return 1;
 
1179
}
 
1180
 
 
1181
=head2 restore_database
 
1182
 
 
1183
public instance (Boolean) restore_database (String $filename, Boolean $showdots)
 
1184
 
 
1185
Description:
 
1186
This method restores a database from the given filename, C<$filename>.
 
1187
 
 
1188
Callers should be aware that any errors returned by this method
 
1189
could causes the database to be inconsistent for the given user.
 
1190
 
 
1191
=cut
 
1192
 
 
1193
sub restore_database {
 
1194
  my ($self, $filename, $showdots) = @_;
 
1195
 
 
1196
  local *DUMPFILE;
 
1197
  if (!open(DUMPFILE, '<', $filename)) {
 
1198
    warn("bayes: unable to open backup file $filename: $!");
 
1199
    return 0;
 
1200
  }
 
1201
 
 
1202
  unless ($self->clear_database()) {
 
1203
    return 0;
 
1204
  }
 
1205
 
 
1206
  return 0 unless $self->tie_db_writable;
 
1207
  $self->connect if !$self->{connected};
 
1208
  my $r = $self->{redis};
 
1209
 
 
1210
  my $token_count = 0;
 
1211
  my $db_version;
 
1212
  my $num_spam = 0;
 
1213
  my $num_ham = 0;
 
1214
  my $line_count = 0;
 
1215
 
 
1216
  my $line = <DUMPFILE>;
 
1217
  defined $line  or die "Error reading dump file: $!";
 
1218
  $line_count++;
 
1219
  # We require the database version line to be the first in the file so we can
 
1220
  # figure out how to properly deal with the file.  If it is not the first
 
1221
  # line then fail
 
1222
  if ($line =~ m/^v\s+(\d+)\s+db_version/) {
 
1223
    $db_version = $1;
 
1224
  } else {
 
1225
    warn("bayes: database version must be the first line in the backup file, correct and re-run");
 
1226
    return 0;
 
1227
  }
 
1228
 
 
1229
  unless ($db_version == 2 || $db_version == 3) {
 
1230
    warn("bayes: database version $db_version is unsupported, must be version 2 or 3\n");
 
1231
    return 0;
 
1232
  }
 
1233
 
 
1234
  my $curtime = time;
 
1235
  my $q_cnt = 0;
 
1236
  my $token_ttl = $self->{expire_token};  # possibly undefined
 
1237
  my $seen_ttl  = $self->{expire_seen};   # possibly undefined
 
1238
 
 
1239
  for ($!=0; defined($line=<DUMPFILE>); $!=0) {
 
1240
    chomp($line);
 
1241
    $line_count++;
 
1242
 
 
1243
    if ($showdots && $line_count % 1000 == 0) {
 
1244
      print STDERR "." if $showdots;
 
1245
    }
 
1246
 
 
1247
    if ($line =~ /^t\s+/) { # token line
 
1248
      my @parsed_line = split(/\s+/, $line, 5);
 
1249
      my $spam_count = $parsed_line[1] + 0;
 
1250
      my $ham_count = $parsed_line[2] + 0;
 
1251
      my $token = $parsed_line[4];
 
1252
 
 
1253
      $spam_count = 0 if $spam_count < 0;
 
1254
      $ham_count = 0 if $ham_count < 0;
 
1255
 
 
1256
      next if !$spam_count && !$ham_count;
 
1257
 
 
1258
      if ($db_version < 3) {
 
1259
        # versions < 3 use plain text tokens, so we need to convert to hash
 
1260
        $token = substr(sha1($token), -5);
 
1261
      } else {
 
1262
        # turn unpacked binary token back into binary value
 
1263
        $token = pack("H*",$token);
 
1264
      }
 
1265
      my $key = 'w:'.$token;
 
1266
      $r->b_call('HINCRBY', $key, 's', int $spam_count) if $spam_count > 0;
 
1267
      $r->b_call('HINCRBY', $key, 'h', int $ham_count)  if $ham_count  > 0;
 
1268
 
 
1269
      if ($token_ttl) {
 
1270
        # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
 
1271
        # we avoid auto-expiration of many tokens all at once,
 
1272
        # introducing an unnecessary load spike on a redis server
 
1273
        $r->b_call('EXPIRE', $key, int($token_ttl * (rand()+0.7)));
 
1274
      }
 
1275
 
 
1276
      # collect response every now and then, ignoring results
 
1277
      $r->b_results  if ++$q_cnt % 1000 == 0;
 
1278
 
 
1279
      $token_count++;
 
1280
 
 
1281
    } elsif ($line =~ /^s\s+/) { # seen line
 
1282
      my @parsed_line = split(/\s+/, $line, 3);
 
1283
      my $flag  = $parsed_line[1];
 
1284
      my $msgid = $parsed_line[2];
 
1285
 
 
1286
      unless ($flag eq 'h' || $flag eq 's') {
 
1287
        dbg("bayes: unknown seen flag ($flag) for line: $line, skipping");
 
1288
        next;
 
1289
      }
 
1290
 
 
1291
      unless ($msgid) {
 
1292
        dbg("bayes: blank msgid for line: $line, skipping");
 
1293
        next;
 
1294
      }
 
1295
 
 
1296
      if (!$seen_ttl) {
 
1297
        $r->b_call('SET', "s:$msgid", $flag);
 
1298
      } else {
 
1299
        # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
 
1300
        # we avoid auto-expiration of many 'seen' entries all at once,
 
1301
        # introducing an unnecessary load spike on a redis server
 
1302
        $r->b_call('SETEX', "s:$msgid", int($seen_ttl * (rand()+0.7)), $flag);
 
1303
      }
 
1304
 
 
1305
      # collect response every now and then, ignoring results
 
1306
      $r->b_results  if ++$q_cnt % 1000 == 0;
 
1307
 
 
1308
    } elsif ($line =~ /^v\s+/) {  # variable line
 
1309
      my @parsed_line = split(/\s+/, $line, 3);
 
1310
      my $value = $parsed_line[1] + 0;
 
1311
      if ($parsed_line[2] eq 'num_spam') {
 
1312
        $num_spam = $value;
 
1313
      } elsif ($parsed_line[2] eq 'num_nonspam') {
 
1314
        $num_ham = $value;
 
1315
      } else {
 
1316
        dbg("bayes: restore_database: skipping unknown line: $line");
 
1317
      }
 
1318
 
 
1319
    } else {
 
1320
      dbg("bayes: skipping unknown line: $line");
 
1321
      next;
 
1322
    }
 
1323
  }
 
1324
 
 
1325
  $r->b_results;  # collect any remaining response, ignoring results
 
1326
 
 
1327
  defined $line || $!==0  or
 
1328
    $!==EBADF ? dbg("bayes: error reading dump file: $!")
 
1329
      : die "error reading dump file: $!";
 
1330
  close(DUMPFILE) or die "Can't close dump file: $!";
 
1331
 
 
1332
  print STDERR "\n" if $showdots;
 
1333
 
 
1334
  if ($num_spam <= 0 && $num_ham <= 0) {
 
1335
    warn("bayes: no num_spam/num_ham found, aborting");
 
1336
    return 0;
 
1337
  }
 
1338
  else {
 
1339
    $self->nspam_nham_change($num_spam, $num_ham);
 
1340
  }
 
1341
 
 
1342
  dbg("bayes: parsed $line_count lines");
 
1343
  dbg("bayes: created database with $token_count tokens ".
 
1344
      "based on $num_spam spam messages and $num_ham ham messages");
 
1345
 
 
1346
  $self->untie_db();
 
1347
 
 
1348
  return 1;
 
1349
}
 
1350
 
 
1351
=head2 db_readable
 
1352
 
 
1353
public instance (Boolean) db_readable()
 
1354
 
 
1355
Description:
 
1356
This method returns a boolean value indicating if the database is in a
 
1357
readable state.
 
1358
 
 
1359
=cut
 
1360
 
 
1361
sub db_readable {
 
1362
  my($self) = @_;
 
1363
 
 
1364
  return $self->{is_officially_open};
 
1365
}
 
1366
 
 
1367
=head2 db_writable
 
1368
 
 
1369
public instance (Boolean) db_writable()
 
1370
 
 
1371
Description:
 
1372
This method returns a boolean value indicating if the database is in a
 
1373
writable state.
 
1374
 
 
1375
=cut
 
1376
 
 
1377
sub db_writable {
 
1378
  my($self) = @_;
 
1379
 
 
1380
  return $self->{is_officially_open} && $self->{is_writable};
 
1381
}
 
1382
 
 
1383
#
 
1384
# Redis functions
 
1385
#
 
1386
 
 
1387
sub _define_lua_scripts {
 
1388
  my $self = shift;
 
1389
  dbg("bayes: defining Lua scripts");
 
1390
 
 
1391
  $self->connect if !$self->{connected};
 
1392
  my $r = $self->{redis};
 
1393
 
 
1394
  $self->{multi_hmget_script} = $r->call('SCRIPT', 'LOAD', <<'END');
 
1395
    local rcall = redis.call
 
1396
    local nonce = ARGV[1]
 
1397
    local KEYS = KEYS
 
1398
    local r = {}
 
1399
    for j = 1, #KEYS do
 
1400
      local sh = rcall("HMGET", KEYS[j], "s", "h")
 
1401
      -- returns counts as a list of spam/ham pairs, zeroes may be omitted
 
1402
      local s, h = sh[1] or "0", sh[2] or "0"
 
1403
      local pair
 
1404
      if h == "0" then
 
1405
        pair = s  -- just a spam field, possibly zero; a ham field omitted
 
1406
      elseif s == "0" then
 
1407
        pair = "/" .. h  -- just a ham field, zero in a spam field suppressed
 
1408
      else
 
1409
        pair = s .. "/" .. h
 
1410
      end
 
1411
      r[#r+1] = pair
 
1412
    end
 
1413
    r[#r+1] = nonce
 
1414
    -- return counts as a single string, avoids overhead of multiresult parsing
 
1415
    return table.concat(r," ")
 
1416
END
 
1417
  1;
 
1418
}
 
1419
 
 
1420
1;