~ubuntu-branches/ubuntu/utopic/spamassassin/utopic-updates

« back to all changes in this revision

Viewing changes to lib/Mail/SpamAssassin/AsyncLoop.pm

  • Committer: Package Import Robot
  • Author(s): Noah Meyerhans
  • Date: 2014-02-14 22:45:15 UTC
  • mfrom: (0.8.1) (0.6.2) (5.1.22 sid)
  • Revision ID: package-import@ubuntu.com-20140214224515-z1es2twos8xh7n2y
Tags: 3.4.0-1
* New upstream version! (Closes: 738963, 738872, 738867)
* Scrub the environment when switching to the debian-spamd user in
  postinst and cron.daily. (Closes: 738951)
* Enhancements to postinst to better manage ownership of
  /var/lib/spamassassin, via Iain Lane <iain.lane@canonical.com>
  (Closes: 738974)

Show diffs side-by-side

added added

removed removed

Lines of Context:
61
61
#############################################################################
62
62
 
63
63
sub new {
 
64
  # called from PerMsgStatus, a new AsyncLoop object is created
 
65
  # for each new message processing
64
66
  my $class = shift;
65
67
  $class = ref($class) || $class;
66
68
 
73
75
    total_queries_completed => 0,
74
76
    pending_lookups     => { },
75
77
    timing_by_query     => { },
 
78
    all_lookups         => { },  # keyed by "rr_type/domain"
76
79
  };
77
80
 
78
81
  bless ($self, $class);
79
82
  $self;
80
83
}
81
84
 
 
85
# Given a domain name, produces a listref of successively stripped down
 
86
# parent domains, e.g. a domain '2.10.Example.COM' would produce a list:
 
87
# '2.10.example.com', '10.example.com', 'example.com', 'com', ''
 
88
#
 
89
sub domain_to_search_list {
 
90
  my ($domain) = @_;
 
91
  $domain =~ s/^\.+//; $domain =~ s/\.+\z//;  # strip leading and trailing dots
 
92
  my @search_keys;
 
93
  if ($domain =~ /\[/) {  # don't split address literals
 
94
    @search_keys = ( $domain, '' );  # presumably an address literal
 
95
  } else {
 
96
    local $1;
 
97
    $domain = lc $domain;
 
98
    for (;;) {
 
99
      push(@search_keys, $domain);
 
100
      last  if $domain eq '';
 
101
      # strip one level
 
102
      $domain = ($domain =~ /^ (?: [^.]* ) \. (.*) \z/xs) ? $1 : '';
 
103
    }
 
104
    if (@search_keys > 20) {  # enforce some sanity limit
 
105
      @search_keys = @search_keys[$#search_keys-19 .. $#search_keys];
 
106
    }
 
107
  }
 
108
  return \@search_keys;
 
109
}
 
110
 
82
111
# ---------------------------------------------------------------------------
83
112
 
84
 
=item $obj = $async->start_lookup($obj)
 
113
=item $ent = $async->start_lookup($ent, $master_deadline)
85
114
 
86
 
Register the start of a long-running asynchronous lookup operation. C<$obj>
87
 
is a hash reference containing the following items:
 
115
Register the start of a long-running asynchronous lookup operation.
 
116
C<$ent> is a hash reference containing the following items:
88
117
 
89
118
=over 4
90
119
 
106
135
A string, typically one word, used to describe the type of lookup in log
107
136
messages, such as C<DNSBL>, C<MX>, C<TXT>.
108
137
 
109
 
=item poll_callback (optional)
110
 
 
111
 
A code reference, which will be called periodically during the
112
 
background-processing period.  If you will be performing an async lookup on a
113
 
non-DNS-based service, you will need to implement this so that it checks for
114
 
new responses and calls C<set_response_packet()> or C<report_id_complete()> as
115
 
appropriate.   DNS-based lookups can leave it undefined, since
116
 
DnsResolver::poll_responses() will be called automatically anyway.
117
 
 
118
 
The code reference will be called with one argument, the C<$ent> object.
119
 
 
120
 
=item completed_callback (optional)
121
 
 
122
 
A code reference which will be called when an asynchronous task (e.g. a
123
 
DNS lookup) is completed, either normally, or aborted, e.g. by a timeout.
124
 
 
125
 
When a task has been reported as completed via C<set_response_packet()>
126
 
the response (as provided to C<set_response_packet()>) is stored in
127
 
$ent->{response_packet} (possibly undef, its semantics is defined by the
128
 
caller). When completion is reported via C<report_id_complete()> or a
129
 
task was aborted, the $ent->{response_packet} is guaranteed to be undef.
130
 
If it is necessary to distinguish between the last two cases, the
131
 
$ent->{status} may be examined for a string 'ABORTING' or 'FINISHED'.
132
 
 
133
 
The code reference will be called with one argument, the C<$ent> object.
134
 
 
135
138
=item zone (optional)
136
139
 
137
140
A zone specification (typically a DNS zone name - e.g. host, domain, or RBL)
163
166
 
164
167
=back
165
168
 
166
 
C<$obj> is returned by this method.
 
169
C<$ent> is returned by this method, with its contents augmented by additional
 
170
information.
167
171
 
168
172
=cut
169
173
 
170
174
sub start_lookup {
171
175
  my ($self, $ent, $master_deadline) = @_;
172
176
 
173
 
  die "oops, no id"   unless $ent->{id}   ne '';
174
 
  die "oops, no key"  unless $ent->{key}  ne '';
175
 
  die "oops, no type" unless $ent->{type} ne '';
176
 
 
177
 
  my $now = time;
 
177
  my $id  = $ent->{id};
178
178
  my $key = $ent->{key};
179
 
  my $id  = $ent->{id};
180
 
  $ent->{status} = 'STARTED';
 
179
  defined $id && $id ne ''  or die "oops, no id";
 
180
  $key                      or die "oops, no key";
 
181
  $ent->{type}              or die "oops, no type";
 
182
 
 
183
  my $now = time;
181
184
  $ent->{start_time} = $now  if !defined $ent->{start_time};
182
185
 
183
186
  # are there any applicable per-zone settings?
190
193
    for (;;) {  # 2.10.example.com, 10.example.com, example.com, com, ''
191
194
      if (exists $conf_by_zone->{$zone}) {
192
195
        $settings = $conf_by_zone->{$zone};
193
 
        dbg("async: applying by_zone settings for $zone");
194
196
        last;
195
197
      } elsif ($zone eq '') {
196
198
        last;
201
203
    }
202
204
  }
203
205
 
 
206
  dbg("async: applying by_zone settings for %s", $zone)  if $settings;
 
207
 
204
208
  my $t_init = $ent->{timeout_initial};  # application-specified has precedence
205
209
  $t_init = $settings->{rbl_timeout}  if $settings && !defined $t_init;
206
210
  $t_init = $self->{main}->{conf}->{rbl_timeout}  if !defined $t_init;
229
233
               map { ref $ent->{$_} ? @{$ent->{$_}} : $ent->{$_} }
230
234
               qw(sets rules rulename type key) );
231
235
 
 
236
  $self->{pending_lookups}->{$key} = $ent;
 
237
 
232
238
  $self->{queries_started}++;
233
239
  $self->{total_queries_started}++;
234
 
  $self->{pending_lookups}->{$key} = $ent;
235
 
 
236
240
  dbg("async: starting: %s (timeout %.1fs, min %.1fs)%s",
237
241
      $ent->{display_id}, $ent->{timeout_initial}, $ent->{timeout_min},
238
242
      !$clipped_by_master_deadline ? '' : ', capped by time limit');
 
243
 
239
244
  $ent;
240
245
}
241
246
 
242
247
# ---------------------------------------------------------------------------
243
248
 
244
 
=item $obj = $async->get_lookup($key)
 
249
=item $ent = $async->bgsend_and_start_lookup($domain, $type, $class, $ent, $cb, %options)
 
250
 
 
251
A common idiom: calls C<bgsend>, followed by a call to C<start_lookup>,
 
252
returning the argument $ent object as modified by C<start_lookup> and
 
253
filled-in with a query ID.
 
254
 
 
255
=cut
 
256
 
 
257
sub bgsend_and_start_lookup {
 
258
  my($self, $domain, $type, $class, $ent, $cb, %options) = @_;
 
259
  $ent = {}  if !$ent;
 
260
  $domain =~ s/\.+\z//s;  # strip trailing dots, these sometimes still sneak in
 
261
  $ent->{id} = undef;
 
262
  $ent->{query_type} = $type;
 
263
  $ent->{query_domain} = $domain;
 
264
  $ent->{type} = $type  if !exists $ent->{type};
 
265
  $cb = $ent->{completed_callback}  if !$cb;  # compatibility with SA < 3.4
 
266
 
 
267
  my $key = $ent->{key} || '';
 
268
 
 
269
  my $dnskey = uc($type) . '/' . lc($domain);
 
270
  my $dns_query_info = $self->{all_lookups}{$dnskey};
 
271
 
 
272
  if ($dns_query_info) {  # DNS query already underway or completed
 
273
    my $id = $ent->{id} = $dns_query_info->{id};  # re-use existing query
 
274
    return if !defined $id;  # presumably blocked, or other fatal failure
 
275
    my $id_tail = $id; $id_tail =~ s{^\d+/IN/}{};
 
276
    lc($id_tail) eq lc($dnskey)
 
277
      or info("async: unmatched id %s, key=%s", $id, $dnskey);
 
278
 
 
279
    my $pkt = $dns_query_info->{pkt};
 
280
    if (!$pkt) {  # DNS query underway, still waiting for results
 
281
      # just add our query to the existing one
 
282
      push(@{$dns_query_info->{applicants}}, [$ent,$cb]);
 
283
      dbg("async: query %s already underway, adding no.%d %s",
 
284
          $id, scalar @{$dns_query_info->{applicants}},
 
285
          $ent->{rulename} || $key);
 
286
 
 
287
    } else {  # DNS query already completed, re-use results
 
288
      # answer already known, just do the callback and be done with it
 
289
      if (!$cb) {
 
290
        dbg("async: query %s already done, re-using for %s", $id, $key);
 
291
      } else {
 
292
        dbg("async: query %s already done, re-using for %s, callback",
 
293
            $id, $key);
 
294
        eval {
 
295
          $cb->($ent, $pkt); 1;
 
296
        } or do {
 
297
          chomp $@;
 
298
          # resignal if alarm went off
 
299
          die "async: (1) $@\n"  if $@ =~ /__alarm__ignore__\(.*\)/s;
 
300
          warn sprintf("query %s completed, callback %s failed: %s\n",
 
301
                       $id, $key, $@);
 
302
        };
 
303
      }
 
304
    }
 
305
  }
 
306
 
 
307
  else {  # no existing query, open a new DNS query
 
308
    $dns_query_info = $self->{all_lookups}{$dnskey} = {};  # new query needed
 
309
    my($id, $blocked);
 
310
    my $dns_query_blockages = $self->{main}->{conf}->{dns_query_blocked};
 
311
    if ($dns_query_blockages) {
 
312
      my $search_list = domain_to_search_list($domain);
 
313
      foreach my $parent_domain (@$search_list) {
 
314
        $blocked = $dns_query_blockages->{$parent_domain};
 
315
        last if defined $blocked; # stop at first defined, can be true or false
 
316
      }
 
317
    }
 
318
    if ($blocked) {
 
319
      dbg("async: blocked by dns_query_restriction: %s", $dnskey);
 
320
    } else {
 
321
      dbg("async: launching %s for %s", $dnskey, $key);
 
322
      $id = $self->{main}->{resolver}->bgsend($domain, $type, $class, sub {
 
323
          my($pkt, $pkt_id, $timestamp) = @_;
 
324
          # this callback sub is called from DnsResolver::poll_responses()
 
325
        # dbg("async: in a bgsend_and_start_lookup callback, id %s", $pkt_id);
 
326
          if ($pkt_id ne $id) {
 
327
            warn "async: mismatched dns id: got $pkt_id, expected $id\n";
 
328
            return;
 
329
          }
 
330
          $self->set_response_packet($pkt_id, $pkt, $ent->{key}, $timestamp);
 
331
          $dns_query_info->{pkt} = $pkt;
 
332
          my $cb_count = 0;
 
333
          foreach my $tuple (@{$dns_query_info->{applicants}}) {
 
334
            my($appl_ent, $appl_cb) = @$tuple;
 
335
            if ($appl_cb) {
 
336
              dbg("async: calling callback on key %s%s", $key,
 
337
                  !defined $appl_ent->{rulename} ? ''
 
338
                    : ", rule ".$appl_ent->{rulename});
 
339
              $cb_count++;
 
340
              eval {
 
341
                $appl_cb->($appl_ent, $pkt); 1;
 
342
              } or do {
 
343
                chomp $@;
 
344
                # resignal if alarm went off
 
345
                die "async: (2) $@\n"  if $@ =~ /__alarm__ignore__\(.*\)/s;
 
346
                warn sprintf("query %s completed, callback %s failed: %s\n",
 
347
                             $id, $appl_ent->{key}, $@);
 
348
              };
 
349
            }
 
350
          }
 
351
          delete $dns_query_info->{applicants};
 
352
          dbg("async: query $id completed, no callbacks run")  if !$cb_count;
 
353
        });
 
354
    }
 
355
    return if !defined $id;
 
356
    $dns_query_info->{id} = $ent->{id} = $id;
 
357
    push(@{$dns_query_info->{applicants}}, [$ent,$cb]);
 
358
    $self->start_lookup($ent, $options{master_deadline});
 
359
  }
 
360
  return $ent;
 
361
}
 
362
 
 
363
# ---------------------------------------------------------------------------
 
364
 
 
365
=item $ent = $async->get_lookup($key)
245
366
 
246
367
Retrieve the pending-lookup object for the given key C<$key>.
247
368
 
248
369
If the lookup is complete, this will return C<undef>.
249
370
 
250
371
Note that a lookup is still considered "pending" until C<complete_lookups()> is
251
 
called, even if it has been reported as complete via C<set_response_packet()>
252
 
or C<report_id_complete()>.
 
372
called, even if it has been reported as complete via C<set_response_packet()>.
253
373
 
254
374
=cut
255
375
 
260
380
 
261
381
# ---------------------------------------------------------------------------
262
382
 
263
 
=item @objs = $async->get_pending_lookups()
264
 
 
265
 
Retrieve the lookup objects for all pending lookups.
266
 
 
267
 
Note that a lookup is still considered "pending" until C<complete_lookups()> is
268
 
called, even if it has been reported as complete via C<set_response_packet()>
269
 
or C<report_id_complete()>.
270
 
 
271
 
=cut
272
 
 
273
 
sub get_pending_lookups {
274
 
  my ($self) = @_;
275
 
  return values %{$self->{pending_lookups}};
276
 
}
277
 
 
278
 
# ---------------------------------------------------------------------------
279
 
 
280
383
=item $async->log_lookups_timing()
281
384
 
282
385
Log sorted timing for all completed lookups.
295
398
 
296
399
=item $alldone = $async->complete_lookups()
297
400
 
298
 
Perform a poll of the pending lookups, to see if any are completed; if they
299
 
are, their <completed_callback> is called with the entry object for that
300
 
lookup.
 
401
Perform a poll of the pending lookups, to see if any are completed.
 
402
Callbacks on completed queries will be called from poll_responses().
301
403
 
302
 
If there are no lookups remaining, or if too long has elapsed since any results
303
 
were returned, C<1> is returned, otherwise C<0>.
 
404
If there are no lookups remaining, or if too much time has elapsed since
 
405
any results were returned, C<1> is returned, otherwise C<0>.
304
406
 
305
407
=cut
306
408
 
363
465
    # A callback routine may generate another DNS query, which may insert
364
466
    # an entry into the %$pending hash thus invalidating the each() context.
365
467
    # So, make sure that callbacks are not called while the each() context
366
 
    # is open, or avoid using each().  [Bug 6937]
 
468
    # is open. [Bug 6937]
367
469
    #
368
 
  # while (my($key,$ent) = each %$pending) {
369
 
    foreach my $key (keys %$pending) {
370
 
      my $ent = $pending->{$key};
 
470
    while (my($key,$ent) = each %$pending) {
371
471
      my $id = $ent->{id};
372
 
      if (defined $ent->{poll_callback}) {  # call a "poll_callback" if exists
373
 
        # be nice, provide fresh info to a callback routine
374
 
        $ent->{status} = 'FINISHED'  if exists $self->{finished}->{$id};
375
 
        # a callback might call set_response_packet() or report_id_complete()
376
 
      # dbg("async: calling poll_callback on key $key");
377
 
        $ent->{poll_callback}->($ent);
378
 
      }
379
 
      my $finished = exists $self->{finished}->{$id};
380
 
      if ($finished) {
 
472
      if (exists $self->{finished}->{$id}) {
 
473
        delete $self->{finished}->{$id};
381
474
        $anydone = 1;
382
 
        delete $self->{finished}->{$id};
383
 
        $ent->{status} = 'FINISHED';
384
475
        $ent->{finish_time} = $now  if !defined $ent->{finish_time};
385
476
        my $elapsed = $ent->{finish_time} - $ent->{start_time};
386
477
        dbg("async: completed in %.3f s: %s", $elapsed, $ent->{display_id});
387
 
 
388
 
        # call a "completed_callback" sub, if one exists
389
 
        if (defined $ent->{completed_callback}) {
390
 
        # dbg("async: calling completed_callback on key $key");
391
 
          $ent->{completed_callback}->($ent);
392
 
        }
393
478
        $self->{timing_by_query}->{". $key"} += $elapsed;
394
479
        $self->{queries_completed}++;
395
480
        $self->{total_queries_completed}++;
434
519
 
435
520
  } or do {
436
521
    my $eval_stat = $@ ne '' ? $@ : "errno=$!";  chomp $eval_stat;
 
522
    # resignal if alarm went off
 
523
    die "async: (3) $eval_stat\n"  if $eval_stat =~ /__alarm__ignore__\(.*\)/s;
437
524
    dbg("async: caught complete_lookups death, aborting: %s", $eval_stat);
438
525
    $alldone = 1;      # abort remaining
439
526
  };
455
542
  my $pending = $self->{pending_lookups};
456
543
  my $foundcnt = 0;
457
544
  my $now = time;
458
 
  foreach my $key (keys %$pending) {
459
 
    my $ent = $pending->{$key};
 
545
 
 
546
  while (my($key,$ent) = each %$pending) {
460
547
    dbg("async: aborting after %.3f s, %s: %s",
461
548
        $now - $ent->{start_time},
462
549
        (defined $ent->{timeout_initial} &&
465
552
        $ent->{display_id} );
466
553
    $foundcnt++;
467
554
    $self->{timing_by_query}->{"X $key"} = $now - $ent->{start_time};
468
 
 
469
 
    if (defined $ent->{completed_callback}) {
470
 
      $ent->{finish_time} = $now  if !defined $ent->{finish_time};
471
 
      $ent->{response_packet} = undef;
472
 
      $ent->{status} = 'ABORTING';
473
 
      $ent->{completed_callback}->($ent);
474
 
    }
 
555
    $ent->{finish_time} = $now  if !defined $ent->{finish_time};
475
556
    delete $pending->{$key};
476
557
  }
 
558
 
 
559
  # call any remaining callbacks, indicating the query has been aborted
 
560
  #
 
561
  my $all_lookups_ref = $self->{all_lookups};
 
562
  foreach my $dnskey (keys %$all_lookups_ref) {
 
563
    my $dns_query_info = $all_lookups_ref->{$dnskey};
 
564
    my $cb_count = 0;
 
565
    foreach my $tuple (@{$dns_query_info->{applicants}}) {
 
566
      my($ent, $cb) = @$tuple;
 
567
      if ($cb) {
 
568
        dbg("async: calling callback/abort on key %s%s", $dnskey,
 
569
            !defined $ent->{rulename} ? '' : ", rule ".$ent->{rulename});
 
570
        $cb_count++;
 
571
        eval {
 
572
          $cb->($ent, undef); 1;
 
573
        } or do {
 
574
          chomp $@;
 
575
          # resignal if alarm went off
 
576
          die "async: (2) $@\n"  if $@ =~ /__alarm__ignore__\(.*\)/s;
 
577
          warn sprintf("query %s aborted, callback %s failed: %s\n",
 
578
                       $dnskey, $ent->{key}, $@);
 
579
        };
 
580
      }
 
581
      dbg("async: query $dnskey aborted, no callbacks run")  if !$cb_count;
 
582
    }
 
583
    delete $dns_query_info->{applicants};
 
584
  }
 
585
 
477
586
  dbg("async: aborted %d remaining lookups", $foundcnt)  if $foundcnt > 0;
478
587
  delete $self->{last_poll_responses_time};
479
588
  $self->{main}->{resolver}->bgabort();
490
599
hash %{$self->{pending_lookups}} where the object which spawned this query can
491
600
be found, and through which futher information about the query is accessible.
492
601
 
493
 
If this was called, C<$pkt> will be available in the C<completed_callback>
494
 
function as C<$ent-<gt>{response_packet}>.
 
602
C<$pkt> may be undef, indicating that no response packet is available, but a
 
603
query has completed (e.g. was aborted or dismissed) and is no longer "pending".
495
604
 
496
 
One or the other of C<set_response_packet()> or C<report_id_complete()>
497
 
should be called, but not both.
 
605
The DNS resolver's response packet C<$pkt> will be made available to a callback
 
606
subroutine through its argument as well as in C<$ent-<gt>{response_packet}>.
498
607
 
499
608
=cut
500
609
 
503
612
  $self->{finished}->{$id} = 1;  # only key existence matters, any value
504
613
  $timestamp = time  if !defined $timestamp;
505
614
  my $pending = $self->{pending_lookups};
506
 
  if (!defined $key) {  # backwards compatibility with 3.2.3 and older plugins
 
615
  if (!defined $key) {  # backward compatibility with 3.2.3 and older plugins
507
616
    # a third-party plugin did not provide $key in a call, search for it:
508
617
    if ($id eq $pending->{$id}->{id}) {  # I feel lucky, key==id ?
509
618
      $key = $id;
518
627
    info("async: no key, response packet not remembered, id $id");
519
628
  } else {
520
629
    my $ent = $pending->{$key};
521
 
    if ($id ne $ent->{id}) {
522
 
      info("async: ignoring response, mismatched id $id, expected $ent->{id}");
 
630
    my $ent_id = $ent->{id};
 
631
    if (!defined $ent_id) {
 
632
      # should not happen, troubleshooting
 
633
      info("async: ignoring response, id %s, ent_id is undef: %s",
 
634
           $id, join(', ', %$ent));
 
635
    } elsif ($id ne $ent_id) {
 
636
      info("async: ignoring response, mismatched id $id, expected $ent_id");
523
637
    } else {
524
638
      $ent->{finish_time} = $timestamp;
525
639
      $ent->{response_packet} = $pkt;
530
644
 
531
645
=item $async->report_id_complete($id,$key,$key,$timestamp)
532
646
 
533
 
Register that a query has completed, and is no longer "pending". C<$id> is the
534
 
ID for the query, and must match the C<id> supplied in C<start_lookup()>.
 
647
Legacy. Equivalent to $self->set_response_packet($id,undef,$key,$timestamp),
 
648
i.e. providing undef as a response packet. Register that a query has
 
649
completed and is no longer "pending". C<$id> is the ID for the query,
 
650
and must match the C<id> supplied in C<start_lookup()>.
535
651
 
536
652
One or the other of C<set_response_packet()> or C<report_id_complete()>
537
653
should be called, but not both.