summaryrefslogtreecommitdiffstats
path: root/src/oqmgr/qmgr_deliver.c
blob: 100ccc73dd824659065988e6a604799176ea20f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/*++
/* NAME
/*	qmgr_deliver 3
/* SUMMARY
/*	deliver one per-site queue entry to that site
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	int	qmgr_deliver_concurrency;
/*
/*	void	qmgr_deliver(transport, fp)
/*	QMGR_TRANSPORT *transport;
/*	VSTREAM	*fp;
/* DESCRIPTION
/*	This module implements the client side of the `queue manager
/*	to delivery agent' protocol. The queue manager uses
/*	asynchronous I/O so that it can drive multiple delivery
/*	agents in parallel. Depending on the outcome of a delivery
/*	attempt, the status of messages, queues and transports is
/*	updated.
/*
/*	qmgr_deliver_concurrency is a global counter that says how
/*	many delivery processes are in use. This can be used, for
/*	example, to control the size of the `active' message queue.
/*
/*	qmgr_deliver() executes when a delivery process announces its
/*	availability for the named transport. It arranges for delivery
/*	of a suitable queue entry.  The \fIfp\fR argument specifies a
/*	stream that is connected to the delivery process, or a null
/*	pointer if the transport accepts no connection. Upon completion
/*	of delivery (successful or not), the stream is closed, so that the
/*	delivery process is released.
/* DIAGNOSTICS
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Wietse Venema
/*	Google, Inc.
/*	111 8th Avenue
/*	New York, NY 10011, USA
/*--*/

/* System library. */

#include <sys_defs.h>
#include <time.h>
#include <string.h>

/* Utility library. */

#include <msg.h>
#include <vstring.h>
#include <vstream.h>
#include <vstring_vstream.h>
#include <events.h>
#include <iostuff.h>
#include <stringops.h>
#include <mymalloc.h>

/* Global library. */

#include <mail_queue.h>
#include <mail_proto.h>
#include <recipient_list.h>
#include <mail_params.h>
#include <deliver_request.h>
#include <verp_sender.h>
#include <dsn_util.h>
#include <dsn_buf.h>
#include <dsb_scan.h>
#include <rcpt_print.h>
#include <smtputf8.h>

/* Application-specific. */

#include "qmgr.h"

 /*
  * Important note on the _transport_rate_delay implementation: after
  * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
  * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
  * or qmgr_transport_throttle(). Otherwise, transports with non-zero
  * _transport_rate_delay will become stuck.
  */

int     qmgr_deliver_concurrency;

 /*
  * Message delivery status codes.
  */
#define DELIVER_STAT_OK		0	/* all recipients delivered */
#define DELIVER_STAT_DEFER	1	/* try some recipients later */
#define DELIVER_STAT_CRASH	2	/* mailer internal problem */

/* qmgr_deliver_initial_reply - read the delivery agent's opening announcement */

static int qmgr_deliver_initial_reply(VSTREAM *stream)
{
    int     scan_result;

    /*
     * Result: DELIVER_STAT_CRASH when peekfd() fails on the stream,
     * DELIVER_STAT_DEFER when attr_scan() returns non-zero for the expected
     * MAIL_ATTR_PROTO announcement, and 0 otherwise. Both failure cases are
     * logged as warnings that name the stream.
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
	return (DELIVER_STAT_CRASH);
    }
    scan_result = attr_scan(stream, ATTR_FLAG_STRICT,
		  RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
			    ATTR_TYPE_END);
    if (scan_result != 0) {
	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
	return (DELIVER_STAT_DEFER);
    }
    return (0);
}

/* qmgr_deliver_final_reply - read the delivery agent's completion report */

static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
{
    int     agent_status;
    int     scan_result;

    /*
     * Result: DELIVER_STAT_CRASH when peekfd() fails on the stream or when
     * attr_scan() does not return exactly 2 (the DSN buffer plus the integer
     * status); otherwise DELIVER_STAT_DEFER for a non-zero agent status and
     * 0 for a zero agent status. The DSN information is stored into dsb by
     * dsb_scan().
     */
    if (peekfd(vstream_fileno(stream)) < 0) {
	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
	return (DELIVER_STAT_CRASH);
    }
    scan_result = attr_scan(stream, ATTR_FLAG_STRICT,
			    RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
			    RECV_ATTR_INT(MAIL_ATTR_STATUS, &agent_status),
			    ATTR_TYPE_END);
    if (scan_result != 2) {
	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
	return (DELIVER_STAT_CRASH);
    }
    if (agent_status != 0)
	return (DELIVER_STAT_DEFER);
    return (0);
}

/* qmgr_deliver_send_request - write one delivery request to a delivery agent */

static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
{
    QMGR_MESSAGE *message = entry->message;
    RECIPIENT_LIST list = entry->rcpt_list;
    RECIPIENT *rcpt_end = list.info + list.len;
    RECIPIENT *rcpt;
    VSTRING *verp_buf = 0;
    MSG_STATS stats;
    const char *addr;
    char   *sender;
    int     smtputf8 = message->smtputf8;
    int     flags;

    /*
     * Result: 0 when the request was flushed to the stream, -1 (with a
     * logged warning) when the flush failed.
     */

    /*
     * Todo: integrate with code up-stream that builds the delivery request.
     * 
     * When SMTPUTF8 support is enabled, request it for this delivery if any
     * recipient address is non-empty, not all-ASCII, and valid UTF-8. With
     * VERP the sender is derived from the recipient, so flag the sender too.
     */
    if (var_smtputf8_enable) {
	for (rcpt = list.info; rcpt < rcpt_end; rcpt++) {
	    addr = rcpt->address;
	    if (addr[0] == 0 || allascii(addr)
		|| !valid_utf8_string(addr, strlen(addr)))
		continue;
	    smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
	    if (message->verp_delims)
		smtputf8 |= SMTPUTF8_FLAG_SENDER;
	}
    }

    /*
     * With a variable envelope return path, prefix+@origin becomes
     * prefix+user=domain@origin, built from the first recipient (VERP
     * deliveries have only one recipient). Without VERP the message sender
     * is used as is.
     */
    if (message->verp_delims != 0) {
	verp_buf = vstring_alloc(100);
	verp_sender(verp_buf, message->verp_delims,
		    message->sender, list.info);
	sender = vstring_str(verp_buf);
    } else {
	sender = message->sender;
    }

    /*
     * Combine the per-message and per-queue request flags with the
     * bounce/default selector.
     */
    flags = message->tflags | entry->queue->dflags;
    flags |= (message->inspect_xport ?
	      DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);

    /*
     * Request header: everything except the recipients themselves.
     */
    (void) QMGR_MSG_STATS(&stats, message);
    attr_print(stream, ATTR_FLAG_NONE,
	       SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
	       SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
	       SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
	       SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
	       SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
	       SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
	       SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
	       SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
	       SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
	       SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
	       SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
	       SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
    /* XXX Client logging attributes: should be wrapped in ATTR_TYPE_FUNC. */
	       SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
	       SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
	       SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
	       SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
	       SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
    /* XXX SASL attributes: should be wrapped in ATTR_TYPE_FUNC. */
	       SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
	       SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
	       SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
    /* XXX Ditto if we want to pass TLS certificate info. */
	       SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
	       SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
	       SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
	       ATTR_TYPE_END);

    /*
     * The VERP sender has been written; its buffer is no longer needed.
     */
    if (verp_buf != 0)
	vstring_free(verp_buf);

    /*
     * One attribute record per recipient.
     */
    for (rcpt = list.info; rcpt < rcpt_end; rcpt++)
	attr_print(stream, ATTR_FLAG_NONE,
		   SEND_ATTR_FUNC(rcpt_print, (const void *) rcpt),
		   ATTR_TYPE_END);

    /*
     * Push the request out and report whether that worked.
     */
    if (vstream_fflush(stream) != 0) {
	msg_warn("write to process (%s): %m", entry->queue->transport->name);
	return (-1);
    }
    if (msg_verbose)
	msg_info("qmgr_deliver: site `%s'", entry->queue->name);
    return (0);
}

/* qmgr_deliver_abort - watchdog: no delivery status arrived in time */

static void qmgr_deliver_abort(int unused_event, void *context)
{
    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;

    /*
     * The context is the queue entry whose status report is overdue. Report
     * a fatal error that names the message and the transport.
     */
    msg_fatal("%s: timeout receiving delivery status from transport: %s",
	      entry->message->queue_id, entry->queue->transport->name);
}

/* qmgr_deliver_update - process delivery status report */

static void qmgr_deliver_update(int unused_event, void *context)
{
    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
    QMGR_QUEUE *queue = entry->queue;
    QMGR_TRANSPORT *transport = queue->transport;
    QMGR_MESSAGE *message = entry->message;
    static DSN_BUF *dsb;
    int     status;

    /*
     * Read-event handler, registered by qmgr_deliver() on the delivery agent
     * stream. The context argument is the QMGR_ENTRY that was handed to the
     * agent; entry->stream is the connection to that agent. The event
     * argument is not used.
     * 
     * Outline: cancel the watchdog, read the agent's final reply, then either
     * (crash) throttle the transport and push the entry back to the todo
     * list, or (defer/ok) update queue and transport state and finish the
     * entry.
     */

    /*
     * Release the delivery agent from a "hot" queue entry.
     * 
     * The macro stops read/write event monitoring on the stream, closes it,
     * clears entry->stream, and decrements the global count of delivery
     * processes in use. Note that this macro remains defined for the rest of
     * the file.
     */
#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
	event_disable_readwrite(vstream_fileno(entry->stream)); \
	(void) vstream_fclose(entry->stream); \
	entry->stream = 0; \
	qmgr_deliver_concurrency--; \
    } while (0)

    /*
     * The DSN buffer is created on the first call and then reused by every
     * later call (it is a function-level static and is never freed here).
     */
    if (dsb == 0)
	dsb = dsb_create();

    /*
     * The message transport has responded. Stop the watchdog timer.
     */
    event_cancel_timer(qmgr_deliver_abort, context);

    /*
     * Retrieve the delivery agent status report. The numerical status code
     * indicates if delivery should be tried again. The reason text is sent
     * only when a site should be avoided for a while, so that the queue
     * manager can log why it does not even try to schedule delivery to the
     * affected recipients.
     * 
     * The result is 0, DELIVER_STAT_DEFER or DELIVER_STAT_CRASH.
     */
    status = qmgr_deliver_final_reply(entry->stream, dsb);

    /*
     * The mail delivery process failed for some reason (although delivery
     * may have been successful). Back off with this transport type for a
     * while. Dispose of queue entries for this transport that await
     * selection (the todo lists). Stay away from queue entries that have
     * been selected (the busy lists), or we would have dangling pointers.
     * The queue itself won't go away before we dispose of the current queue
     * entry.
     */
    if (status == DELIVER_STAT_CRASH) {
	/* The DELIVER_STAT_DEFER value (1) is OR-ed into the message flags. */
	message->flags |= DELIVER_STAT_DEFER;
	/*
	 * The "#if 0" variant below is disabled; it refers to a "whatsup"
	 * variable that is not declared in this function.
	 */
#if 0
	whatsup = concatenate("unknown ", transport->name,
			      " mail transport error", (char *) 0);
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
	myfree(whatsup);
#else
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsb->dsn, "4.3.0",
					   "unknown mail transport error"));
#endif
	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
		 transport->name);

	/*
	 * Assume the worst and write a defer logfile record for each
	 * recipient. This omission was already present in the first queue
	 * manager implementation of 199703, and was fixed 200511.
	 * 
	 * To avoid the synchronous qmgr_defer_recipient() operation for each
	 * recipient of this queue entry, release the delivery process and
	 * move the entry back to the todo queue. Let qmgr_defer_transport()
	 * log the recipient asynchronously if possible, and get out of here.
	 * Note: if asynchronous logging is not possible,
	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
	 * the entry becomes a dangling pointer.
	 * 
	 * Because of that, nothing below may touch "entry" after the
	 * qmgr_defer_transport() call; the function returns immediately.
	 */
	QMGR_DELIVER_RELEASE_AGENT(entry);
	qmgr_entry_unselect(queue, entry);
	qmgr_defer_transport(transport, &dsb->dsn);
	return;
    }

    /*
     * This message must be tried again.
     * 
     * If we have a problem talking to this site, back off with this site for a
     * while; dispose of queue entries for this site that await selection
     * (the todo list); stay away from queue entries that have been selected
     * (the busy list), or we would have dangling pointers. The queue itself
     * won't go away before we dispose of the current queue entry.
     * 
     * XXX Caution: DSN_COPY() will panic on empty status or reason.
     */
#define SUSPENDED	"delivery temporarily suspended: "

    if (status == DELIVER_STAT_DEFER) {
	message->flags |= DELIVER_STAT_DEFER;
	/*
	 * Only a reply that carries a DSN status is treated as a site-level
	 * problem; a deferred reply with an empty status leaves the queue
	 * state to the code further below.
	 */
	if (VSTRING_LEN(dsb->status)) {
	    /* Sanitize the DSN status/reason from the delivery agent. */
	    if (!dsn_valid(vstring_str(dsb->status)))
		vstring_strcpy(dsb->status, "4.0.0");
	    if (VSTRING_LEN(dsb->reason) == 0)
		vstring_strcpy(dsb->reason, "unknown error");
	    /* sizeof() - 1 drops the string terminator from the prefix. */
	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
	    /*
	     * Throttle only a queue that is currently ready, and flush its
	     * todo list only if the throttle request actually resulted in a
	     * throttled queue.
	     */
	    if (QMGR_QUEUE_READY(queue)) {
		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
		if (QMGR_QUEUE_THROTTLED(queue))
		    qmgr_defer_todo(queue, &dsb->dsn);
	    }
	}
    }

    /*
     * No problems detected. Mark the transport and queue as alive. The queue
     * itself won't go away before we dispose of the current queue entry.
     * 
     * The status test is always true at this point, because the
     * DELIVER_STAT_CRASH case has already returned above. The queue is
     * marked alive only when the agent's reply has an empty reason text; in
     * the site-problem case above the reason is never empty.
     */
    if (status != DELIVER_STAT_CRASH) {
	qmgr_transport_unthrottle(transport);
	if (VSTRING_LEN(dsb->reason) == 0)
	    qmgr_queue_unthrottle(queue);
    }

    /*
     * Release the delivery process, and give some other queue entry a chance
     * to be delivered. When all recipients for a message have been tried,
     * decide what to do next with this message: defer, bounce, delete.
     */
    QMGR_DELIVER_RELEASE_AGENT(entry);
    qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
}

/* qmgr_deliver - deliver one per-site queue entry */

void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
{
    QMGR_QUEUE *queue;
    QMGR_ENTRY *entry;
    DSN     dsn;

    /*
     * Arguments: the transport that a delivery process announced itself
     * for, and the stream connected to that process. The stream may be a
     * null pointer, which is handled as "transport unavailable". Nothing is
     * returned; on every early exit below a non-null stream is closed here,
     * and on success ownership of the stream moves to the selected queue
     * entry (entry->stream) until qmgr_deliver_update() releases it.
     */

    /*
     * Find out if this delivery process is really available. Once elected,
     * the delivery process is supposed to express its happiness. If there is
     * a problem, wipe the pending deliveries for this transport. This
     * routine runs in response to an external event, so it does not run
     * while some other queue manipulation is happening.
     * 
     * The "#if 0" variants in this function are disabled; they refer to a
     * "whatsup" variable that is not declared here.
     */
    if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
#if 0
	whatsup = concatenate(transport->name,
			      " mail transport unavailable", (char *) 0);
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
	myfree(whatsup);
#else
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsn, "4.3.0",
					   "mail transport unavailable"));
#endif
	qmgr_defer_transport(transport, &dsn);
	if (stream)
	    (void) vstream_fclose(stream);
	return;
    }

    /*
     * Find a suitable queue entry. Things may have changed since this
     * transport was allocated. If no suitable entry is found,
     * unceremoniously disconnect from the delivery process. The delivery
     * agent request reading routine is prepared for the queue manager to
     * change its mind for no apparent reason.
     * 
     * NOTE(review): this exit path calls neither qmgr_transport_throttle()
     * nor qmgr_transport_unthrottle(), while the note near the top of this
     * file says that every code path must do so once the rate lock flag has
     * been set. Confirm whether the rate lock is released some other way
     * when no entry is available.
     */
    if ((queue = qmgr_queue_select(transport)) == 0
	|| (entry = qmgr_entry_select(queue)) == 0) {
	(void) vstream_fclose(stream);
	return;
    }

    /*
     * Send the queue file info and recipient info to the delivery process.
     * If there is a problem, wipe the pending deliveries for this transport.
     * This routine runs in response to an external event, so it does not run
     * while some other queue manipulation is happening.
     * 
     * The entry is unselected first, before the transport is throttled and
     * its pending deliveries are deferred.
     */
    if (qmgr_deliver_send_request(entry, stream) < 0) {
	qmgr_entry_unselect(queue, entry);
#if 0
	whatsup = concatenate(transport->name,
			      " mail transport unavailable", (char *) 0);
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
	myfree(whatsup);
#else
	qmgr_transport_throttle(transport,
				DSN_SIMPLE(&dsn, "4.3.0",
					   "mail transport unavailable"));
#endif
	qmgr_defer_transport(transport, &dsn);
	/* warning: entry and queue may be dangling pointers here */
	(void) vstream_fclose(stream);
	return;
    }

    /*
     * If we get this far, go wait for the delivery status report.
     * 
     * The concurrency counter incremented here is decremented again when
     * qmgr_deliver_update() releases the delivery agent. The entry is passed
     * as context to the read-event handler.
     */
    qmgr_deliver_concurrency++;
    entry->stream = stream;
    event_enable_read(vstream_fileno(stream),
		      qmgr_deliver_update, (void *) entry);

    /*
     * Guard against broken systems.
     * 
     * If no status report arrives within var_daemon_timeout, the timer runs
     * qmgr_deliver_abort(), which reports a fatal error.
     * qmgr_deliver_update() cancels this timer when the report arrives.
     */
    event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
}