diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:18:56 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:18:56 +0000 |
commit | b7c15c31519dc44c1f691e0466badd556ffe9423 (patch) | |
tree | f944572f288bab482a615e09af627d9a2b6727d8 /src/qmgr/qmgr_message.c | |
parent | Initial commit. (diff) | |
download | postfix-b7c15c31519dc44c1f691e0466badd556ffe9423.tar.xz postfix-b7c15c31519dc44c1f691e0466badd556ffe9423.zip |
Adding upstream version 3.7.10.upstream/3.7.10upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/qmgr/qmgr_message.c | 1622 |
1 files changed, 1622 insertions, 0 deletions
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c new file mode 100644 index 0000000..79143f3 --- /dev/null +++ b/src/qmgr/qmgr_message.c @@ -0,0 +1,1622 @@ +/*++ +/* NAME +/* qmgr_message 3 +/* SUMMARY +/* in-core message structures +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_message_count; +/* int qmgr_recipient_count; +/* int qmgr_vrfy_pend_count; +/* +/* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) +/* const char *class; +/* const char *name; +/* int qflags; +/* mode_t mode; +/* +/* QMGR_MESSAGE *qmgr_message_realloc(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_free(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_update_warn(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_kill_record(message, offset) +/* QMGR_MESSAGE *message; +/* long offset; +/* DESCRIPTION +/* This module performs en-gross operations on queue messages. +/* +/* qmgr_message_count is a global counter for the total number +/* of in-core message structures (i.e. the total size of the +/* `active' message queue). +/* +/* qmgr_recipient_count is a global counter for the total number +/* of in-core recipient structures (i.e. the sum of all recipients +/* in all in-core message structures). +/* +/* qmgr_vrfy_pend_count is a global counter for the total +/* number of in-core message structures that are associated +/* with an address verification request. Requests that exceed +/* the address_verify_pending_limit are deferred immediately. +/* This is a backup mechanism for a more refined enforcement +/* mechanism in the verify(8) daemon. +/* +/* qmgr_message_alloc() creates an in-core message structure +/* with sender and recipient information taken from the named queue +/* file. A null result means the queue file could not be read or +/* that the queue file contained incorrect information. A result +/* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number +/* of recipients read from a queue file is limited by the global +/* var_qmgr_rcpt_limit configuration parameter. When the limit +/* is reached, the \fIrcpt_offset\fR structure member is set to +/* the position where the read was terminated. Recipients are +/* run through the resolver, and are assigned to destination +/* queues. Recipients that cannot be assigned are deferred or +/* bounced. Mail that has bounced twice is silently absorbed. +/* A non-zero mode means change the queue file permissions. +/* +/* qmgr_message_realloc() resumes reading recipients from the queue +/* file, and updates the recipient list and \fIrcpt_offset\fR message +/* structure members. A null result means that the file could not be +/* read or that the file contained incorrect information. Recipient +/* limit imposed this time is based on the position of the message +/* job(s) on corresponding transport job list(s). It's considered +/* an error to call this when the recipient slots can't be allocated. +/* +/* qmgr_message_free() destroys an in-core message structure and makes +/* the resources available for reuse. It is an error to destroy +/* a message structure that is still referenced by queue entry structures. +/* +/* qmgr_message_update_warn() takes a closed message, opens it, updates +/* the warning field, and closes it again. +/* +/* qmgr_message_kill_record() takes a closed message, opens it, updates +/* the record type at the given offset to "killed", and closes the file. +/* A killed envelope record is ignored. Killed records are not allowed +/* inside the message content. +/* DIAGNOSTICS +/* Warnings: malformed message file. Fatal errors: out of memory. +/* SEE ALSO +/* envelope(3) message envelope parser +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <sys/stat.h> +#include <stdlib.h> +#include <stdio.h> /* sscanf() */ +#include <fcntl.h> +#include <errno.h> +#include <unistd.h> +#include <string.h> +#include <ctype.h> + +/* Utility library. */ + +#include <msg.h> +#include <mymalloc.h> +#include <vstring.h> +#include <vstream.h> +#include <split_at.h> +#include <valid_hostname.h> +#include <argv.h> +#include <stringops.h> +#include <myflock.h> +#include <sane_time.h> + +/* Global library. */ + +#include <dict.h> +#include <mail_queue.h> +#include <mail_params.h> +#include <canon_addr.h> +#include <record.h> +#include <rec_type.h> +#include <sent.h> +#include <deliver_completed.h> +#include <opened.h> +#include <verp_sender.h> +#include <mail_proto.h> +#include <qmgr_user.h> +#include <split_addr.h> +#include <dsn_mask.h> +#include <rec_attr_map.h> + +/* Client stubs. */ + +#include <rewrite_clnt.h> +#include <resolve_clnt.h> + +/* Application-specific. */ + +#include "qmgr.h" + +int qmgr_message_count; +int qmgr_recipient_count; +int qmgr_vrfy_pend_count; + +/* qmgr_message_create - create in-core message structure */ + +static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, + const char *queue_id, int qflags) +{ + QMGR_MESSAGE *message; + + message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); + qmgr_message_count++; + message->flags = 0; + message->qflags = qflags; + message->tflags = 0; + message->tflags_offset = 0; + message->rflags = QMGR_READ_FLAG_DEFAULT; + message->fp = 0; + message->refcount = 0; + message->single_rcpt = 0; + message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; + message->create_time = 0; + GETTIMEOFDAY(&message->active_time); + message->queued_time = sane_time(); + message->refill_time = 0; + message->data_offset = 0; + message->queue_id = mystrdup(queue_id); + message->queue_name = mystrdup(queue_name); + message->encoding = 0; + message->sender = 0; + message->dsn_envid = 0; + message->dsn_ret = 0; + message->smtputf8 = 0; + message->filter_xport = 0; + message->inspect_xport = 0; + message->redirect_addr = 0; + message->data_size = 0; + message->cont_length = 0; + message->warn_offset = 0; + message->warn_time = 0; + message->rcpt_offset = 0; + message->verp_delims = 0; + message->client_name = 0; + message->client_addr = 0; + message->client_port = 0; + message->client_proto = 0; + message->client_helo = 0; + message->sasl_method = 0; + message->sasl_username = 0; + message->sasl_sender = 0; + message->log_ident = 0; + message->rewrite_context = 0; + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + message->rcpt_count = 0; + message->rcpt_limit = var_qmgr_msg_rcpt_limit; + message->rcpt_unread = 0; + QMGR_LIST_INIT(message->job_list); + return (message); +} + +/* qmgr_message_close - close queue file */ + +static void qmgr_message_close(QMGR_MESSAGE *message) +{ + vstream_fclose(message->fp); + message->fp = 0; +} + +/* qmgr_message_open - open queue file */ + +static int qmgr_message_open(QMGR_MESSAGE *message) +{ + + /* + * Sanity check. + */ + if (message->fp) + msg_panic("%s: queue file is open", message->queue_id); + + /* + * Open this queue file. Skip files that we cannot open. Back off when + * the system appears to be running out of resources. + */ + if ((message->fp = mail_queue_open(message->queue_name, + message->queue_id, + O_RDWR, 0)) == 0) { + if (errno != ENOENT) + msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); + msg_warn("open %s %s: %m", message->queue_name, message->queue_id); + return (-1); + } + return (0); +} + +/* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ + +static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) +{ + VSTRING *buf; + long orig_offset, extra_offset; + int rec_type; + char *start; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + if ((orig_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Rewind to the very beginning to make sure we see all records. + */ + if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Scan through the old style queue file. Count the total number of + * recipients and find the data/extra sections offsets. Note that the new + * queue files require that data_size equals extra_offset - data_offset, + * so we set data_size to this as well and ignore the size record itself + * completely. + */ + message->rcpt_unread = 0; + for (;;) { + rec_type = rec_get(message->fp, buf, 0); + if (rec_type <= 0) + /* Report missing end record later. */ + break; + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("old-style scan record %c %s", rec_type, start); + if (rec_type == REC_TYPE_END) + break; + if (rec_type == REC_TYPE_DONE + || rec_type == REC_TYPE_RCPT + || rec_type == REC_TYPE_DRCP) { + message->rcpt_unread++; + continue; + } + if (rec_type == REC_TYPE_MESG) { + if (message->data_offset == 0) { + if ((message->data_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if ((extra_offset = atol(start)) <= message->data_offset) + msg_fatal("bad extra offset %s file %s", + start, VSTREAM_PATH(message->fp)); + if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->data_size = extra_offset - message->data_offset; + } + continue; + } + } + + /* + * Clean up. + */ + if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (message->data_offset == 0 || rec_type != REC_TYPE_END) + msg_fatal("%s: envelope records out of order", message->queue_id); +} + +/* qmgr_message_read - read envelope records */ + +static int qmgr_message_read(QMGR_MESSAGE *message) +{ + VSTRING *buf; + int rec_type; + long curr_offset; + long save_offset = message->rcpt_offset; /* save a flag */ + int save_unread = message->rcpt_unread; /* save a count */ + char *start; + int recipient_limit; + const char *error_text; + char *name; + char *value; + char *orig_rcpt = 0; + int count; + int dsn_notify = 0; + char *dsn_orcpt = 0; + int n; + int have_log_client_attr = 0; + static const char env_rec_types[] = REC_TYPE_ENVELOPE REC_TYPE_EXTRACT; + static const char extra_rec_type[] = {REC_TYPE_XTRA, 0}; + const char *expected_rec_types; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + + /* + * If we re-open this file, skip over on-file recipient records that we + * already looked at, and refill the in-core recipient address list. + * + * For the first time, the message recipient limit is calculated from the + * global recipient limit. This is to avoid reading little recipients + * when the active queue is near empty. When the queue becomes full, only + * the necessary amount is read in core. Such priming is necessary + * because there are no message jobs yet. + * + * For the next time, the recipient limit is based solely on the message + * jobs' positions in the job lists and/or job stacks. + */ + if (message->rcpt_offset) { + if (message->rcpt_list.len) + msg_panic("%s: recipient list not empty on recipient reload", + message->queue_id); + if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->rcpt_offset = 0; + recipient_limit = message->rcpt_limit - message->rcpt_count; + } else { + recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count; + if (recipient_limit < message->rcpt_limit) + recipient_limit = message->rcpt_limit; + } + /* Keep interrupt latency in check. */ + if (recipient_limit > 5000) + recipient_limit = 5000; + if (recipient_limit <= 0) + msg_panic("%s: no recipient slots available", message->queue_id); + if (msg_verbose) + msg_info("%s: recipient limit %d", message->queue_id, recipient_limit); + + /* + * Read envelope records. XXX Rely on the front-end programs to enforce + * record size limits. Read up to recipient_limit recipients from the + * queue file, to protect against memory exhaustion. Recipient records + * may appear before or after the message content, so we keep reading + * from the queue file until we have enough recipients (rcpt_offset != 0) + * and until we know all the non-recipient information. + * + * Note that the total recipient count record is accurate only for fresh + * queue files. After some of the recipients are marked as done and the + * queue file is deferred, it can be used as upper bound estimate only. + * Fortunately, this poses no major problem on the scheduling algorithm, + * as the only impact is that the already deferred messages are not + * chosen by qmgr_job_candidate() as often as they could. + * + * On the first open, we must examine all non-recipient records. + * + * Optimization: when we know that recipient records are not mixed with + * non-recipient records, as is typical with mailing list mail, then we + * can avoid having to examine all the queue file records before we can + * start deliveries. This avoids some file system thrashing with huge + * mailing lists. + */ + for (;;) { + expected_rec_types = env_rec_types; + if ((curr_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if (curr_offset == message->data_offset && curr_offset > 0) { + if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + curr_offset += message->data_size; + expected_rec_types = extra_rec_type; + } + rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("record %c %s", rec_type, start); + if (rec_type == REC_TYPE_PTR) { + if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) + break; + /* Need to update curr_offset after pointer jump. */ + continue; + } + if (rec_type <= 0) { + msg_warn("%s: message rejected: missing end record", + message->queue_id); + break; + } + if (strchr(expected_rec_types, rec_type) == 0) { + msg_warn("Unexpected record type '%c' at offset %ld", + rec_type, (long) curr_offset); + rec_type = REC_TYPE_ERROR; + break; + } + if (rec_type == REC_TYPE_END) { + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + + /* + * Map named attributes to pseudo record types, so that we don't have + * to pollute the queue file with records that are incompatible with + * past Postfix versions. Preferably, people should be able to back + * out from an upgrade without losing mail. + */ + if (rec_type == REC_TYPE_ATTR) { + if ((error_text = split_nameval(start, &name, &value)) != 0) { + msg_warn("%s: bad attribute record: %s: %.200s", + message->queue_id, error_text, start); + rec_type = REC_TYPE_ERROR; + break; + } + if ((n = rec_attr_map(name)) != 0) { + start = value; + rec_type = n; + } + } + + /* + * Process recipient records. + */ + if (rec_type == REC_TYPE_RCPT) { + /* See also below for code setting orig_rcpt etc. */ + if (message->rcpt_offset == 0) { + message->rcpt_unread--; + recipient_list_add(&message->rcpt_list, curr_offset, + dsn_orcpt ? dsn_orcpt : "", + dsn_notify ? dsn_notify : 0, + orig_rcpt ? orig_rcpt : "", start); + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + if (message->rcpt_list.len >= recipient_limit) { + if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", + VSTREAM_PATH(message->fp)); + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + break; + if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) + /* Examine all remaining non-recipient records. */ + continue; + /* Optimizations for "pure recipient" record sections. */ + if (curr_offset > message->data_offset) { + /* We already examined all non-recipient records. */ + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + + /* + * Examine non-recipient records in the extracted + * segment. Note that this skips to the message start + * record, because the handler for that record changes + * the expectations for allowed record types. + */ + if (vstream_fseek(message->fp, message->data_offset, + SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + continue; + } + } + continue; + } + if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { + if (message->rcpt_offset == 0) { + message->rcpt_unread--; + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + } + continue; + } + if (rec_type == REC_TYPE_DSN_ORCPT) { + /* See also above for code clearing dsn_orcpt. */ + if (dsn_orcpt != 0) { + msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (message->rcpt_offset == 0) + dsn_orcpt = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_DSN_NOTIFY) { + /* See also above for code clearing dsn_notify. */ + if (dsn_notify != 0) { + msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", + message->queue_id, dsn_notify); + dsn_notify = 0; + } + if (message->rcpt_offset == 0) { + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) + msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", + message->queue_id, start); + else + dsn_notify = n; + continue; + } + } + if (rec_type == REC_TYPE_ORCP) { + /* See also above for code clearing orig_rcpt. */ + if (orig_rcpt != 0) { + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (message->rcpt_offset == 0) + orig_rcpt = mystrdup(start); + continue; + } + + /* + * Process non-recipient records. + */ + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + continue; + if (rec_type == REC_TYPE_SIZE) { + if (message->data_offset == 0) { + if ((count = sscanf(start, "%ld %ld %d %d %ld %d", + &message->data_size, &message->data_offset, + &message->rcpt_unread, &message->rflags, + &message->cont_length, + &message->smtputf8)) >= 3) { + /* Postfix >= 1.0 (a.k.a. 20010228). */ + if (message->data_offset <= 0 || message->data_size <= 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + if (message->rflags & ~QMGR_READ_FLAG_USER) { + msg_warn("%s: invalid flags in size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + } else if (count == 1) { + /* Postfix < 1.0 (a.k.a. 20010228). */ + qmgr_message_oldstyle_scan(message); + } else { + /* Can't happen. */ + msg_warn("%s: message rejected: weird size record", + message->queue_id); + rec_type = REC_TYPE_ERROR; + break; + } + } + /* Postfix < 2.4 compatibility. */ + if (message->cont_length == 0) { + message->cont_length = message->data_size; + } else if (message->cont_length < 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + continue; + } + if (rec_type == REC_TYPE_TIME) { + if (message->arrival_time.tv_sec == 0) + REC_TYPE_TIME_SCAN(start, message->arrival_time); + continue; + } + if (rec_type == REC_TYPE_CTIME) { + if (message->create_time == 0) + message->create_time = atol(start); + continue; + } + if (rec_type == REC_TYPE_FILT) { + if (message->filter_xport != 0) + myfree(message->filter_xport); + message->filter_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_INSP) { + if (message->inspect_xport != 0) + myfree(message->inspect_xport); + message->inspect_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_RDR) { + if (message->redirect_addr != 0) + myfree(message->redirect_addr); + message->redirect_addr = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_FROM) { + if (message->sender == 0) { + message->sender = mystrdup(start); + opened(message->queue_id, message->sender, + message->cont_length, message->rcpt_unread, + "queue %s", message->queue_name); + } + continue; + } + if (rec_type == REC_TYPE_DSN_ENVID) { + /* Allow Milter override. */ + if (message->dsn_envid != 0) + myfree(message->dsn_envid); + message->dsn_envid = mystrdup(start); + } + if (rec_type == REC_TYPE_DSN_RET) { + /* Allow Milter override. */ + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) + msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", + message->queue_id, start); + else + message->dsn_ret = n; + } + if (rec_type == REC_TYPE_ATTR) { + /* Allow extra segment to override envelope segment info. */ + if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { + if (message->encoding != 0) + myfree(message->encoding); + message->encoding = mystrdup(value); + } + + /* + * Backwards compatibility. Before Postfix 2.3, the logging + * attributes were called client_name, etc. Now they are called + * log_client_name. etc., and client_name is used for the actual + * client information. To support old queue files we accept both + * names for the purpose of logging; the new name overrides the + * old one. + * + * XXX Do not use the "legacy" client_name etc. attribute values for + * initializing the logging attributes, when this file already + * contains the "modern" log_client_name etc. logging attributes. + * Otherwise, logging attributes that are not present in the + * queue file would be set with information from the real client. + */ + else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_name == 0) + message->client_name = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { + if (have_log_client_attr == 0 && message->client_addr == 0) + message->client_addr = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { + if (have_log_client_attr == 0 && message->client_port == 0) + message->client_port = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_proto == 0) + message->client_proto = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_helo == 0) + message->client_helo = mystrdup(value); + } + /* Original client attributes. */ + else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { + if (message->client_name != 0) + myfree(message->client_name); + message->client_name = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { + if (message->client_addr != 0) + myfree(message->client_addr); + message->client_addr = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { + if (message->client_port != 0) + myfree(message->client_port); + message->client_port = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { + if (message->client_proto != 0) + myfree(message->client_proto); + message->client_proto = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { + if (message->client_helo != 0) + myfree(message->client_helo); + message->client_helo = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { + if (message->sasl_method == 0) + message->sasl_method = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_METHOD, value); + } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { + if (message->sasl_username == 0) + message->sasl_username = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_USERNAME, value); + } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_SENDER, value); + } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { + if (message->log_ident == 0) + message->log_ident = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_LOG_IDENT, value); + } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); + } + + /* + * Optional tracing flags (verify, sendmail -v, sendmail -bv). + * This record is killed after a trace logfile report is sent and + * after the logfile is deleted. + */ + else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { + if (message->tflags == 0) { + message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); + if (message->tflags == DEL_REQ_FLAG_RECORD) + message->tflags_offset = curr_offset; + else + message->tflags_offset = 0; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count++; + } + } + continue; + } + if (rec_type == REC_TYPE_WARN) { + if (message->warn_offset == 0) { + message->warn_offset = curr_offset; + REC_TYPE_WARN_SCAN(start, message->warn_time); + } + continue; + } + if (rec_type == REC_TYPE_VERP) { + if (message->verp_delims == 0) { + if (message->sender == 0 || message->sender[0] == 0) { + msg_warn("%s: ignoring VERP request for null sender", + message->queue_id); + } else if (verp_delims_verify(start) != 0) { + msg_warn("%s: ignoring bad VERP request: \"%.100s\"", + message->queue_id, start); + } else { + if (msg_verbose) + msg_info("%s: enabling VERP for sender \"%.100s\"", + message->queue_id, message->sender); + message->single_rcpt = 1; + message->verp_delims = mystrdup(start); + } + } + continue; + } + } + + /* + * Grr. + */ + if (dsn_orcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + } + if (orig_rcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + } + + /* + * After sending a "delayed" warning, request sender notification when + * message delivery is completed. While "mail delayed" notifications are + * bad enough because they multiply the amount of email traffic, "delay + * cleared" notifications are even worse because they come in a sudden + * burst when the queue drains after a network outage. + */ + if (var_dsn_delay_cleared && message->warn_time < 0) + message->tflags |= DEL_REQ_FLAG_REC_DLY_SENT; + + /* + * Remember when we have read the last recipient batch. Note that we do + * it here after reading as reading might have used considerable amount + * of time. + */ + message->refill_time = sane_time(); + + /* + * Avoid clumsiness elsewhere in the program. When sending data across an + * IPC channel, sending an empty string is more convenient than sending a + * null pointer. + */ + if (message->dsn_envid == 0) + message->dsn_envid = mystrdup(""); + if (message->encoding == 0) + message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); + if (message->client_name == 0) + message->client_name = mystrdup(""); + if (message->client_addr == 0) + message->client_addr = mystrdup(""); + if (message->client_port == 0) + message->client_port = mystrdup(""); + if (message->client_proto == 0) + message->client_proto = mystrdup(""); + if (message->client_helo == 0) + message->client_helo = mystrdup(""); + if (message->sasl_method == 0) + message->sasl_method = mystrdup(""); + if (message->sasl_username == 0) + message->sasl_username = mystrdup(""); + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(""); + if (message->log_ident == 0) + message->log_ident = mystrdup(""); + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); + /* Postfix < 2.3 compatibility. */ + if (message->create_time == 0) + message->create_time = message->arrival_time.tv_sec; + + /* + * Clean up. + */ + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (message->rcpt_unread < 0 + || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) { + msg_warn("%s: rcpt count mismatch (%d)", + message->queue_id, message->rcpt_unread); + message->rcpt_unread = 0; + } + if (rec_type <= 0) { + /* Already logged warning. */ + } else if (message->arrival_time.tv_sec == 0) { + msg_warn("%s: message rejected: missing arrival time record", + message->queue_id); + } else if (message->sender == 0) { + msg_warn("%s: message rejected: missing sender record", + message->queue_id); + } else if (message->data_offset == 0) { + msg_warn("%s: message rejected: missing size record", + message->queue_id); + } else { + return (0); + } + message->rcpt_offset = save_offset; /* restore flag */ + message->rcpt_unread = save_unread; /* restore count */ + recipient_list_free(&message->rcpt_list); + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + return (-1); +} + +/* qmgr_message_update_warn - update the time of next delay warning */ + +void qmgr_message_update_warn(QMGR_MESSAGE *message) +{ + + /* + * After the "mail delayed" warning, optionally send a "delay cleared" + * notification. + */ + if (qmgr_message_open(message) + || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 + || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, + REC_TYPE_WARN_ARG(-1)) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_kill_record - mark one message record as killed */ + +void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) +{ + if (offset <= 0) + msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); + if (qmgr_message_open(message) + || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_sort_compare - compare recipient information */ + +static int qmgr_message_sort_compare(const void *p1, const void *p2) +{ + RECIPIENT *rcpt1 = (RECIPIENT *) p1; + RECIPIENT *rcpt2 = (RECIPIENT *) p2; + QMGR_QUEUE *queue1; + QMGR_QUEUE *queue2; + char *at1; + char *at2; + int result; + + /* + * Compare most significant to least significant recipient attributes. + * The comparison function must be transitive, so NULL values need to be + * assigned an ordinal (we set NULL last). + */ + + queue1 = rcpt1->u.queue; + queue2 = rcpt2->u.queue; + if (queue1 != 0 && queue2 == 0) + return (-1); + if (queue1 == 0 && queue2 != 0) + return (1); + if (queue1 != 0 && queue2 != 0) { + + /* + * Compare message transport. + */ + if ((result = strcmp(queue1->transport->name, + queue2->transport->name)) != 0) + return (result); + + /* + * Compare queue name (nexthop or recipient@nexthop). + */ + if ((result = strcmp(queue1->name, queue2->name)) != 0) + return (result); + } + + /* + * Compare recipient domain. + */ + at1 = strrchr(rcpt1->address, '@'); + at2 = strrchr(rcpt2->address, '@'); + if (at1 == 0 && at2 != 0) + return (1); + if (at1 != 0 && at2 == 0) + return (-1); + if (at1 != 0 && at2 != 0 + && (result = strcasecmp_utf8(at1, at2)) != 0) + return (result); + + /* + * Compare recipient address. + */ + return (strcmp(rcpt1->address, rcpt2->address)); +} + +/* qmgr_message_sort - sort message recipient addresses by domain */ + +static void qmgr_message_sort(QMGR_MESSAGE *message) +{ + qsort((void *) message->rcpt_list.info, message->rcpt_list.len, + sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); + if (msg_verbose) { + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *rcpt; + + msg_info("start sorted recipient list"); + for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) + msg_info("qmgr_message_sort: %s", rcpt->address); + msg_info("end sorted recipient list"); + } +} + +/* qmgr_resolve_one - resolve or skip one recipient */ + +static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, + const char *addr, RESOLVE_REPLY *reply) +{ +#define QMGR_REDIRECT(rp, tp, np) do { \ + (rp)->flags = 0; \ + vstring_strcpy((rp)->transport, (tp)); \ + vstring_strcpy((rp)->nexthop, (np)); \ + } while (0) + + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) + resolve_clnt_query_from(message->sender, addr, reply); + else + resolve_clnt_verify_from(message->sender, addr, reply); + if (reply->flags & RESOLVE_FLAG_FAIL) { + QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, + "4.3.0 address resolver failure"); + return (0); + } else if (reply->flags & RESOLVE_FLAG_ERROR) { + QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, + "5.1.3 bad address syntax"); + return (0); + } else { + return (0); + } +} + +/* qmgr_message_resolve - resolve recipients */ + +static void qmgr_message_resolve(QMGR_MESSAGE *message) +{ + static ARGV *defer_xport_argv; + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_TRANSPORT *transport = 0; + QMGR_QUEUE *queue = 0; + RESOLVE_REPLY reply; + VSTRING *queue_name; + char *at; + char **cpp; + char *nexthop; + ssize_t len; + int status; + DSN dsn; + MSG_STATS stats; + DSN *saved_dsn; + +#define STREQ(x,y) (strcmp(x,y) == 0) +#define STR vstring_str +#define LEN VSTRING_LEN + + resolve_clnt_init(&reply); + queue_name = vstring_alloc(1); + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + + /* + * Redirect overrides all else. But only once (per entire message). + * For consistency with the remainder of Postfix, rewrite the address + * to canonical form before resolving it. + */ + if (message->redirect_addr) { + if (recipient > list.info) { + recipient->u.queue = 0; + continue; + } + message->rcpt_offset = 0; + message->rcpt_unread = 0; + + rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, + reply.recipient); + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Content filtering overrides the address resolver. + * + * XXX Bypass content_filter inspection for user-generated probes + * (sendmail -bv). MTA-generated probes never have the "please filter + * me" bits turned on, but we handle them here anyway for the sake of + * future proofing. + */ +#define FILTER_WITHOUT_NEXTHOP(filter, next) \ + (((next) = split_at((filter), ':')) == 0 || *(next) == 0) + +#define RCPT_WITHOUT_DOMAIN(rcpt, next) \ + ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) + + else if (message->filter_xport + && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { + reply.flags = 0; + vstring_strcpy(reply.transport, message->filter_xport); + if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) + && *(nexthop = var_def_filter_nexthop) == 0 + && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) + nexthop = var_myhostname; + vstring_strcpy(reply.nexthop, nexthop); + vstring_strcpy(reply.recipient, recipient->address); + } + + /* + * Resolve the destination to (transport, nexthop, address). The + * result address may differ from the one specified by the sender. + */ + else { + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Bounce null recipients. This should never happen, but is most + * likely the result of a fault in a different program, so aborting + * the queue manager process does not help. + */ + if (recipient->address[0] == 0) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, + "5.1.3 null recipient address"); + } + + /* + * Redirect a forced-to-expire message without defer log to the retry + * service, so that its defer log will contain an appropriate reason. + * Do not redirect such a message to the error service, because if + * that request fails, a defer log would be created with reason + * "bounce or trace service failure" which would make no sense. Note + * that if the bounce service fails to create a defer log, the + * message will be returned as undeliverable anyway, because it is + * expired. + */ + if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.7.0 message is administratively expired"); + } + + /* + * Discard mail to the local double bounce address here, so this + * system can run without a local delivery agent. They'd still have + * to configure something for mail directed to the local postmaster, + * though, but that is an RFC requirement anyway. + * + * XXX This lookup should be done in the resolver, and the mail should + * be directed to a general-purpose null delivery agent. + */ + if (reply.flags & RESOLVE_CLASS_LOCAL) { + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + if (strncasecmp_utf8(STR(reply.recipient), + var_double_bounce_sender, len) == 0 + && !var_double_bounce_sender[len]) { + status = sent(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", DSN_SIMPLE(&dsn, "2.0.0", + "undeliverable postmaster notification discarded")); + if (status == 0) { + deliver_completed(message->fp, recipient->offset); +#if 0 + /* It's the default verification probe sender address. */ + msg_warn("%s: undeliverable postmaster notification discarded", + message->queue_id); +#endif + } else + message->flags |= status; + continue; + } + } + + /* + * Optionally defer deliveries over specific transports, unless the + * restriction is lifted temporarily. + */ + if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { + if (defer_xport_argv == 0) + defer_xport_argv = argv_split(var_defer_xports, CHARS_COMMA_SP); + for (cpp = defer_xport_argv->argv; *cpp; cpp++) + if (strcmp(*cpp, STR(reply.transport)) == 0) + break; + if (*cpp) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 deferred transport"); + } + } + + /* + * Safety: defer excess address verification requests. + */ + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0 + && qmgr_vrfy_pend_count > var_vrfy_pend_limit) + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 Too many address verification requests"); + + /* + * Look up or instantiate the proper transport. + */ + if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { + if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) + transport = qmgr_transport_create(STR(reply.transport)); + queue = 0; + } + + /* + * This message is being flushed. If need-be unthrottle the + * transport. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_TRANSPORT_THROTTLED(transport)) + qmgr_transport_unthrottle(transport); + + /* + * This transport is dead. Defer delivery to this recipient. + */ + if (QMGR_TRANSPORT_THROTTLED(transport)) { + saved_dsn = transport->dsn; + if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { + nexthop = qmgr_error_nexthop(saved_dsn); + vstring_strcpy(reply.nexthop, nexthop); + myfree(nexthop); + queue = 0; + } else { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * The nexthop destination provides the default name for the + * per-destination queue. When the delivery agent accepts only one + * recipient per delivery, give each recipient its own queue, so that + * deliveries to different recipients of the same message can happen + * in parallel, and so that we can enforce per-recipient concurrency + * limits and prevent one recipient from tying up all the delivery + * agent resources. We use recipient@nexthop as queue name rather + * than the actual recipient domain name, so that one recipient in + * multiple equivalent domains cannot evade the per-recipient + * concurrency limit. Split the address on the recipient delimiter if + * one is defined, so that extended addresses don't get extra + * delivery slots. + * + * Fold the result to lower case so that we don't have multiple queues + * for the same name. + * + * Important! All recipients in a queue must have the same nexthop + * value. It is OK to have multiple queues with the same nexthop + * value, but only when those queues are named after recipients. + * + * The single-recipient code below was written for local(8) like + * delivery agents, and assumes that all domains that deliver to the + * same (transport + nexthop) are aliases for $nexthop. Delivery + * concurrency is changed from per-domain into per-recipient, by + * changing the queue name from nexthop into localpart@nexthop. + * + * XXX This assumption is incorrect when different destinations share + * the same (transport + nexthop). In reality, such transports are + * rarely configured to use single-recipient deliveries. The fix is + * to decouple the per-destination recipient limit from the + * per-destination concurrency. + */ + vstring_strcpy(queue_name, STR(reply.nexthop)); + if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 + && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 + && transport->recipient_limit == 1) { + /* Copy the recipient localpart. */ + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + vstring_strncpy(queue_name, STR(reply.recipient), len); + /* Remove the address extension from the recipient localpart. */ + if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim)) + vstring_truncate(queue_name, strlen(STR(queue_name))); + /* Assume the recipient domain is equivalent to nexthop. */ + vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); + } + lowercase(STR(queue_name)); + + /* + * This transport is alive. Find or instantiate a queue for this + * recipient. + */ + if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { + if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) + queue = qmgr_queue_create(transport, STR(queue_name), + STR(reply.nexthop)); + } + + /* + * This message is being flushed. If need-be unthrottle the queue. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_QUEUE_THROTTLED(queue)) + qmgr_queue_unthrottle(queue); + + /* + * This queue is dead. Defer delivery to this recipient. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + saved_dsn = queue->dsn; + if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * This queue is alive. Bind this recipient to this queue instance. + */ + recipient->u.queue = queue; + } + resolve_clnt_free(&reply); + vstring_free(queue_name); +} + +/* qmgr_message_assign - assign recipients to specific delivery requests */ + +static void qmgr_message_assign(QMGR_MESSAGE *message) +{ + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_ENTRY *entry = 0; + QMGR_QUEUE *queue; + QMGR_JOB *job = 0; + QMGR_PEER *peer = 0; + + /* + * Try to bundle as many recipients in a delivery request as we can. When + * the recipient resolves to the same site and transport as an existing + * recipient, do not create a new queue entry, just move that recipient + * to the recipient list of the existing queue entry. All this provided + * that we do not exceed the transport-specific limit on the number of + * recipients per transaction. + */ +#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) + + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + + /* + * Skip recipients with a dead transport or destination. + */ + if ((queue = recipient->u.queue) == 0) + continue; + + /* + * Lookup or instantiate the message job if necessary. + */ + if (job == 0 || queue->transport != job->transport) { + job = qmgr_job_obtain(message, queue->transport); + peer = 0; + } + + /* + * Lookup or instantiate job peer if necessary. + */ + if (peer == 0 || queue != peer->queue) + peer = qmgr_peer_obtain(job, queue); + + /* + * Lookup old or instantiate new recipient entry. We try to reuse the + * last existing entry whenever the recipient limit permits. + */ + entry = peer->entry_list.prev; + if (message->single_rcpt || entry == 0 + || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len)) + entry = qmgr_entry_create(peer, message); + + /* + * Add the recipient to the current entry and increase all those + * recipient counters accordingly. + */ + recipient_list_add(&entry->rcpt_list, recipient->offset, + recipient->dsn_orcpt, recipient->dsn_notify, + recipient->orig_addr, recipient->address); + job->rcpt_count++; + message->rcpt_count++; + qmgr_recipient_count++; + } + + /* + * Release the message recipient list and reinitialize it for the next + * time. + */ + recipient_list_free(&message->rcpt_list); + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + + /* + * Note that even if qmgr_job_obtain() reset the job candidate cache of + * all transports to which we assigned new recipients, this message may + * have other jobs which we didn't touch at all this time. But the number + * of unread recipients affecting the candidate selection might have + * changed considerably, so we must invalidate the caches if it might be + * of some use. + */ + for (job = message->job_list.next; job; job = job->message_peers.next) + if (job->selected_entries < job->read_entries + && job->blocker_tag != job->transport->blocker_tag) + job->transport->candidate_cache_current = 0; +} + +/* qmgr_message_move_limits - recycle unused recipient slots */ + +static void qmgr_message_move_limits(QMGR_MESSAGE *message) +{ + QMGR_JOB *job; + + for (job = message->job_list.next; job; job = job->message_peers.next) + qmgr_job_move_limits(job); +} + +/* qmgr_message_free - release memory for in-core message structure */ + +void qmgr_message_free(QMGR_MESSAGE *message) +{ + QMGR_JOB *job; + + if (message->refcount != 0) + msg_panic("qmgr_message_free: reference len: %d", message->refcount); + if (message->fp) + msg_panic("qmgr_message_free: queue file is open"); + while ((job = message->job_list.next) != 0) + qmgr_job_free(job); + myfree(message->queue_id); + myfree(message->queue_name); + if (message->dsn_envid) + myfree(message->dsn_envid); + if (message->encoding) + myfree(message->encoding); + if (message->sender) + myfree(message->sender); + if (message->verp_delims) + myfree(message->verp_delims); + if (message->filter_xport) + myfree(message->filter_xport); + if (message->inspect_xport) + myfree(message->inspect_xport); + if (message->redirect_addr) + myfree(message->redirect_addr); + if (message->client_name) + myfree(message->client_name); + if (message->client_addr) + myfree(message->client_addr); + if (message->client_port) + myfree(message->client_port); + if (message->client_proto) + myfree(message->client_proto); + if (message->client_helo) + myfree(message->client_helo); + if (message->sasl_method) + myfree(message->sasl_method); + if (message->sasl_username) + myfree(message->sasl_username); + if (message->sasl_sender) + myfree(message->sasl_sender); + if (message->log_ident) + myfree(message->log_ident); + if (message->rewrite_context) + myfree(message->rewrite_context); + recipient_list_free(&message->rcpt_list); + qmgr_message_count--; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count--; + myfree((void *) message); +} + +/* qmgr_message_alloc - create in-core message structure */ + +QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, + int qflags, mode_t mode) +{ + const char *myname = "qmgr_message_alloc"; + QMGR_MESSAGE *message; + struct stat st; + + if (msg_verbose) + msg_info("%s: %s %s", myname, queue_name, queue_id); + + /* + * Create an in-core message structure. + */ + message = qmgr_message_create(queue_name, queue_id, qflags); + + /* + * Extract message envelope information: time of arrival, sender address, + * recipient addresses. Skip files with malformed envelope information. + */ +#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) + + if (qmgr_message_open(message) < 0) { + qmgr_message_free(message); + return (0); + } + if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { + msg_info("%s: skipped, still being delivered", queue_id); + qmgr_message_close(message); + qmgr_message_free(message); + return (QMGR_MESSAGE_LOCKED); + } + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + qmgr_message_free(message); + return (0); + } else { + + /* + * We have validated the queue file content, so it is safe to modify + * the file properties now. + */ + if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) + msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); + + /* + * If this message is forced to expire, use the existing defer + * logfile records and do not assign any deliveries, leaving the + * refcount at zero. If this message is forced to expire, but no + * defer logfile records are available, assign deliveries to the + * retry transport so that the sender will still find out what + * recipients are affected and why. Either way, do not assign normal + * deliveries because that would be undesirable especially with mail + * that was expired in the 'hold' queue. + */ + if ((message->qflags & QMGR_FORCE_EXPIRE) != 0 + && stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_DEFER, + queue_id), &st) == 0 && st.st_size > 0) { + /* Use this defer log; don't assign deliveries (refcount == 0). */ + message->flags = 1; /* simplify downstream code */ + qmgr_message_close(message); + return (message); + } + + /* + * Reset the defer log. This code should not be here, but we must + * reset the defer log *after* acquiring the exclusive lock on the + * queue file and *before* resolving new recipients. Since all those + * operations are encapsulated so nicely by this routine, the defer + * log reset has to be done here as well. + * + * Note: it is safe to remove the defer logfile from a previous queue + * run of this queue file, because the defer log contains information + * about recipients that still exist in this queue file. + */ + if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) + msg_fatal("%s: %s: remove %s %s: %m", myname, + queue_id, MAIL_QUEUE_DEFER, queue_id); + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + if (message->rcpt_offset == 0) + qmgr_message_move_limits(message); + return (message); + } +} + +/* qmgr_message_realloc - refresh in-core message structure */ + +QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_message_realloc"; + + /* + * Sanity checks. + */ + if (message->rcpt_offset <= 0) + msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); + if (msg_verbose) + msg_info("%s: %s %s offset %ld", myname, message->queue_name, + message->queue_id, message->rcpt_offset); + + /* + * Extract recipient addresses. Skip files with malformed envelope + * information. + */ + if (qmgr_message_open(message) < 0) + return (0); + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + return (0); + } else { + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + if (message->rcpt_offset == 0) + qmgr_message_move_limits(message); + return (message); + } +} |