# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Mail::SpamAssassin::BayesStore::Redis - Redis Bayesian Storage Module Implementation

This module implements a Redis-based Bayesian storage module.

Apache SpamAssassin v3.4.0 introduces support for keeping
a Bayes database on a Redis server, either running locally, or accessed
over a network. Similar to SQL backends, the database may be concurrently
used by several hosts running SpamAssassin.

The current implementation only supports a global Bayes database, i.e.
per-recipient sub-databases are not supported. The Redis 2.6.* server
supports access over IPv4 or over a Unix socket; starting with Redis
version 2.8.0, IPv6 is also supported. Bear in mind that the Redis server
only offers limited access controls, so it is advisable to let the Redis
server bind to a loopback interface only, or to use other mechanisms to
limit access, such as local firewall rules.

The Redis backend for Bayes can put Lua scripting support in a Redis
server to good use, improving performance. Lua support has been available
in the Redis server since version 2.6. In the absence of Lua support, the
Redis backend uses batched (pipelined) traditional Redis commands, so it
should work with a Redis server version 2.4 (untested), although this is
not recommended for busy sites.

Expiration of token and 'seen' message id entries is left to the Redis
server. There is no provision for manually expiring a database, so it is
highly recommended to leave the setting bayes_auto_expire at its default
value 1 (i.e. enabled).

Example configuration:

  bayes_store_module Mail::SpamAssassin::BayesStore::Redis
  bayes_sql_dsn server=127.0.0.1:6379;password=foo;database=2

A Redis server with Lua support (2.6 or higher) is recommended
for performance reasons.

The bayes_sql_dsn config variable has been hijacked for our purposes:

Optional config parameters affecting a connection to a Redis server.

This is a semicolon-separated list of option=value pairs, where an option
can be: server, password, database. Unrecognized options are silently
ignored.

The 'server' option specifies a socket on which a Redis server is
listening. It can be an absolute path of a Unix socket, or a host:port
pair, where a host can be an IPv4 or IPv6 address or a host name.
An IPv6 address must be enclosed in brackets, e.g. [::1]:6379
(IPv6 support in a Redis server is available since version 2.8.0).
The default is to connect to an INET socket at 127.0.0.1, port 6379.

The value of the 'password' option is sent in an AUTH command to a Redis
server on connecting if the server requests authentication. The password
is sent in plain text, and a Redis server only offers optional, rudimentary
authentication. To limit access to a Redis server, use its 'bind' option
to bind to a specific interface (typically a loopback interface),
or use a host-based firewall.

The value of the 'database' option can be a non-negative (small) integer,
which is passed to a Redis server with a SELECT command on connecting,
and chooses a sub-database index. The default database index is 0.

Example: server=localhost:6379;password=foo;database=2
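
The option rules above can be sketched in a few lines of Perl. This is an illustrative sketch, not this module's own parser: the helper name C<parse_redis_dsn> is hypothetical, and the defaults it fills in are the ones stated in the text (127.0.0.1:6379, database 0).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of SpamAssassin): turn a bayes_sql_dsn
# string into a hash, applying the documented defaults.
sub parse_redis_dsn {
  my ($dsn) = @_;
  my %conf = (server => '127.0.0.1:6379', database => 0);
  foreach my $pair (split /;/, $dsn) {
    # a limit of 2 keeps any '=' inside the value intact
    my ($opt, $val) = split /=/, $pair, 2;
    next unless defined $val;              # skip malformed pairs
    if ($opt eq 'server' || $opt eq 'password') {
      $conf{$opt} = $val;
    } elsif ($opt eq 'database') {
      die "bad database index: $val\n" unless $val =~ /^\d+\z/;
      $conf{database} = $val + 0;
    }
    # any other option is silently ignored
  }
  return \%conf;
}

my $conf = parse_redis_dsn('server=[::1]:6379;password=foo;database=2;bogus=1');
printf "%s db=%d\n", $conf->{server}, $conf->{database};   # [::1]:6379 db=2
```

Splitting each pair with a limit of 2 is a deliberate choice in this sketch, so that a password containing '=' survives unchanged.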

Controls token expiry (ttl value in SECONDS, sent as-is to Redis)
when bayes_auto_expire is true. The default value is 3 weeks (but check
Mail::SpamAssassin::Conf.pm to make sure).

Controls 'seen' expiry (ttl value in SECONDS, sent as-is to Redis)
when bayes_auto_expire is true. The default value is 8 days (but check
Mail::SpamAssassin::Conf.pm to make sure).

Expiry is done internally in Redis using the *_ttl settings mentioned above,
but only if bayes_auto_expire is true (which is the default). This is
why --force-expire etc. does nothing, and why token counts and atime values
are shown as zero in statistics.
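
A minimal sketch of how the expiry settings might be normalized before use, assuming the rules this module's constructor applies: no expiry at all when bayes_auto_expire is off, and a negative TTL treated as "never expire". The helper C<effective_ttls> is hypothetical; the 3-week and 8-day values are the documented defaults converted to seconds.

```perl
use strict;
use warnings;

# Hypothetical helper: returns (token_ttl, seen_ttl) in seconds, or undef
# where no expiry should be applied.
sub effective_ttls {
  my ($auto_expire, $token_ttl, $seen_ttl) = @_;
  return (undef, undef) unless $auto_expire;   # expiry disabled entirely
  for ($token_ttl, $seen_ttl) {
    $_ = undef if defined $_ && $_ < 0;        # negative means "never expire"
  }
  return ($token_ttl, $seen_ttl);
}

# documented defaults: 3 weeks for tokens, 8 days for 'seen' entries
my ($tok_ttl, $seen_ttl) = effective_ttls(1, 3*7*24*3600, 8*24*3600);
print "$tok_ttl $seen_ttl\n";   # 1814400 691200
```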

LIMITATIONS: Only global Bayes storage is implemented; per-user Bayes is
not currently available. Dumping (sa-learn --backup, or --dump) of a huge
database may not be possible if not all keys fit into process memory.

package Mail::SpamAssassin::BayesStore::Redis;

use Mail::SpamAssassin::Util qw(untaint_var);
use Mail::SpamAssassin::Timeout;

eval { require Digest::SHA; import Digest::SHA qw(sha1); 1 }
or do { require Digest::SHA1; import Digest::SHA1 qw(sha1) }

use Mail::SpamAssassin::Logger;
use Mail::SpamAssassin::BayesStore;
use Mail::SpamAssassin::Util::TinyRedis;

use vars qw( @ISA $VERSION );

@ISA = qw( Mail::SpamAssassin::BayesStore );

public class (Mail::SpamAssassin::BayesStore::Redis) new (Mail::SpamAssassin::Plugin::Bayes $bayes)

This method creates a new instance of the Mail::SpamAssassin::BayesStore::Redis
object. It expects to be passed an instance of the Mail::SpamAssassin::Bayes
object which is passed into the Mail::SpamAssassin::BayesStore parent object.

$class = ref($class) || $class;
my $self = $class->SUPER::new(@_);
my $bconf = $self->{bayes}->{conf};
foreach (split(';', $bconf->{bayes_sql_dsn})) {
my ($a, $b) = split('=');
warn("bayes: invalid bayes_sql_dsn config\n");
} elsif ($a eq 'database') {
} elsif ($a eq 'password') {
$self->{password} = $b;
push @{$self->{redis_conf}}, $a => $b eq 'undef' ?
undef : untaint_var($b);
if (!$bconf->{bayes_auto_expire}) {
$self->{expire_token} = $self->{expire_seen} = undef;
warn("bayes: the setting bayes_auto_expire is off, this is ".
"not a recommended setting for the Redis bayes backend");
$self->{expire_token} = $bconf->{bayes_token_ttl};
undef $self->{expire_token} if $self->{expire_token} &&
$self->{expire_token} < 0;
$self->{expire_seen} = $bconf->{bayes_seen_ttl};
undef $self->{expire_seen} if $self->{expire_seen} &&
$self->{expire_seen} < 0;
$self->{supported_db_version} = 3;
$self->{connected} = 0;
$self->{is_officially_open} = 0;
$self->{is_writable} = 0;
$self->{timer} = Mail::SpamAssassin::Timeout->new({
secs => $self->{conf}->{redis_timeout} || 10
if ($self->{connected}) {
dbg("bayes: Redis disconnect");
$self->{connected} = 0; undef $self->{redis};
$self->{connected} = 0; undef $self->{redis};
# Called from a Redis module on Redis->new and on automatic re-connect.
# The on_connect() callback must not use batched calls!
my $db_id = $self->{db_id} || 0;
dbg("bayes: Redis on-connect, db_id %d", $db_id);
$r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
if ($@ =~ /\bNOAUTH\b/) {
defined $self->{password}
or die "Redis server requires authentication, no password provided";
$r->call('AUTH', $self->{password});
$r->call('SELECT', $db_id);
chomp $@; die "Redis error: $@";
$r->call('CLIENT', 'SETNAME', 'sa['.$$.']');
dbg("bayes: CLIENT SETNAME command failed, don't worry, ".
"possibly an old redis version: %s", $@);
$self->disconnect if $self->{connected};
undef $self->{redis}; # just in case
my $err = $self->{timer}->run_and_catch(sub {
$self->{opened_from_pid} = $$;
# will keep a persistent session open to a redis server
$self->{redis} = Mail::SpamAssassin::Util::TinyRedis->new(
@{$self->{redis_conf}},
on_connect => sub { $self->on_connect(@_) },
$self->{redis} or die "Error: $!";
if ($self->{timer}->timed_out()) {
undef $self->{redis};
die "bayes: Redis connection timed out!";
undef $self->{redis};
die "bayes: Redis failed: $err";
$self->{connected} = 1;

public instance (Boolean) prefork_init ();

This optional method is called in the parent process shortly before
forking off child processes.

# Each child process must establish its own connection with a Redis server;
# re-using a common forked socket leads to serious trouble (garbled data).
# Parent process may have established its connection during startup, but
# it is no longer of any use by now, so we shut it down here in the master
# process, letting a spawned child process re-establish it later.
if ($self->{connected}) {
dbg("bayes: prefork_init, closing a session ".
"with a Redis server in a parent process");

=head2 spamd_child_init

public instance (Boolean) spamd_child_init ();

This optional method is called in a child process shortly after being spawned.
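
The fork-safety rule above comes down to remembering which process opened the connection (this module records it in C<opened_from_pid> when connecting) and dropping, rather than shutting down, a connection inherited from a parent. The sketch below is a hypothetical, self-contained illustration: the C<ForkSafeHandle> class is invented for this example, and a plain string stands in for a real Redis socket.

```perl
use strict;
use warnings;

# Hypothetical illustration only: a string stands in for a Redis socket.
package ForkSafeHandle;

sub new {
  my ($class) = @_;
  return bless { conn => undef, pid => undef }, $class;
}

# Open a fresh "connection" and record the owning process id.
sub open_conn {
  my ($self) = @_;
  $self->{pid}  = $$;
  $self->{conn} = "connection opened by pid $$";
  return $self->{conn};
}

# Return a connection that is safe to use in the current process.
sub get {
  my ($self) = @_;
  if (defined $self->{conn} && $self->{pid} != $$) {
    # inherited from a parent: forget it, but do not shut it down,
    # since the parent may still be using the same socket
    $self->{conn} = undef;
  }
  return defined $self->{conn} ? $self->{conn} : $self->open_conn;
}

package main;

my $h = ForkSafeHandle->new;
my $parent_conn = $h->get;   # opened in this process
```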

sub spamd_child_init {
# Each child process must establish its own connection with a Redis server;
# re-using a common forked socket leads to serious trouble (garbled data).
# Just in case the parent master process did not call prefork_init() above,
# we try to silently renounce the use of an existing cloned connection here.
# As the prefork_init plugin callback has only been introduced in
# SpamAssassin 3.4.0, this situation can arise in case some third-party
# software (or a pre-3.4.0 version of spamd) is somehow using this plugin.
# Better safe than sorry...
if ($self->{connected}) {
dbg("bayes: spamd_child_init, closing a parent's session ".
"to a Redis server in a child process");
$self->disconnect; # just drop it, don't shut down parent's session

=head2 tie_db_readonly

public instance (Boolean) tie_db_readonly ();

This method ensures that the database connection is properly set up and working.

sub tie_db_readonly {
$self->{is_writable} = 0;
if ($self->{connected}) {
$success = $self->{is_officially_open} = 1;
$success = $self->_open_db();

=head2 tie_db_writable

public instance (Boolean) tie_db_writable ()

This method ensures that the database connection is properly set up and
working. If necessary it will initialize the database so that callers can
begin using it immediately.

sub tie_db_writable {
$self->{is_writable} = 0;
if ($self->{connected}) {
$success = $self->{is_officially_open} = 1;
$success = $self->_open_db();
$self->{is_writable} = 1 if $success;

private instance (Boolean) _open_db (Boolean $writable)

This method ensures that the database connection is properly set up and
working. It will initialize Bayes variables so that callers can begin using
the database immediately.
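
The server probing done when the database is first opened can be sketched as a small parser over the text returned by the Redis INFO command. It uses the same heuristic as the code that follows: Lua is assumed to be available when a C<used_memory_lua> field is present. The helper name C<parse_info> and the sample reply are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: extract the fields this module looks at from an
# INFO reply (lines end in CRLF).
sub parse_info {
  my ($info) = @_;
  my %r;
  $r{version}  = $info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
  $r{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
  $r{mem_mib}  = $info =~ /^used_memory:\s*(\d+)\r?$/m ? $1/1024/1024 : undef;
  return \%r;
}

# made-up sample reply, trimmed to the relevant fields
my $sample = "# Server\r\nredis_version:2.8.4\r\n# Memory\r\n"
           . "used_memory:1048576\r\nused_memory_lua:33792\r\n";
my $parsed = parse_info($sample);
printf "%s lua=%d mem=%.1f MiB\n",
       @{$parsed}{qw(version have_lua mem_mib)};   # 2.8.4 lua=1 mem=1.0 MiB
```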

dbg("bayes: _open_db(%s)",
$self->{connected} ? 'already connected' : 'not yet connected');
if ($self->{connected}) {
$self->{is_officially_open} = 1;
$self->read_db_configs();
if (!defined $self->{redis_server_version}) {
my $info = $self->{info} = $self->{redis}->call("INFO");
my $redis_mem; local $1;
$self->{redis_server_version} =
$info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
$self->{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
$redis_mem = $1 if $info =~ /^used_memory:\s*(.*?)\r?$/m;
dbg("bayes: redis server version %s, memory used %.1f MiB, Lua %s",
$self->{redis_server_version}, $redis_mem/1024/1024,
$self->{have_lua} ? 'is available' : 'is not available');
$self->{db_version} = $self->{redis}->call('GET', 'v:DB_VERSION');
if (!$self->{db_version}) {
$self->{db_version} = $self->DB_VERSION;
my $ret = $self->{redis}->call('MSET',
'v:DB_VERSION', $self->{db_version},
'v:TOKEN_FORMAT', 2 );
warn("bayes: failed to initialize database");
dbg("bayes: initialized empty database, version $self->{db_version}");
dbg("bayes: found bayes db version %s", $self->{db_version});
if ($self->{db_version} ne $self->DB_VERSION) {
warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
my $token_format = $self->{redis}->call('GET', 'v:TOKEN_FORMAT') || 0;
if ($token_format < 2) {
warn("bayes: bayes old token format $token_format not supported, ".
"consider backup/restore or initialize a database\n");
if ($self->{have_lua} && !defined $self->{multi_hmget_script}) {
$self->_define_lua_scripts;
$self->{is_officially_open} = 1;

public instance () untie_db ()

Closes any open db handles. You can safely call this at any time.

$self->{is_officially_open} = $self->{is_writable} = 0;

public instance (Boolean) sync_due ()

This method determines if a database sync is currently required.

Unused in the Redis implementation.

public instance (Boolean) expiry_due ()

This method determines if an expire is due.

Unused in the Redis implementation.

public instance (String) seen_get (string $msgid)

This method retrieves the stored value, if any, for C<$msgid>. The return
value is the stored string ('s' for spam and 'h' for ham) or undef if C<$msgid>

my($self, $msgid) = @_;
return $self->{redis}->call('GET', "s:$msgid");

public instance (Boolean) seen_put (string $msgid, char $flag)

This method records C<$msgid> as the type given by C<$flag>. C<$flag> is one
of two values: 's' for spam or 'h' for ham.
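
As a hedged sketch, the choice between SETEX and SET made by C<seen_put> can be expressed as a function that only builds the Redis command and does not talk to a server. The name C<seen_put_command> is hypothetical; the C<s:> key prefix and the TTL rule follow the code below.

```perl
use strict;
use warnings;

# Hypothetical helper: the command seen_put would issue for one message id.
# With a positive 'seen' TTL the entry expires on its own (SETEX);
# otherwise it is stored without a TTL (SET).
sub seen_put_command {
  my ($msgid, $flag, $expire_seen) = @_;
  return $expire_seen ? ('SETEX', "s:$msgid", $expire_seen, $flag)
                      : ('SET', "s:$msgid", $flag);
}

my @cmd = seen_put_command('abc@example.com', 's', 691200);
print "@cmd\n";   # SETEX s:abc@example.com 691200 s
```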

my($self, $msgid, $flag) = @_;
my $r = $self->{redis};
if ($self->{expire_seen}) {
$r->call('SETEX', "s:$msgid", $self->{expire_seen}, $flag);
$r->call('SET', "s:$msgid", $flag);

public instance (Boolean) seen_delete (string $msgid)

This method removes C<$msgid> from the database.

my($self, $msgid) = @_;
$self->{redis}->call('DEL', "s:$msgid");

=head2 get_storage_variables

public instance (@) get_storage_variables ()

This method retrieves the various administrative variables used by
the Bayes process and database.

The values returned in the array are in the following order:

3: number of tokens in db
5: oldest token in db atime
9: last expire reduction count
10: newest token in db atime

Only 1, 2 and 6 are used with Redis; the others always return zero.

sub get_storage_variables {
my($self, @varnames) = @_;
@varnames = qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
TOKEN_FORMAT} if !@varnames;
my $values = $self->{redis}->call('MGET', map('v:'.$_, @varnames));
return map(defined $_ ? $_ : 0, @$values);

=head2 get_running_expire_tok

public instance (String $time) get_running_expire_tok ()

This method determines if an expire is currently running and returns

sub get_running_expire_tok {

=head2 set_running_expire_tok

public instance (String $time) set_running_expire_tok ()

This method sets the time that an expire starts running.

sub set_running_expire_tok {

=head2 remove_running_expire_tok

public instance (Boolean) remove_running_expire_tok ()

This method removes the row in the database that indicates that
an expire is currently running.

sub remove_running_expire_tok {

public instance (Integer, Integer, Integer) tok_get (String $token)

This method retrieves a specified token (C<$token>) from the database
and returns its spam_count, ham_count and last access time.

my($self, $token) = @_;
my $array = $self->tok_get_all($token);
return if !$array || !@$array;
return (@{$array->[0]})[1,2,3];

public instance (\@) tok_get_all (@ $tokens)

This method retrieves the specified tokens (C<$tokens>) from storage and
returns a ref to an array of arrays, each holding a token, its spam count,
ham count and last access time.
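
When Lua is available, the script stored in C<multi_hmget_script> returns all counts as one space-separated string, with a random nonce appended so that protocol errors can be detected. Below is a sketch of decoding that reply. It assumes the format is exactly as the script's comments describe ('s', '/h' or 's/h' per token, with zeroes possibly omitted). C<decode_multi_hmget> is a hypothetical name, and the token names are placeholders rather than real digests.

```perl
use strict;
use warnings;

# Hypothetical helper: decode the single-string reply of the Lua script.
# Returns [token, spam, ham, atime] rows for tokens with a non-zero count.
sub decode_multi_hmget {
  my ($reply, $nonce, @tokens) = @_;
  my @items = split ' ', $reply;
  my $r_nonce = pop @items;                  # last item echoes the nonce
  die "nonce mismatch\n" unless defined $r_nonce && $r_nonce eq $nonce;
  die "entry count mismatch\n" unless @items == @tokens;
  my @rows;
  for my $j (0 .. $#items) {
    my ($s, $h) = split m{/}, $items[$j], 2; # "3", "/2" or "5/1"
    push @rows, [$tokens[$j], ($s||0)+0, ($h||0)+0, 0] if $s || $h;
  }
  return \@rows;
}

# no cryptographic strength needed, it only guards against garbled replies
my $nonce = sprintf("%06x", rand(0xffffff));
my $rows = decode_multi_hmget("3 /2 0 5/1 $nonce", $nonce, qw(t1 t2 t3 t4));
printf "%d tokens with counts\n", scalar @$rows;   # 3 tokens with counts
```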

# my @keys = @_; # avoid copying strings unnecessarily
$self->connect if !$self->{connected};
my $r = $self->{redis};
if (! $self->{have_lua} ) {
$r->b_call('HMGET', 'w:'.$_, 's', 'h') for @_;
my $results = $r->b_results;
if (@$results != @_) {
die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
scalar @$results, scalar @_);
for my $j (0 .. $#$results) {
my($s,$h) = @{$results->[$j]};
push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0]) if $s || $h;
# no need for cryptographic strength, just checking for protocol errors
my $nonce = sprintf("%06x", rand(0xffffff));
$result = $r->call('EVALSHA', $self->{multi_hmget_script},
scalar @_, map('w:'.$_, @_), $nonce);
} or do { # Lua script probably not cached, define again and re-try
if ($@ !~ /^NOSCRIPT/) {
die "bayes: Redis LUA error: $@\n";
$self->_define_lua_scripts;
$result = $r->call('EVALSHA', $self->{multi_hmget_script},
scalar @_, map('w:'.$_, @_), $nonce);
my @items = split(' ', $result);
my $r_nonce = pop(@items);
if (!defined $r_nonce || $r_nonce ne $nonce) {
# redis protocol error?
die sprintf("bayes: tok_get_all nonce mismatch, expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
} elsif (@items != @_) {
die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
scalar @items, scalar @_);
for my $j (0 .. $#items) {
my($s,$h) = split(m{/}, $items[$j], 2);
push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0]) if $s || $h;
dbg("bayes: tok_get_all found %d tokens out of %d",
scalar @values, scalar @_);

=head2 tok_count_change

public instance (Boolean) tok_count_change (
Integer $dspam, Integer $dham, String $token, String $newatime)

This method takes C<$dspam> and C<$dham> and adds them to
C<$token>, along with updating C<$token>'s atime with C<$newatime>.

sub tok_count_change {
my($self, $dspam, $dham, $token, $newatime) = @_;
$self->multi_tok_count_change($dspam, $dham, {$token => 1}, $newatime);

=head2 multi_tok_count_change

public instance (Boolean) multi_tok_count_change (
Integer $dspam, Integer $dham, \% $tokens, String $newatime)

This method takes C<$dspam> and C<$dham> and adds them to all of the
tokens in the C<$tokens> hash ref, along with updating each token's
atime with C<$newatime>.
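
For positive deltas, the code below queues one HINCRBY per non-zero count and, when a token TTL is configured, an EXPIRE that serves as the Redis stand-in for an atime update. The sketch below only builds that pipeline as a list of commands, so it runs without a Redis server. The helper C<learn_commands> is hypothetical, and it uses plain strings where the module uses binary token digests.

```perl
use strict;
use warnings;

# Hypothetical helper: the pipelined commands queued when learning.
# The hash key layout is "w:<token>" with fields 's' (spam) and 'h' (ham).
sub learn_commands {
  my ($dspam, $dham, $tokens, $ttl) = @_;
  my @cmds;
  for my $token (sort keys %$tokens) {   # sorted only to make output stable
    my $key = 'w:' . $token;
    push @cmds, ['HINCRBY', $key, 's', int $dspam] if $dspam > 0;
    push @cmds, ['HINCRBY', $key, 'h', int $dham]  if $dham > 0;
    push @cmds, ['EXPIRE', $key, $ttl] if $ttl;  # refresh time-to-live
  }
  return \@cmds;
}

my $cmds = learn_commands(1, 0, { tokA => 1, tokB => 1 }, 1814400);
print join(' ', @$_), "\n" for @$cmds;
```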

sub multi_tok_count_change {
my($self, $dspam, $dham, $tokens, $newatime) = @_;
# turn undef or an empty string into a 0
# the increment must be an integer, otherwise redis returns an error
dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
my $ttl = $self->{expire_token}; # time-to-live, in seconds
$self->connect if !$self->{connected};
my $r = $self->{redis};
if ($dspam > 0 || $dham > 0) { # learning
while (my($token,$v) = each(%$tokens)) {
my $key = 'w:'.$token;
$r->b_call('HINCRBY', $key, 's', int $dspam) if $dspam > 0;
$r->b_call('HINCRBY', $key, 'h', int $dham) if $dham > 0;
$r->b_call('EXPIRE', $key, $ttl) if $ttl;
$r->b_results; # collect response, ignoring results
if ($dspam < 0 || $dham < 0) { # unlearning - rare, not as efficient
while (my($token,$v) = each(%$tokens)) {
my $key = 'w:'.$token;
my $result = $r->call('HINCRBY', $key, 's', int $dspam);
if (!$result || $result <= 0) {
$r->call('HDEL', $key, 's');
$r->call('EXPIRE', $key, $ttl);
my $result = $r->call('HINCRBY', $key, 'h', int $dham);
if (!$result || $result <= 0) {
$r->call('HDEL', $key, 'h');
$r->call('EXPIRE', $key, $ttl);

=head2 nspam_nham_get

public instance ($spam_count, $ham_count) nspam_nham_get ()

This method retrieves the total number of spam and the total number of

my @vars = $self->get_storage_variables('NSPAM', 'NHAM');
dbg("bayes: nspam_nham_get nspam=%s, nham=%s", @vars);

=head2 nspam_nham_change

public instance (Boolean) nspam_nham_change (Integer $num_spam,

This method updates the number of spam and the number of ham in the database.

sub nspam_nham_change {
my($self, $ds, $dh) = @_;
return 1 unless $ds || $dh;
$self->connect if !$self->{connected};
my $r = $self->{redis};
my $err = $self->{timer}->run_and_catch(sub {
$r->b_call('INCRBY', "v:NSPAM", $ds) if $ds;
$r->b_call('INCRBY', "v:NHAM", $dh) if $dh;
$r->b_results; # collect response, ignoring results
if ($self->{timer}->timed_out()) {
die("bayes: Redis connection timed out!");
die("bayes: failed to increment nspam $ds nham $dh: $err");

public instance (Boolean) tok_touch (String $token,

This method updates the atime of the given token (C<$token>).

The assumption is that the token already exists in the database.

We will never update to an older atime.

my($self, $token, $atime) = @_;
return $self->tok_touch_all([$token], $atime);

public instance (Boolean) tok_touch_all (\@ $tokens

This method does a mass update of the given list of tokens C<$tokens>,
if the existing token atime is < C<$atime>.

my($self, $tokens, $newatime) = @_;
my $ttl = $self->{expire_token}; # time-to-live, in seconds
return 1 unless $ttl && $tokens && @$tokens;
dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
$ttl, scalar @$tokens);
$self->connect if !$self->{connected};
my $r = $self->{redis};
# Benchmarks for a 'with-Lua' vs. a 'batched non-Lua' case show the same
# speed, so for simplicity we only kept the batched non-Lua code. Note that
# this only applies to our own implementation of the Redis client protocol,
# which offers efficient command batching (pipelining); with the Redis
# CPAN module the batched case would be worse by about 33% on average.
# We just refresh the TTL on all given tokens
$r->b_call('EXPIRE', 'w:'.$_, $ttl) for @$tokens;
$r->b_results; # collect response, ignoring results

public instance (Boolean) cleanup ()

This method performs any cleanup necessary before moving on to the next

public instance (String) get_magic_re ()

This method returns a regexp which indicates a magic token.

use constant get_magic_re => undef;

public instance (Boolean) sync (\% $opts)

This method performs a sync of the database.

=head2 perform_upgrade

public instance (Boolean) perform_upgrade (\% $opts);

Performs an upgrade of the database from one version to another, not
currently used in this implementation.

sub perform_upgrade {

=head2 clear_database

public instance (Boolean) clear_database ()

This method deletes all records for a particular user.

Callers should be aware that any errors returned by this method
could cause the database to be inconsistent for the given user.

warn("bayes: note: assuming the database is empty; ".
"to manually clear a database: redis-cli -n <db-ind> FLUSHDB\n");

public instance () dump_db_toks (String $template, String $regex, Array @vars)

This method loops over all tokens, computing the probability for the token
and then printing it out according to the passed-in template.
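
Both C<dump_db_toks> and C<backup_database> walk the key list in chunks of 1000, which bounds the size of each pipelined or Lua request. A small sketch of the same index arithmetic, written as a hypothetical C<chunk_ranges> helper that returns inclusive [start, end] index pairs:

```perl
use strict;
use warnings;

# Hypothetical helper: inclusive index ranges of at most $size elements
# covering $count items, mirroring the loops in the dump/backup code.
sub chunk_ranges {
  my ($count, $size) = @_;
  my @ranges;
  for (my $i = 0; $i < $count; $i += $size) {
    my $end = $i + $size - 1;
    $end = $count - 1 if $end > $count - 1;   # last chunk may be shorter
    push @ranges, [$i, $end];
  }
  return @ranges;
}

my @ranges = chunk_ranges(2500, 1000);
print join(' ', map { sprintf('[%d,%d]', @$_) } @ranges), "\n";
# [0,999] [1000,1999] [2000,2499]
```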

my ($self, $template, $regex, @vars) = @_;
return 0 unless $self->tie_db_readonly;
$self->connect if !$self->{connected};
my $r = $self->{redis};
my $atime = time; # fake
# let's get past this terrible command as fast as possible
# (ignoring $regex which makes no sense with SHA digests)
my $keys = $r->call('KEYS', 'w:*');
dbg("bayes: fetched %d token keys", scalar @$keys);
# process tokens in chunks of 1000
for (my $i = 0; $i <= $#$keys; $i += 1000) {
my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
if (! $self->{have_lua}) { # no Lua, 3-times slower
for (my $j = $i; $j <= $end; $j++) {
$r->b_call('HMGET', $keys->[$j], 's', 'h');
my $itemslist_ref = $r->b_results;
foreach my $item ( @$itemslist_ref ) {
[ substr($keys->[$j],2), ($s||0)+0, ($h||0)+0 ]) if $s || $h;
my $nonce = sprintf("%06x", rand(0xffffff));
my @tokens = @{$keys}[$i .. $end];
my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
scalar @tokens, @tokens, $nonce);
my @items = split(' ', $result);
my $r_nonce = pop(@items);
if (!defined $r_nonce) {
die "bayes: dump_db_toks received no results\n";
} elsif ($r_nonce ne $nonce) {
# redis protocol error?
die sprintf("bayes: dump_db_toks nonce mismatch, ".
"expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
} elsif (@items != @tokens) {
die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
scalar @items, scalar @tokens);
# stripping a leading "w:"
@tokensdata = map { my($s,$h) = split(m{/}, shift @items, 2);
[ substr($_,2), ($s||0)+0, ($h||0)+0 ] } @tokens;
my $probabilities_ref =
$self->{bayes}->_compute_prob_for_all_tokens(\@tokensdata,
$vars[1], $vars[2]);
foreach my $tokendata (@tokensdata) {
my $prob = shift(@$probabilities_ref);
my($token, $s, $h) = @$tokendata;
$prob = 0.5 if !defined $prob;
my $encoded = unpack("H*", $token);
printf($template, $prob, $s, $h, $atime, $encoded)
or die "Error writing tokens: $!";
dbg("bayes: written token keys");

=head2 backup_database

public instance (Boolean) backup_database ()

This method will dump the user's database in a machine-readable format.
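
Each token in the backup is written as a tab-separated 't' line whose last field is the binary token digest, hex-encoded with C<unpack("H*", ...)>. Because Redis keeps no access times, the atime column is simply the current time. Below is a minimal sketch of formatting one such line. It uses a hypothetical C<backup_token_line> helper and a made-up 3-byte token; real tokens are longer digests.

```perl
use strict;
use warnings;

# Hypothetical helper: format one token line of the backup dump.
sub backup_token_line {
  my ($token, $spam, $ham, $atime) = @_;
  return sprintf("t\t%d\t%d\t%s\t%s\n",
                 $spam || 0, $ham || 0, $atime, unpack('H*', $token));
}

my $line = backup_token_line("\x01\xab\xff", 3, 0, 1400000000);
print $line;   # t, 3, 0, 1400000000 and 01abff, tab-separated
```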

sub backup_database {
return 0 unless $self->tie_db_readonly;
$self->connect if !$self->{connected};
my $r = $self->{redis};
my $atime = time; # fake
my @vars = $self->get_storage_variables(qw(DB_VERSION NSPAM NHAM));
print "v\t$vars[0]\tdb_version # this must be the first line!!!\n";
print "v\t$vars[1]\tnum_spam\n";
print "v\t$vars[2]\tnum_nonspam\n";
# let's get past this terrible command as fast as possible
my $keys = $r->call('KEYS', 'w:*');
dbg("bayes: fetched %d token keys", scalar @$keys);
# process tokens in chunks of 1000
for (my $i = 0; $i <= $#$keys; $i += 1000) {
my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
if (! $self->{have_lua}) { # no Lua, slower
for (my $j = $i; $j <= $end; $j++) {
$r->b_call('HMGET', $keys->[$j], 's', 'h');
my $itemslist_ref = $r->b_results;
foreach my $item ( @$itemslist_ref ) {
my $encoded = unpack("H*", substr($keys->[$j++], 2));
printf("t\t%d\t%d\t%s\t%s\n",
$s||0, $h||0, $atime, $encoded) if $s || $h;
my $nonce = sprintf("%06x", rand(0xffffff));
my @tokens = @{$keys}[$i .. $end];
my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
scalar @tokens, @tokens, $nonce);
my @items = split(' ', $result);
my $r_nonce = pop(@items);
if (!defined $r_nonce) {
die "bayes: backup_database received no results\n";
} elsif ($r_nonce ne $nonce) {
# redis protocol error?
die sprintf("bayes: backup_database nonce mismatch, ".
"expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
} elsif (@items != @tokens) {
die sprintf("bayes: backup_database got %d entries, expected %d\n",
scalar @items, scalar @tokens);
foreach my $token (@tokens) {
my($s,$h) = split(m{/}, shift @items, 2);
my $encoded = unpack("H*", substr($token,2)); # strip leading "w:"
printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
dbg("bayes: written token keys");
$keys = $r->call('KEYS', 's:*');
dbg("bayes: fetched %d seen keys", scalar @$keys);
for (my $i = 0; $i <= $#$keys; $i += 1000) {
my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
my @t = @{$keys}[$i .. $end];
my $v = $r->call('MGET', @t);
for (my $i = 0; $i < @$v; $i++) {
next unless defined $v->[$i];
printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
dbg("bayes: written seen keys");

=head2 restore_database

public instance (Boolean) restore_database (String $filename, Boolean $showdots)

This method restores a database from the given filename, C<$filename>.

Callers should be aware that any errors returned by this method
could cause the database to be inconsistent for the given user.
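
Restoring reverses that format. Below is a hedged sketch of the two per-line steps, assuming a version 3 dump with hex-encoded tokens. The first step parses a 't' line and drops negative or all-zero counts. The second chooses a TTL scaled by a random factor in [0.7, 1.7), so that restored keys do not all expire at once. C<parse_token_line> and C<jittered_ttl> are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical helper: parse "t <spam> <ham> <atime> <hex-token>".
# Returns (binary_token, spam, ham), or an empty list if nothing to restore.
sub parse_token_line {
  my ($line) = @_;
  chomp $line;
  my (undef, $s, $h, undef, $hex) = split /\s+/, $line, 5;
  for ($s, $h) { $_ = (defined $_ && $_ > 0) ? $_ + 0 : 0 }  # clamp to >= 0
  return unless $s || $h;
  return (pack('H*', $hex), $s, $h);
}

# Hypothetical helper: spread expiry times to avoid a load spike.
sub jittered_ttl {
  my ($ttl) = @_;
  return int($ttl * (rand() + 0.7));
}

my ($token, $spam, $ham) = parse_token_line("t\t3\t1\t1400000000\t01abff\n");
my $ttl = jittered_ttl(1814400);   # between 0.7x and 1.7x of the input
```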

sub restore_database {
my ($self, $filename, $showdots) = @_;
if (!open(DUMPFILE, '<', $filename)) {
warn("bayes: unable to open backup file $filename: $!");
unless ($self->clear_database()) {
return 0 unless $self->tie_db_writable;
$self->connect if !$self->{connected};
my $r = $self->{redis};
my $token_count = 0;
my $line = <DUMPFILE>;
defined $line or die "Error reading dump file: $!";
# We require the database version line to be the first in the file so we can
# figure out how to properly deal with the file. If it is not the first
if ($line =~ m/^v\s+(\d+)\s+db_version/) {
warn("bayes: database version must be the first line in the backup file, correct and re-run");
unless ($db_version == 2 || $db_version == 3) {
warn("bayes: database version $db_version is unsupported, must be version 2 or 3\n");
my $token_ttl = $self->{expire_token}; # possibly undefined
my $seen_ttl = $self->{expire_seen}; # possibly undefined
for ($!=0; defined($line=<DUMPFILE>); $!=0) {
if ($showdots && $line_count % 1000 == 0) {
print STDERR "." if $showdots;
if ($line =~ /^t\s+/) { # token line
my @parsed_line = split(/\s+/, $line, 5);
my $spam_count = $parsed_line[1] + 0;
my $ham_count = $parsed_line[2] + 0;
my $token = $parsed_line[4];
$spam_count = 0 if $spam_count < 0;
$ham_count = 0 if $ham_count < 0;
next if !$spam_count && !$ham_count;
if ($db_version < 3) {
# versions < 3 use plain text tokens, so we need to convert to hash
$token = substr(sha1($token), -5);
# turn unpacked binary token back into binary value
$token = pack("H*",$token);
my $key = 'w:'.$token;
$r->b_call('HINCRBY', $key, 's', int $spam_count) if $spam_count > 0;
$r->b_call('HINCRBY', $key, 'h', int $ham_count) if $ham_count > 0;
# by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
# we avoid auto-expiration of many tokens all at once,
# introducing an unnecessary load spike on a redis server
$r->b_call('EXPIRE', $key, int($token_ttl * (rand()+0.7)));
# collect response every now and then, ignoring results
$r->b_results if ++$q_cnt % 1000 == 0;
} elsif ($line =~ /^s\s+/) { # seen line
my @parsed_line = split(/\s+/, $line, 3);
my $flag = $parsed_line[1];
my $msgid = $parsed_line[2];
unless ($flag eq 'h' || $flag eq 's') {
dbg("bayes: unknown seen flag ($flag) for line: $line, skipping");
dbg("bayes: blank msgid for line: $line, skipping");
$r->b_call('SET', "s:$msgid", $flag);
# by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
# we avoid auto-expiration of many 'seen' entries all at once,
# introducing an unnecessary load spike on a redis server
$r->b_call('SETEX', "s:$msgid", int($seen_ttl * (rand()+0.7)), $flag);
# collect response every now and then, ignoring results
$r->b_results if ++$q_cnt % 1000 == 0;
} elsif ($line =~ /^v\s+/) { # variable line
my @parsed_line = split(/\s+/, $line, 3);
my $value = $parsed_line[1] + 0;
if ($parsed_line[2] eq 'num_spam') {
} elsif ($parsed_line[2] eq 'num_nonspam') {
dbg("bayes: restore_database: skipping unknown line: $line");
dbg("bayes: skipping unknown line: $line");
$r->b_results; # collect any remaining response, ignoring results
defined $line || $!==0 or
$!==EBADF ? dbg("bayes: error reading dump file: $!")
: die "error reading dump file: $!";
close(DUMPFILE) or die "Can't close dump file: $!";
print STDERR "\n" if $showdots;
if ($num_spam <= 0 && $num_ham <= 0) {
warn("bayes: no num_spam/num_ham found, aborting");
$self->nspam_nham_change($num_spam, $num_ham);
dbg("bayes: parsed $line_count lines");
dbg("bayes: created database with $token_count tokens ".
"based on $num_spam spam messages and $num_ham ham messages");

public instance (Boolean) db_readable()

This method returns a boolean value indicating if the database is in a

return $self->{is_officially_open};

public instance (Boolean) db_writable()

This method returns a boolean value indicating if the database is in a

return $self->{is_officially_open} && $self->{is_writable};

sub _define_lua_scripts {
dbg("bayes: defining Lua scripts");
$self->connect if !$self->{connected};
my $r = $self->{redis};
$self->{multi_hmget_script} = $r->call('SCRIPT', 'LOAD', <<'END');
local rcall = redis.call
local nonce = ARGV[1]
local sh = rcall("HMGET", KEYS[j], "s", "h")
-- returns counts as a list of spam/ham pairs, zeroes may be omitted
local s, h = sh[1] or "0", sh[2] or "0"
pair = s -- just a spam field, possibly zero; a ham field omitted
elseif s == "0" then
pair = "/" .. h -- just a ham field, zero in a spam field suppressed
pair = s .. "/" .. h
-- return counts as a single string, avoids overhead of multiresult parsing
return table.concat(r," ")