110
118
QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
122
 * Connections to delivery agents are managed asynchronously. Each delivery
 * agent connection goes through multiple wait states:
 *
 * - With Linux/Solaris and old queue manager implementations only, wait for
 * the server to invoke accept().
 *
 * - Wait for the delivery agent's announcement that it is ready to receive a
 * delivery request.
 *
 * - Wait for the delivery request completion status.
 *
 * Older queue manager implementations had only one pending delivery agent
 * connection per transport. With low-latency destinations, the output rates
 * were reduced on Linux/Solaris systems that had the extra wait state.
 *
 * To maximize delivery agent output rates with low-latency destinations, the
 * following changes were made to the queue manager by the end of the 2.4
 * development cycle:
 *
 * - The Linux/Solaris accept() wait state was eliminated.
 *
 * - A pipeline was implemented for pending delivery agent connections. The
 * number of pending delivery agent connections was increased from one to
 * two: the number of before-delivery wait states, plus one extra pipeline
 * slot to prevent the pipeline from stalling easily. Increasing the
 * pipeline much further actually hurt performance.
 *
 * - To reduce queue manager disk competition with delivery agents, the queue
 * scanning algorithm was modified to import only one message per interrupt.
 * The incoming and deferred queue scans now happen on alternate interrupts.
 *
 * Simplistically reasoned, a non-zero (incoming + active) queue length is
 * equivalent to a time shift for mail deliveries; this is undesirable when
 * delivery agents are not fully utilized.
 *
 * On the other hand a non-empty active queue is what allows us to do clever
 * things such as queue file prefetch, concurrency windows, and connection
 * caching; the idea is that such "thinking time" is affordable only after
 * the output channels are maxed out.
 */
162
#ifndef QMGR_TRANSPORT_MAX_PEND
163
#define QMGR_TRANSPORT_MAX_PEND 2
113
166
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
115
168
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
196
249
event_cancel_timer(qmgr_transport_abort, context);
199
* Disable further read events that end up calling this function.
252
* Disable further read events that end up calling this function, and
253
* free up this pending connection pipeline slot.
201
event_disable_readwrite(vstream_fileno(alloc->stream));
202
alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
256
event_disable_readwrite(vstream_fileno(alloc->stream));
257
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
259
alloc->transport->pending -= 1;
205
262
* Notify the requestor.
208
265
myfree((char *) alloc);
211
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT

/* qmgr_transport_connect - handle connection request completion */

static void qmgr_transport_connect(int unused_event, char *context)
{
    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;

    /*
     * This code is necessary for some versions of LINUX, where connect(2)
     * blocks until the application performs an accept(2). Reportedly, the
     * same can happen on Solaris 2.5.1.
     *
     * The write event that drove the non-blocking connect has fired, so the
     * connection is complete: drop the write handler, return the stream to
     * blocking mode, and wait for the delivery agent's availability
     * announcement on the read side.
     */
    event_disable_readwrite(vstream_fileno(alloc->stream));
    non_blocking(vstream_fileno(alloc->stream), BLOCKING);
    event_enable_read(vstream_fileno(alloc->stream),
		      qmgr_transport_event, (char *) alloc);
}

#endif
232
268
/* qmgr_transport_select - select transport for allocation */
234
270
QMGR_TRANSPORT *qmgr_transport_select(void)
236
272
QMGR_TRANSPORT *xport;
237
273
QMGR_QUEUE *queue;
240
277
* If we find a suitable transport, rotate the list of transports to
241
278
* effectuate round-robin selection. See similar selection code in
242
279
* qmgr_peer_select().
281
* This function is called repeatedly until all transports have maxed out
282
* the number of pending delivery agent connections, until all delivery
283
* agent concurrency windows are maxed out, or until we run out of "todo"
244
#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
286
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
246
288
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
247
if (xport->flags & STAY_AWAY)
289
if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
290
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
292
need = xport->pending + 1;
249
293
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
250
if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
294
if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
295
queue->todo_refcount)) <= 0) {
251
296
QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
253
298
msg_info("qmgr_transport_select: %s", xport->name);
263
308
void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
{
    QMGR_TRANSPORT_ALLOC *alloc;

    /*
     * Sanity checks.
     */
    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
	msg_panic("qmgr_transport: dead transport: %s", transport->name);
    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
	msg_panic("qmgr_transport: excess allocation: %s", transport->name);

    /*
     * Connect to the well-known port for this delivery service, and wake up
     * when a process announces its availability. Allow only a limited number
     * of delivery process allocation attempts for this transport. In case of
     * problems, back off. Do not hose the system when it is in trouble
     * already.
     *
     * Use non-blocking connect(), so that Linux won't block the queue
     * manager until the delivery agent calls accept().
     *
     * When the connection to delivery agent cannot be completed, notify the
     * event handler so that it can throttle the transport and defer the todo
     * queues, just like it does when communication fails *after* connection
     * completion.
     *
     * Before Postfix 2.4, the event handler was not invoked after connect()
     * error, and mail was not deferred. Because of this, mail would be stuck
     * in the active queue after triggering a "connection refused" condition.
     */
    alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
    alloc->transport = transport;
    alloc->notify = notify;
    transport->pending += 1;
    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
				      NON_BLOCKING)) == 0) {
	msg_warn("connect to transport %s: %m", transport->name);
	/* Deliver the failure through the normal event path, one tick later. */
	event_request_timer(qmgr_transport_event, (char *) alloc, 0);
	return;
    }

    /*
     * NOTE(review): presumably this dups the socket above descriptor 128 so
     * that low-numbered descriptors stay available; only relevant when the
     * event loop is not select()-based. Confirm against vstream_control()
     * documentation.
     */
#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
#ifndef THRESHOLD_FD_WORKAROUND
#define THRESHOLD_FD_WORKAROUND 128
#endif
    vstream_control(alloc->stream,
		    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
		    VSTREAM_CTL_END);
#endif

    /*
     * Wake up when the delivery agent announces that it is ready to receive
     * a request; qmgr_transport_event() then completes the hand-off.
     */
    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
		      (char *) alloc);

    /*
     * Guard against broken systems: abort the hand-off if the delivery
     * agent never responds (the matching event_cancel_timer() call is in
     * qmgr_transport_event()). NOTE(review): the timeout value is not
     * visible in this chunk; upstream uses var_daemon_timeout -- confirm.
     */
    event_request_timer(qmgr_transport_abort, (char *) alloc,
			var_daemon_timeout);
}
333
386
transport->recipient_limit =
334
387
get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
335
388
var_dest_rcpt_limit, 0, 0);
389
transport->init_dest_concurrency =
390
get_mail_conf_int2(name, _INIT_DEST_CON,
391
var_init_dest_concurrency, 1, 0);
392
transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
337
if (transport->dest_concurrency_limit == 0
338
|| transport->dest_concurrency_limit >= var_init_dest_concurrency)
339
transport->init_dest_concurrency = var_init_dest_concurrency;
396
if (transport->rate_delay > 0)
397
transport->dest_concurrency_limit = 1;
398
if (transport->dest_concurrency_limit != 0
399
&& transport->dest_concurrency_limit < transport->init_dest_concurrency)
341
400
transport->init_dest_concurrency = transport->dest_concurrency_limit;
343
402
transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST,
353
412
var_xport_rcpt_limit, 0, 0);
354
413
transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
355
414
var_stack_rcpt_limit, 0, 0);
415
transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
416
var_xport_refill_limit, 1, 0);
417
transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
418
var_xport_refill_delay, 's', 1, 0);
357
420
transport->queue_byname = htable_create(0);
358
421
QMGR_LIST_INIT(transport->queue_list);
366
429
transport->candidate_cache_time = (time_t) 0;
367
430
transport->blocker_tag = 1;
368
431
transport->dsn = 0;
432
qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
433
VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
434
qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
435
VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
436
transport->fail_cohort_limit =
437
get_mail_conf_int2(name, _CONC_COHORT_LIM,
438
var_conc_cohort_limit, 0, 0);
369
439
if (qmgr_transport_byname == 0)
370
440
qmgr_transport_byname = htable_create(10);
371
441
htable_enter(qmgr_transport_byname, name, (char *) transport);