~ubuntu-branches/ubuntu/vivid/postfix/vivid-proposed

« back to all changes in this revision

Viewing changes to src/qmgr/qmgr_active.c

  • Committer: Bazaar Package Importer
  • Author(s): LaMont Jones
  • Date: 2005-02-27 09:33:07 UTC
  • Revision ID: james.westby@ubuntu.com-20050227093307-cn789t27ibnlh6tf
Tags: upstream-2.1.5
ImportĀ upstreamĀ versionĀ 2.1.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*++
 
2
/* NAME
 
3
/*      qmgr_active 3
 
4
/* SUMMARY
 
5
/*      active queue management
 
6
/* SYNOPSIS
 
7
/*      #include "qmgr.h"
 
8
/*
 
9
/*      void    qmgr_active_feed(scan_info, queue_id)
 
10
/*      QMGR_SCAN *scan_info;
 
11
/*      const char *queue_id;
 
12
/*
 
13
/*      void    qmgr_active_drain()
 
14
/*
 
15
/*      int     qmgr_active_done(message)
 
16
/*      QMGR_MESSAGE *message;
 
17
/* DESCRIPTION
 
18
/*      These functions maintain the active message queue: the set
 
19
/*      of messages that the queue manager is actually working on.
 
20
/*      The active queue is limited in size. Messages are drained
 
21
/*      from the active queue by allocating a delivery process and
 
22
/*      by delivering mail via that process.  Messages leak into the
 
23
/*      active queue only when the active queue is small enough.
 
24
/*      Damaged message files are saved to the "corrupt" directory.
 
25
/*
 
26
/*      qmgr_active_feed() inserts the named message file into
 
27
/*      the active queue. Message files with the wrong name or
 
28
/*      with other wrong properties are skipped but not removed.
 
29
/*      The following queue flags are recognized, other flags being
 
30
/*      ignored:
 
31
/* .IP QMGR_SCAN_ALL
 
32
/*      Examine all queue files. Normally, deferred queue files with
 
33
/*      future time stamps are ignored, and incoming queue files with
 
34
/*      future time stamps are frowned upon.
 
35
/* .PP
 
36
/*      qmgr_active_drain() allocates one delivery process.
 
37
/*      Process allocation is asynchronous. Once the delivery
 
38
/*      process is available, an attempt is made to deliver
 
39
/*      a message via it. Message delivery is asynchronous, too.
 
40
/*
 
41
/*      qmgr_active_done() deals with a message after delivery
 
42
/*      has been tried for all in-core recipients. If the message
 
43
/*      was bounced, a bounce message is sent to the sender, or
 
44
/*      to the Errors-To: address if one was specified.
 
45
/*      If there are more on-file recipients, a new batch of
 
46
/*      in-core recipients is read from the queue file. Otherwise,
 
47
/*      if a delivery agent marked the queue file as corrupt,
 
48
/*      the queue file is moved to the "corrupt" queue (surprise);
 
49
/*      if at least one delivery failed, the message is moved
 
50
/*      to the deferred queue. The time stamps of a deferred queue
 
51
/*      file are set to the nearest wakeup time of its recipient
 
52
/*      sites (if delivery failed due to a problem with a next-hop
 
53
/*      host), are set into the future by the amount of time the
 
54
/*      message was queued (per-message exponential backoff), or are set
 
55
/*      into the future by a minimal backoff time, whichever is more.
 
56
/*      The minimal_backoff_time parameter specifies the minimal
 
57
/*      amount of time between delivery attempts; maximal_backoff_time
 
58
/*      specifies an upper limit.
 
59
/* DIAGNOSTICS
 
60
/*      Fatal: queue file access failures, out of memory.
 
61
/*      Panic: interface violations, internal consistency errors.
 
62
/*      Warnings: corrupt message file. A corrupt message is saved
 
63
/*      to the "corrupt" queue for further inspection.
 
64
/* LICENSE
 
65
/* .ad
 
66
/* .fi
 
67
/*      The Secure Mailer license must be distributed with this software.
 
68
/* AUTHOR(S)
 
69
/*      Wietse Venema
 
70
/*      IBM T.J. Watson Research
 
71
/*      P.O. Box 704
 
72
/*      Yorktown Heights, NY 10598, USA
 
73
/*--*/
 
74
 
 
75
/* System library. */
 
76
 
 
77
#include <sys_defs.h>
 
78
#include <sys/stat.h>
 
79
#include <dirent.h>
 
80
#include <stdlib.h>
 
81
#include <unistd.h>
 
82
#include <string.h>
 
83
#include <utime.h>
 
84
#include <errno.h>
 
85
 
 
86
#ifndef S_IRWXU                         /* What? no POSIX system? */
 
87
#define S_IRWXU 0700
 
88
#endif
 
89
 
 
90
/* Utility library. */
 
91
 
 
92
#include <msg.h>
 
93
#include <events.h>
 
94
#include <mymalloc.h>
 
95
#include <vstream.h>
 
96
 
 
97
/* Global library. */
 
98
 
 
99
#include <mail_params.h>
 
100
#include <mail_open_ok.h>
 
101
#include <mail_queue.h>
 
102
#include <recipient_list.h>
 
103
#include <bounce.h>
 
104
#include <defer.h>
 
105
#include <trace.h>
 
106
#include <abounce.h>
 
107
#include <rec_type.h>
 
108
 
 
109
/* Application-specific. */
 
110
 
 
111
#include "qmgr.h"
 
112
 
 
113
 /*
 
114
  * A bunch of call-back routines.
 
115
  */
 
116
static void qmgr_active_done_2_bounce_flush(int, char *);
 
117
static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
 
118
static void qmgr_active_done_3_defer_flush(int, char *);
 
119
static void qmgr_active_done_3_defer_warn(int, char *);
 
120
static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
 
121
 
 
122
/* qmgr_active_corrupt - move corrupted file out of the way */
 
123
 
 
124
static void qmgr_active_corrupt(const char *queue_id)
 
125
{
 
126
    char   *myname = "qmgr_active_corrupt";
 
127
 
 
128
    if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
 
129
        if (errno != ENOENT)
 
130
            msg_fatal("%s: save corrupt file queue %s id %s: %m",
 
131
                      myname, MAIL_QUEUE_ACTIVE, queue_id);
 
132
        msg_warn("%s: save corrupt file queue %s id %s: %m",
 
133
                 myname, MAIL_QUEUE_ACTIVE, queue_id);
 
134
    } else {
 
135
        msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", 
 
136
                queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
 
137
    }
 
138
}
 
139
 
 
140
/* qmgr_active_defer - defer queue file */
 
141
 
 
142
static void qmgr_active_defer(const char *queue_name, const char *queue_id,
 
143
                                      const char *dest_queue, int delay)
 
144
{
 
145
    char   *myname = "qmgr_active_defer";
 
146
    const char *path;
 
147
    struct utimbuf tbuf;
 
148
 
 
149
    if (msg_verbose)
 
150
        msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
 
151
 
 
152
    tbuf.actime = tbuf.modtime = event_time() + delay;
 
153
    path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
 
154
    if (utime(path, &tbuf) < 0 && errno != ENOENT)
 
155
        msg_fatal("%s: update %s time stamps: %m", myname, path);
 
156
    if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
 
157
        if (errno != ENOENT)
 
158
            msg_fatal("%s: rename %s from %s to %s: %m", myname,
 
159
                      queue_id, queue_name, dest_queue);
 
160
        msg_warn("%s: rename %s from %s to %s: %m", myname,
 
161
                 queue_id, queue_name, dest_queue);
 
162
    } else if (msg_verbose) {
 
163
        msg_info("%s: defer %s", myname, queue_id);
 
164
    }
 
165
}
 
166
 
 
167
/* qmgr_active_feed - feed one message into active queue */
 
168
 
 
169
int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
 
170
{
 
171
    char   *myname = "qmgr_active_feed";
 
172
    QMGR_MESSAGE *message;
 
173
    struct stat st;
 
174
    const char *path;
 
175
 
 
176
    if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
 
177
        msg_panic("%s: bad queue %s", myname, scan_info->queue);
 
178
    if (msg_verbose)
 
179
        msg_info("%s: queue %s", myname, scan_info->queue);
 
180
 
 
181
    /*
 
182
     * Make sure this is something we are willing to open.
 
183
     */
 
184
    if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
 
185
        return (0);
 
186
 
 
187
    if (msg_verbose)
 
188
        msg_info("%s: %s", myname, path);
 
189
 
 
190
    /*
 
191
     * Skip files that have time stamps into the future. They need to cool
 
192
     * down. Incoming and deferred files can have future time stamps.
 
193
     */
 
194
    if ((scan_info->flags & QMGR_SCAN_ALL) == 0
 
195
        && st.st_mtime > time((time_t *) 0) + 1) {
 
196
        if (msg_verbose)
 
197
            msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
 
198
                     (long) (st.st_mtime - event_time()));
 
199
        return (0);
 
200
    }
 
201
 
 
202
    /*
 
203
     * Move the message to the active queue. File access errors are fatal.
 
204
     */
 
205
    if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
 
206
        if (errno != ENOENT)
 
207
            msg_fatal("%s: %s: rename from %s to %s: %m", myname,
 
208
                      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
 
209
        msg_warn("%s: %s: rename from %s to %s: %m", myname,
 
210
                 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
 
211
        return (0);
 
212
    }
 
213
 
 
214
    /*
 
215
     * Extract envelope information: sender and recipients. At this point,
 
216
     * mail addresses have been processed by the cleanup service so they
 
217
     * should be in canonical form. Generate requests to deliver this
 
218
     * message.
 
219
     * 
 
220
     * Throwing away queue files seems bad, especially when they made it this
 
221
     * far into the mail system. Therefore we save bad files to a separate
 
222
     * directory for further inspection.
 
223
     * 
 
224
     * After queue manager restart it is possible that a queue file is still
 
225
     * being delivered. In that case (the file is locked), defer delivery by
 
226
     * a minimal amount of time.
 
227
     */
 
228
    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
 
229
                                      scan_info->flags)) == 0) {
 
230
        qmgr_active_corrupt(queue_id);
 
231
        return (0);
 
232
    } else if (message == QMGR_MESSAGE_LOCKED) {
 
233
        qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
 
234
        return (0);
 
235
    } else {
 
236
 
 
237
        /*
 
238
         * Special case if all recipients were already delivered. Send any
 
239
         * bounces and clean up.
 
240
         */
 
241
        if (message->refcount == 0)
 
242
            qmgr_active_done(message);
 
243
        return (1);
 
244
    }
 
245
}
 
246
 
 
247
/* qmgr_active_done - dispose of message after recipients have been tried */
 
248
 
 
249
void    qmgr_active_done(QMGR_MESSAGE *message)
 
250
{
 
251
    char   *myname = "qmgr_active_done";
 
252
    struct stat st;
 
253
 
 
254
    if (msg_verbose)
 
255
        msg_info("%s: %s", myname, message->queue_id);
 
256
 
 
257
    /*
 
258
     * During a previous iteration, an attempt to bounce this message may
 
259
     * have failed, so there may still be a bounce log lying around. XXX By
 
260
     * groping around in the bounce queue, we're trespassing on the bounce
 
261
     * service's territory. But doing so is more robust than depending on the
 
262
     * bounce daemon to do the lookup for us, and for us to do the deleting
 
263
     * after we have received a successful status from the bounce service.
 
264
     * The bounce queue directory blocks are most likely in memory anyway. If
 
265
     * these lookups become a performance problem we will have to build an
 
266
     * in-core cache into the bounce daemon.
 
267
     * 
 
268
     * Don't bounce when the bounce log is empty. The bounce process obviously
 
269
     * failed, and the delivery agent will have requested that the message be
 
270
     * deferred.
 
271
     * 
 
272
     * Bounces are sent asynchronously to avoid stalling while the cleanup
 
273
     * daemon waits for the qmgr to accept the "new mail" trigger.
 
274
     */
 
275
    if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
 
276
        if (st.st_size == 0) {
 
277
            if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
 
278
                msg_fatal("remove %s %s: %m",
 
279
                          MAIL_QUEUE_BOUNCE, message->queue_id);
 
280
        } else {
 
281
            if (msg_verbose)
 
282
                msg_info("%s: bounce %s", myname, message->queue_id);
 
283
            if (message->verp_delims == 0 || var_verp_bounce_off)
 
284
                abounce_flush(BOUNCE_FLAG_KEEP,
 
285
                              message->queue_name,
 
286
                              message->queue_id,
 
287
                              message->encoding,
 
288
                              message->errors_to,
 
289
                              qmgr_active_done_2_bounce_flush,
 
290
                              (char *) message);
 
291
            else
 
292
                abounce_flush_verp(BOUNCE_FLAG_KEEP,
 
293
                                   message->queue_name,
 
294
                                   message->queue_id,
 
295
                                   message->encoding,
 
296
                                   message->errors_to,
 
297
                                   message->verp_delims,
 
298
                                   qmgr_active_done_2_bounce_flush,
 
299
                                   (char *) message);
 
300
            return;
 
301
        }
 
302
    }
 
303
 
 
304
    /*
 
305
     * Asynchronous processing does not reach this point.
 
306
     */
 
307
    qmgr_active_done_2_generic(message);
 
308
}
 
309
 
 
310
/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
 
311
 
 
312
static void qmgr_active_done_2_bounce_flush(int status, char *context)
 
313
{
 
314
    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
 
315
 
 
316
    /*
 
317
     * Process abounce_flush() status and continue processing.
 
318
     */
 
319
    message->flags |= status;
 
320
    qmgr_active_done_2_generic(message);
 
321
}
 
322
 
 
323
/* qmgr_active_done_2_generic - continue processing */
 
324
 
 
325
static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
 
326
{
 
327
    char   *myname = "qmgr_active_done_2_generic";
 
328
    const char *path;
 
329
    struct stat st;
 
330
    int     status;
 
331
 
 
332
    /*
 
333
     * A delivery agent marks a queue file as corrupt by changing its
 
334
     * attributes, and by pretending that delivery was deferred.
 
335
     */
 
336
    if (message->flags
 
337
        && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
 
338
        qmgr_active_corrupt(message->queue_id);
 
339
        qmgr_message_free(message);
 
340
        return;
 
341
    }
 
342
 
 
343
    /*
 
344
     * If we did not read all recipients from this file, go read some more,
 
345
     * but remember whether some recipients have to be tried again.
 
346
     * 
 
347
     * Throwing away queue files seems bad, especially when they made it this
 
348
     * far into the mail system. Therefore we save bad files to a separate
 
349
     * directory for further inspection by a human being.
 
350
     */
 
351
    if (message->rcpt_offset > 0) {
 
352
        if (qmgr_message_realloc(message) == 0) {
 
353
            qmgr_active_corrupt(message->queue_id);
 
354
            qmgr_message_free(message);
 
355
        } else {
 
356
            if (message->refcount == 0)
 
357
                qmgr_active_done(message);      /* recurse for consistency */
 
358
        }
 
359
        return;
 
360
    }
 
361
 
 
362
    /*
 
363
     * As a temporary implementation, synchronously inform the sender of
 
364
     * trace information. This will block for 10 seconds when the qmgr FIFO
 
365
     * is full.
 
366
     */
 
367
    if (message->tflags & (DEL_REQ_FLAG_EXPAND | DEL_REQ_FLAG_RECORD)) {
 
368
        status = trace_flush(message->tflags,
 
369
                             message->queue_name,
 
370
                             message->queue_id,
 
371
                             message->encoding,
 
372
                             message->sender);
 
373
        if (status == 0 && message->tflags_offset)
 
374
            qmgr_message_kill_record(message, message->tflags_offset);
 
375
        message->flags |= status;
 
376
    }
 
377
 
 
378
    /*
 
379
     * If we get to this point we have tried all recipients for this message.
 
380
     * If the message is too old, try to bounce it.
 
381
     * 
 
382
     * Bounces are sent asynchronously to avoid stalling while the cleanup
 
383
     * daemon waits for the qmgr to accept the "new mail" trigger.
 
384
     */
 
385
    if (message->flags) {
 
386
        if (event_time() >= message->arrival_time +
 
387
            (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
 
388
            msg_info("%s: from=<%s>, status=expired, returned to sender",
 
389
                     message->queue_id, message->sender);
 
390
            if (message->verp_delims == 0 || var_verp_bounce_off)
 
391
                adefer_flush(BOUNCE_FLAG_KEEP,
 
392
                             message->queue_name,
 
393
                             message->queue_id,
 
394
                             message->encoding,
 
395
                             message->errors_to,
 
396
                             qmgr_active_done_3_defer_flush,
 
397
                             (char *) message);
 
398
            else
 
399
                adefer_flush_verp(BOUNCE_FLAG_KEEP,
 
400
                                  message->queue_name,
 
401
                                  message->queue_id,
 
402
                                  message->encoding,
 
403
                                  message->errors_to,
 
404
                                  message->verp_delims,
 
405
                                  qmgr_active_done_3_defer_flush,
 
406
                                  (char *) message);
 
407
            return;
 
408
        } else if (message->warn_time > 0
 
409
                   && event_time() > message->warn_time) {
 
410
            if (msg_verbose)
 
411
                msg_info("%s: sending defer warning for %s", myname, message->queue_id);
 
412
            adefer_warn(BOUNCE_FLAG_KEEP,
 
413
                        message->queue_name,
 
414
                        message->queue_id,
 
415
                        message->encoding,
 
416
                        message->errors_to,
 
417
                        qmgr_active_done_3_defer_warn,
 
418
                        (char *) message);
 
419
            return;
 
420
        }
 
421
    }
 
422
 
 
423
    /*
 
424
     * Asynchronous processing does not reach this point.
 
425
     */
 
426
    qmgr_active_done_3_generic(message);
 
427
}
 
428
 
 
429
/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
 
430
 
 
431
static void qmgr_active_done_3_defer_warn(int status, char *context)
 
432
{
 
433
    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
 
434
 
 
435
    /*
 
436
     * Process adefer_warn() completion status and continue processing.
 
437
     */
 
438
    if (status == 0)
 
439
        qmgr_message_update_warn(message);
 
440
    qmgr_active_done_3_generic(message);
 
441
}
 
442
 
 
443
/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
 
444
 
 
445
static void qmgr_active_done_3_defer_flush(int status, char *context)
 
446
{
 
447
    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
 
448
 
 
449
    /*
 
450
     * Process adefer_flush() status and continue processing.
 
451
     */
 
452
    message->flags = status;
 
453
    qmgr_active_done_3_generic(message);
 
454
}
 
455
 
 
456
/* qmgr_active_done_3_generic - continue processing */
 
457
 
 
458
static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
 
459
{
 
460
    char   *myname = "qmgr_active_done_3_generic";
 
461
    int     delay;
 
462
 
 
463
    /*
 
464
     * Some recipients need to be tried again. Move the queue file time
 
465
     * stamps into the future by the amount of time that the message is
 
466
     * delayed, and move the message to the deferred queue. Impose minimal
 
467
     * and maximal backoff times.
 
468
     * 
 
469
     * Since we look at actual time in queue, not time since last delivery
 
470
     * attempt, backoff times will be distributed. However, we can still see
 
471
     * spikes in delivery activity because the interval between deferred
 
472
     * queue scans is finite.
 
473
     */
 
474
    if (message->flags) {
 
475
        if (message->arrival_time > 0) {
 
476
            delay = event_time() - message->arrival_time;
 
477
            if (delay > var_max_backoff_time)
 
478
                delay = var_max_backoff_time;
 
479
            if (delay < var_min_backoff_time)
 
480
                delay = var_min_backoff_time;
 
481
        } else {
 
482
            delay = var_min_backoff_time;
 
483
        }
 
484
        qmgr_active_defer(message->queue_name, message->queue_id,
 
485
                          MAIL_QUEUE_DEFERRED, delay);
 
486
    }
 
487
 
 
488
    /*
 
489
     * All recipients done. Remove the queue file.
 
490
     */
 
491
    else {
 
492
        if (mail_queue_remove(message->queue_name, message->queue_id)) {
 
493
            if (errno != ENOENT)
 
494
                msg_fatal("%s: remove %s from %s: %m", myname,
 
495
                          message->queue_id, message->queue_name);
 
496
            msg_warn("%s: remove %s from %s: %m", myname,
 
497
                     message->queue_id, message->queue_name);
 
498
        } else {
 
499
            /* Same format as logged by postsuper. */
 
500
            msg_info("%s: removed", message->queue_id);
 
501
        }
 
502
    }
 
503
 
 
504
    /*
 
505
     * Finally, delete the in-core message structure.
 
506
     */
 
507
    qmgr_message_free(message);
 
508
}
 
509
 
 
510
/* qmgr_active_drain - drain active queue by allocating a delivery process */
 
511
 
 
512
void    qmgr_active_drain(void)
 
513
{
 
514
    QMGR_TRANSPORT *transport;
 
515
 
 
516
    /*
 
517
     * Allocate one delivery process for every transport with pending mail.
 
518
     * The process allocation completes asynchronously.
 
519
     */
 
520
    while ((transport = qmgr_transport_select()) != 0) {
 
521
        if (msg_verbose)
 
522
            msg_info("qmgr_active_drain: allocate %s", transport->name);
 
523
        qmgr_transport_alloc(transport, qmgr_deliver);
 
524
    }
 
525
}