summaryrefslogtreecommitdiffstats
path: root/src/oqmgr/qmgr_entry.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/oqmgr/qmgr_entry.c')
-rw-r--r--src/oqmgr/qmgr_entry.c391
1 files changed, 391 insertions, 0 deletions
diff --git a/src/oqmgr/qmgr_entry.c b/src/oqmgr/qmgr_entry.c
new file mode 100644
index 0000000..d5f3264
--- /dev/null
+++ b/src/oqmgr/qmgr_entry.c
@@ -0,0 +1,391 @@
+/*++
+/* NAME
+/* qmgr_entry 3
+/* SUMMARY
+/* per-site queue entries
+/* SYNOPSIS
+/* #include "qmgr.h"
+/*
+/* QMGR_ENTRY *qmgr_entry_create(queue, message)
+/* QMGR_QUEUE *queue;
+/* QMGR_MESSAGE *message;
+/*
+/* void qmgr_entry_done(entry, which)
+/* QMGR_ENTRY *entry;
+/* int which;
+/*
+/* QMGR_ENTRY *qmgr_entry_select(queue)
+/* QMGR_QUEUE *queue;
+/*
+/* void qmgr_entry_unselect(queue, entry)
+/* QMGR_QUEUE *queue;
+/* QMGR_ENTRY *entry;
+/*
+/* void qmgr_entry_move_todo(dst, entry)
+/* QMGR_QUEUE *dst;
+/* QMGR_ENTRY *entry;
+/* DESCRIPTION
+/* These routines add/delete/manipulate per-site message
+/* delivery requests.
+/*
+/* qmgr_entry_create() creates an entry for the named queue and
+/* message, and appends the entry to the queue's todo list.
+/* Filling in and cleaning up the recipients is the responsibility
+/* of the caller.
+/*
+/* qmgr_entry_done() discards a per-site queue entry. The
+/* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
+/* of the site's `busy' list (i.e. queue entries that have been
+/* selected for actual delivery), or QMGR_QUEUE_TODO for an entry
+/* of the site's `todo' list (i.e. queue entries awaiting selection
+/* for actual delivery).
+/*
+/* qmgr_entry_done() triggers cleanup of the per-site queue when
+/* the site has no pending deliveries, and the site is either
+/* alive, or the site is dead and the number of in-core queues
+/* exceeds a configurable limit (see qmgr_queue_done()).
+/*
+/* qmgr_entry_done() triggers special action when the last in-core
+/* queue entry for a message is done with: either read more
+/* recipients from the queue file, delete the queue file, or move
+/* the queue file to the deferred queue; send bounce reports to the
+/* message originator (see qmgr_active_done()).
+/*
+/* qmgr_entry_select() selects the next entry from the named
+/* per-site queue's `todo' list for actual delivery. The entry is
+/* moved to the queue's `busy' list: the list of messages being
+/* delivered.
+/*
+/* qmgr_entry_unselect() takes the named entry off the named
+/* per-site queue's `busy' list and moves it to the queue's
+/* `todo' list.
+/*
+/* qmgr_entry_move_todo() moves the specified "todo" queue entry
+/* to the specified "todo" queue.
+/* DIAGNOSTICS
+/* Panic: interface violations, internal inconsistencies.
+/* LICENSE
+/* .ad
+/* .fi
+/* The Secure Mailer license must be distributed with this software.
+/* AUTHOR(S)
+/* Wietse Venema
+/* IBM T.J. Watson Research
+/* P.O. Box 704
+/* Yorktown Heights, NY 10598, USA
+/*--*/
+
+/* System library. */
+
+#include <sys_defs.h>
+#include <stdlib.h>
+#include <time.h>
+
+/* Utility library. */
+
+#include <msg.h>
+#include <mymalloc.h>
+#include <events.h>
+#include <vstream.h>
+
+/* Global library. */
+
+#include <mail_params.h>
+#include <deliver_request.h> /* opportunistic session caching */
+
+/* Application-specific. */
+
+#include "qmgr.h"
+
+/* qmgr_entry_select - select queue entry for delivery */
+
+QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *queue)
+{
+ const char *myname = "qmgr_entry_select";
+ QMGR_ENTRY *entry;
+
+ if ((entry = queue->todo.prev) != 0) {
+ QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
+ queue->todo_refcount--;
+ QMGR_LIST_APPEND(queue->busy, entry);
+ queue->busy_refcount++;
+
+ /*
+ * With opportunistic session caching, the delivery agent must not
+ * only 1) save a session upon completion, but also 2) reuse a cached
+ * session upon the next delivery request. In order to not miss out
+ * on 2), we have to make caching sticky or else we get silly
+ * behavior when the in-memory queue drains. Specifically, new
+ * connections must not be made as long as cached connections exist.
+ *
+ * Safety: don't enable opportunistic session caching unless the queue
+ * manager is able to schedule concurrent or back-to-back deliveries
+ * (we need to recognize back-to-back deliveries for transports with
+ * concurrency 1).
+ *
+ * If caching has previously been enabled, but is not now, fetch any
+ * existing entries from the cache, but don't add new ones.
+ */
+#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
+ (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())
+
+#define BACK_TO_BACK_DELIVERY() \
+ (queue->last_done + 1 >= event_time())
+
+ /*
+ * Turn on session caching after we get up to speed. Don't enable
+ * session caching just because we have concurrent deliveries. This
+ * prevents unnecessary session caching when we have a burst of mail
+ * <= the initial concurrency limit.
+ */
+ if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
+ if (BACK_TO_BACK_DELIVERY()) {
+ if (msg_verbose)
+ msg_info("%s: allowing on-demand session caching for %s",
+ myname, queue->name);
+ queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
+ }
+ }
+
+ /*
+ * Turn off session caching when concurrency drops and we're running
+ * out of steam. This is what prevents from turning off session
+ * caching too early, and from making new connections while old ones
+ * are still cached.
+ */
+ else {
+ if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
+ if (msg_verbose)
+ msg_info("%s: disallowing on-demand session caching for %s",
+ myname, queue->name);
+ queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
+ }
+ }
+ }
+ return (entry);
+}
+
+/* qmgr_entry_unselect - unselect queue entry for delivery */
+
+void qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry)
+{
+ QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
+ queue->busy_refcount--;
+ QMGR_LIST_APPEND(queue->todo, entry);
+ queue->todo_refcount++;
+}
+
+/* qmgr_entry_move_todo - move entry between todo queues */
+
+void qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry)
+{
+ const char *myname = "qmgr_entry_move_todo";
+ QMGR_MESSAGE *message = entry->message;
+ QMGR_QUEUE *src = entry->queue;
+ QMGR_ENTRY *new_entry;
+
+ if (entry->stream != 0)
+ msg_panic("%s: queue %s entry is busy", myname, src->name);
+ if (QMGR_QUEUE_THROTTLED(dst))
+ msg_panic("%s: destination queue %s is throttled", myname, dst->name);
+ if (QMGR_TRANSPORT_THROTTLED(dst->transport))
+ msg_panic("%s: destination transport %s is throttled",
+ myname, dst->transport->name);
+
+ /*
+ * Create new entry, swap the recipients between the old and new entries,
+ * then dispose of the old entry. This gives us any end-game actions that
+ * are implemented by qmgr_entry_done(), so we don't have to duplicate
+ * those actions here.
+ *
+ * XXX This does not enforce the per-entry recipient limit, but that is not
+ * a problem as long as qmgr_entry_move_todo() is called only to bounce
+ * or defer mail.
+ */
+ new_entry = qmgr_entry_create(dst, message);
+ recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
+ qmgr_entry_done(entry, QMGR_QUEUE_TODO);
+}
+
+/* qmgr_entry_done - dispose of queue entry */
+
+void qmgr_entry_done(QMGR_ENTRY *entry, int which)
+{
+ const char *myname = "qmgr_entry_done";
+ QMGR_QUEUE *queue = entry->queue;
+ QMGR_MESSAGE *message = entry->message;
+ QMGR_TRANSPORT *transport = queue->transport;
+
+ /*
+ * Take this entry off the in-core queue.
+ */
+ if (entry->stream != 0)
+ msg_panic("%s: file is open", myname);
+ if (which == QMGR_QUEUE_BUSY) {
+ QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
+ queue->busy_refcount--;
+ } else if (which == QMGR_QUEUE_TODO) {
+ QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
+ queue->todo_refcount--;
+ } else {
+ msg_panic("%s: bad queue spec: %d", myname, which);
+ }
+
+ /*
+ * Free the recipient list and decrease the in-core recipient count
+ * accordingly.
+ */
+ qmgr_recipient_count -= entry->rcpt_list.len;
+ recipient_list_free(&entry->rcpt_list);
+
+ myfree((void *) entry);
+
+ /*
+ * Maintain back-to-back delivery status.
+ */
+ if (which == QMGR_QUEUE_BUSY)
+ queue->last_done = event_time();
+
+ /*
+ * Suspend a rate-limited queue, so that mail trickles out.
+ */
+ if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
+ if (queue->window > 1)
+ msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
+ myname, transport->name, queue->name, queue->window);
+ if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */
+ qmgr_queue_unthrottle(queue);
+ if (QMGR_QUEUE_READY(queue))
+ qmgr_queue_suspend(queue, transport->rate_delay);
+ }
+
+ /*
+ * When the in-core queue for this site is empty and when this site is
+ * not dead, discard the in-core queue. When this site is dead, but the
+ * number of in-core queues exceeds some threshold, get rid of this
+ * in-core queue anyway, in order to avoid running out of memory.
+ *
+ * See also: qmgr_entry_move_todo().
+ */
+ if (queue->todo.next == 0 && queue->busy.next == 0) {
+ if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit)
+ qmgr_queue_unthrottle(queue);
+ if (QMGR_QUEUE_READY(queue))
+ qmgr_queue_done(queue);
+ }
+
+ /*
+ * Update the in-core message reference count. When the in-core message
+ * structure has no more references, dispose of the message.
+ *
+ * When the in-core recipient count falls below a threshold, and this
+ * message has more recipients, read more recipients now. If we read more
+ * recipients as soon as the recipient count falls below the in-core
+ * recipient limit, we do not give other messages a chance until this
+ * message is delivered. That's good for mailing list deliveries, bad for
+ * one-to-one mail. If we wait until the in-core recipient count drops
+ * well below the in-core recipient limit, we give other mail a chance,
+ * but we also allow list deliveries to become interleaved. In the worst
+ * case, people near the start of a mailing list get a burst of postings
+ * today, while people near the end of the list get that same burst of
+ * postings a whole day later.
+ */
+#define FUDGE(x) ((x) * (var_qmgr_fudge / 100.0))
+ message->refcount--;
+ if (message->rcpt_offset > 0
+ && qmgr_recipient_count < FUDGE(var_qmgr_rcpt_limit) - 100)
+ qmgr_message_realloc(message);
+ if (message->refcount == 0)
+ qmgr_active_done(message);
+}
+
+/* qmgr_entry_create - create queue todo entry */
+
+QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message)
+{
+ QMGR_ENTRY *entry;
+
+ /*
+ * Sanity check.
+ */
+ if (QMGR_QUEUE_THROTTLED(queue))
+ msg_panic("qmgr_entry_create: dead queue: %s", queue->name);
+
+ /*
+ * Create the delivery request.
+ */
+ entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
+ entry->stream = 0;
+ entry->message = message;
+ recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
+ message->refcount++;
+ entry->queue = queue;
+ QMGR_LIST_APPEND(queue->todo, entry);
+ queue->todo_refcount++;
+
+ /*
+ * Warn if a destination is falling behind while the active queue
+ * contains a non-trivial amount of single-recipient email. When a
+ * destination takes up more and more space in the active queue, then
+ * other mail will not get through and delivery performance will suffer.
+ *
+ * XXX At this point in the code, the busy reference count is still less
+ * than the concurrency limit (otherwise this code would not be invoked
+ * in the first place) so we have to make some awkward adjustments
+ * below.
+ *
+ * XXX The queue length test below looks at the active queue share of an
+ * individual destination. This catches the case where mail for one
+ * destination is falling behind because it has to round-robin compete
+ * with many other destinations. However, Postfix will also perform
+ * poorly when most of the active queue is tied up by a small number of
+ * concurrency limited destinations. The queue length test below detects
+ * such conditions only indirectly.
+ *
+ * XXX This code does not detect the case that the active queue is being
+ * starved because incoming mail is pounding the disk.
+ */
+ if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) {
+ int queue_length = queue->todo_refcount + queue->busy_refcount;
+ time_t now;
+ QMGR_TRANSPORT *transport;
+ double active_share;
+
+ if (queue_length > var_qmgr_active_limit / 5
+ && (now = event_time()) >= queue->clog_time_to_warn) {
+ active_share = queue_length / (double) qmgr_message_count;
+ msg_warn("mail for %s is using up %d of %d active queue entries",
+ queue->nexthop, queue_length, qmgr_message_count);
+ if (active_share < 0.9)
+ msg_warn("this may slow down other mail deliveries");
+ transport = queue->transport;
+ if (transport->dest_concurrency_limit > 0
+ && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
+ msg_warn("you may need to increase the main.cf %s%s from %d",
+ transport->name, _DEST_CON_LIMIT,
+ transport->dest_concurrency_limit);
+ else if (queue->window > var_qmgr_active_limit * active_share)
+ msg_warn("you may need to increase the main.cf %s from %d",
+ VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit);
+ else if (queue->peers.next != queue->peers.prev)
+ msg_warn("you may need a separate master.cf transport for %s",
+ queue->nexthop);
+ else {
+ msg_warn("you may need to reduce %s connect and helo timeouts",
+ transport->name);
+ msg_warn("so that Postfix quickly skips unavailable hosts");
+ msg_warn("you may need to increase the main.cf %s and %s",
+ VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME);
+ msg_warn("so that Postfix wastes less time on undeliverable mail");
+ msg_warn("you may need to increase the master.cf %s process limit",
+ transport->name);
+ }
+ msg_warn("please avoid flushing the whole queue when you have");
+ msg_warn("lots of deferred mail, that is bad for performance");
+ msg_warn("to turn off these warnings specify: %s = 0",
+ VAR_QMGR_CLOG_WARN_TIME);
+ queue->clog_time_to_warn = now + var_qmgr_clog_warn_time;
+ }
+ }
+ return (entry);
+}