~ubuntu-branches/ubuntu/karmic/postfix/karmic-security

« back to all changes in this revision

Viewing changes to src/qmgr/qmgr_transport.c

  • Committer: Bazaar Package Importer
  • Author(s): Christoph Berg
  • Date: 2008-09-07 14:02:15 UTC
  • mfrom: (29.1.21 intrepid)
  • Revision ID: james.westby@ubuntu.com-20080907140215-d9zf8bj803ditke5
Tags: 2.5.5-1.1
* Non-maintainer upload.
* Add rsyslog.d config snipped to create a /dev/log syslog socket in the
  postfix chroot.  Also, add a note about other syslog daemons to
  README.Debian.  Closes: #311812

Show diffs side-by-side

added added

removed removed

Lines of Context:
65
65
/*      P.O. Box 704
66
66
/*      Yorktown Heights, NY 10598, USA
67
67
/*
68
 
/*      Scheduler enhancements:
 
68
/*      Preemptive scheduler enhancements:
69
69
/*      Patrik Rak
70
70
/*      Modra 6
71
71
/*      155 00, Prague, Czech Republic
76
76
#include <sys_defs.h>
77
77
#include <unistd.h>
78
78
 
 
79
#include <sys/time.h>                   /* FD_SETSIZE */
 
80
#include <sys/types.h>                  /* FD_SETSIZE */
 
81
#include <unistd.h>                     /* FD_SETSIZE */
 
82
 
 
83
#ifdef USE_SYS_SELECT_H
 
84
#include <sys/select.h>                 /* FD_SETSIZE */
 
85
#endif
 
86
 
79
87
/* Utility library. */
80
88
 
81
89
#include <msg.h>
110
118
    QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
111
119
};
112
120
 
 
121
 /*
 
122
  * Connections to delivery agents are managed asynchronously. Each delivery
 
123
  * agent connection goes through multiple wait states:
 
124
  * 
 
125
  * - With Linux/Solaris and old queue manager implementations only, wait for
 
126
  * the server to invoke accept().
 
127
  * 
 
128
  * - Wait for the delivery agent's announcement that it is ready to receive a
 
129
  * delivery request.
 
130
  * 
 
131
  * - Wait for the delivery request completion status.
 
132
  * 
 
133
  * Older queue manager implementations had only one pending delivery agent
 
134
  * connection per transport. With low-latency destinations, the output rates
 
135
  * were reduced on Linux/Solaris systems that had the extra wait state.
 
136
  * 
 
137
  * To maximize delivery agent output rates with low-latency destinations, the
 
138
  * following changes were made to the queue manager by the end of the 2.4
 
139
  * development cycle:
 
140
  * 
 
141
  * - The Linux/Solaris accept() wait state was eliminated.
 
142
  * 
 
143
  * - A pipeline was implemented for pending delivery agent connections. The
 
144
  * number of pending delivery agent connections was increased from one to
 
145
  * two: the number of before-delivery wait states, plus one extra pipeline
 
146
  * slot to prevent the pipeline from stalling easily. Increasing the
 
147
  * pipeline much further actually hurt performance.
 
148
  * 
 
149
  * - To reduce queue manager disk competition with delivery agents, the queue
 
150
  * scanning algorithm was modified to import only one message per interrupt.
 
151
  * The incoming and deferred queue scans now happen on alternate interrupts.
 
152
  * 
 
153
  * Simplistically reasoned, a non-zero (incoming + active) queue length is
 
154
  * equivalent to a time shift for mail deliveries; this is undesirable when
 
155
  * delivery agents are not fully utilized.
 
156
  * 
 
157
  * On the other hand a non-empty active queue is what allows us to do clever
 
158
  * things such as queue file prefetch, concurrency windows, and connection
 
159
  * caching; the idea is that such "thinking time" is affordable only after
 
160
  * the output channels are maxed out.
 
161
  */
 
162
#ifndef QMGR_TRANSPORT_MAX_PEND
 
163
#define QMGR_TRANSPORT_MAX_PEND 2
 
164
#endif
 
165
 
113
166
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
114
167
 
115
168
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
196
249
    event_cancel_timer(qmgr_transport_abort, context);
197
250
 
198
251
    /*
199
 
     * Disable further read events that end up calling this function.
 
252
     * Disable further read events that end up calling this function, and
 
253
     * free up this pending connection pipeline slot.
200
254
     */
201
 
    event_disable_readwrite(vstream_fileno(alloc->stream));
202
 
    alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
 
255
    if (alloc->stream) {
 
256
        event_disable_readwrite(vstream_fileno(alloc->stream));
 
257
        non_blocking(vstream_fileno(alloc->stream), BLOCKING);
 
258
    }
 
259
    alloc->transport->pending -= 1;
203
260
 
204
261
    /*
205
262
     * Notify the requestor.
208
265
    myfree((char *) alloc);
209
266
}
210
267
 
211
 
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
212
 
 
213
 
/* qmgr_transport_connect - handle connection request completion */
214
 
 
215
 
static void qmgr_transport_connect(int unused_event, char *context)
216
 
{
217
 
    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
218
 
 
219
 
    /*
220
 
     * This code is necessary for some versions of LINUX, where connect(2)
221
 
     * blocks until the application performs an accept(2). Reportedly, the
222
 
     * same can happen on Solaris 2.5.1.
223
 
     */
224
 
    event_disable_readwrite(vstream_fileno(alloc->stream));
225
 
    non_blocking(vstream_fileno(alloc->stream), BLOCKING);
226
 
    event_enable_read(vstream_fileno(alloc->stream),
227
 
                      qmgr_transport_event, (char *) alloc);
228
 
}
229
 
 
230
 
#endif
231
 
 
232
268
/* qmgr_transport_select - select transport for allocation */
233
269
 
234
270
QMGR_TRANSPORT *qmgr_transport_select(void)
235
271
{
236
272
    QMGR_TRANSPORT *xport;
237
273
    QMGR_QUEUE *queue;
 
274
    int     need;
238
275
 
239
276
    /*
240
277
     * If we find a suitable transport, rotate the list of transports to
241
278
     * effectuate round-robin selection. See similar selection code in
242
279
     * qmgr_peer_select().
 
280
     * 
 
281
     * This function is called repeatedly until all transports have maxed out
 
282
     * the number of pending delivery agent connections, until all delivery
 
283
     * agent concurrency windows are maxed out, or until we run out of "todo"
 
284
     * queue entries.
243
285
     */
244
 
#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
 
286
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
245
287
 
246
288
    for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
247
 
        if (xport->flags & STAY_AWAY)
 
289
        if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
 
290
            || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
248
291
            continue;
 
292
        need = xport->pending + 1;
249
293
        for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
250
 
            if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
 
294
            if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
 
295
                                          queue->todo_refcount)) <= 0) {
251
296
                QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
252
297
                if (msg_verbose)
253
298
                    msg_info("qmgr_transport_select: %s", xport->name);
263
308
void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
264
309
{
265
310
    QMGR_TRANSPORT_ALLOC *alloc;
266
 
    VSTREAM *stream;
267
 
    DSN     dsn;
268
311
 
269
312
    /*
270
313
     * Sanity checks.
271
314
     */
272
315
    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
273
316
        msg_panic("qmgr_transport: dead transport: %s", transport->name);
274
 
    if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
275
 
        msg_panic("qmgr_transport: nested allocation: %s", transport->name);
 
317
    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
 
318
        msg_panic("qmgr_transport: excess allocation: %s", transport->name);
276
319
 
277
320
    /*
278
321
     * Connect to the well-known port for this delivery service, and wake up
279
 
     * when a process announces its availability. In the mean time, block out
280
 
     * other delivery process allocation attempts for this transport. In case
281
 
     * of problems, back off. Do not hose the system when it is in trouble
 
322
     * when a process announces its availability. Allow only a limited number
 
323
     * of delivery process allocation attempts for this transport. In case of
 
324
     * problems, back off. Do not hose the system when it is in trouble
282
325
     * already.
 
326
     * 
 
327
     * Use non-blocking connect(), so that Linux won't block the queue manager
 
328
     * until the delivery agent calls accept().
 
329
     * 
 
330
     * When the connection to delivery agent cannot be completed, notify the
 
331
     * event handler so that it can throttle the transport and defer the todo
 
332
     * queues, just like it does when communication fails *after* connection
 
333
     * completion.
 
334
     * 
 
335
     * Before Postfix 2.4, the event handler was not invoked after connect()
 
336
     * error, and mail was not deferred. Because of this, mail would be stuck
 
337
     * in the active queue after triggering a "connection refused" condition.
283
338
     */
284
 
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
285
 
#define BLOCK_MODE      NON_BLOCKING
286
 
#define ENABLE_EVENTS   event_enable_write
287
 
#define EVENT_HANDLER   qmgr_transport_connect
288
 
#else
289
 
#define BLOCK_MODE      BLOCKING
290
 
#define ENABLE_EVENTS   event_enable_read
291
 
#define EVENT_HANDLER   qmgr_transport_event
292
 
#endif
293
 
 
294
 
    if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
295
 
        msg_warn("connect to transport %s: %m", transport->name);
296
 
        qmgr_transport_throttle(transport,
297
 
                                DSN_SIMPLE(&dsn, "4.3.0",
298
 
                                           "mail transport unavailable"));
299
 
        return;
300
 
    }
301
339
    alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
302
 
    alloc->stream = stream;
303
340
    alloc->transport = transport;
304
341
    alloc->notify = notify;
305
 
    transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
306
 
    ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
 
342
    transport->pending += 1;
 
343
    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
 
344
                                      NON_BLOCKING)) == 0) {
 
345
        msg_warn("connect to transport %s: %m", transport->name);
 
346
        event_request_timer(qmgr_transport_event, (char *) alloc, 0);
 
347
        return;
 
348
    }
 
349
#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
 
350
#ifndef THRESHOLD_FD_WORKAROUND
 
351
#define THRESHOLD_FD_WORKAROUND 128
 
352
#endif
 
353
    vstream_control(alloc->stream,
 
354
                    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
 
355
                    VSTREAM_CTL_END);
 
356
#endif
 
357
    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
 
358
                      (char *) alloc);
307
359
 
308
360
    /*
309
361
     * Guard against broken systems.
322
374
        msg_panic("qmgr_transport_create: transport exists: %s", name);
323
375
    transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
324
376
    transport->flags = 0;
 
377
    transport->pending = 0;
325
378
    transport->name = mystrdup(name);
326
379
 
327
380
    /*
333
386
    transport->recipient_limit =
334
387
        get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
335
388
                           var_dest_rcpt_limit, 0, 0);
 
389
    transport->init_dest_concurrency =
 
390
        get_mail_conf_int2(name, _INIT_DEST_CON,
 
391
                           var_init_dest_concurrency, 1, 0);
 
392
    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
 
393
                                                var_dest_rate_delay, 
 
394
                                                's', 0, 0);
336
395
 
337
 
    if (transport->dest_concurrency_limit == 0
338
 
        || transport->dest_concurrency_limit >= var_init_dest_concurrency)
339
 
        transport->init_dest_concurrency = var_init_dest_concurrency;
340
 
    else
 
396
    if (transport->rate_delay > 0)
 
397
        transport->dest_concurrency_limit = 1;
 
398
    if (transport->dest_concurrency_limit != 0
 
399
    && transport->dest_concurrency_limit < transport->init_dest_concurrency)
341
400
        transport->init_dest_concurrency = transport->dest_concurrency_limit;
342
401
 
343
402
    transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST,
353
412
                                                var_xport_rcpt_limit, 0, 0);
354
413
    transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
355
414
                                                var_stack_rcpt_limit, 0, 0);
 
415
    transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
 
416
                                              var_xport_refill_limit, 1, 0);
 
417
    transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
 
418
                                         var_xport_refill_delay, 's', 1, 0);
356
419
 
357
420
    transport->queue_byname = htable_create(0);
358
421
    QMGR_LIST_INIT(transport->queue_list);
366
429
    transport->candidate_cache_time = (time_t) 0;
367
430
    transport->blocker_tag = 1;
368
431
    transport->dsn = 0;
 
432
    qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
 
433
                       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
 
434
    qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
 
435
                       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
 
436
    transport->fail_cohort_limit =
 
437
        get_mail_conf_int2(name, _CONC_COHORT_LIM,
 
438
                           var_conc_cohort_limit, 0, 0);
369
439
    if (qmgr_transport_byname == 0)
370
440
        qmgr_transport_byname = htable_create(10);
371
441
    htable_enter(qmgr_transport_byname, name, (char *) transport);