summaryrefslogtreecommitdiffstats
path: root/src/qmgr/qmgr_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/qmgr/qmgr_queue.c')
-rw-r--r--src/qmgr/qmgr_queue.c439
1 files changed, 439 insertions, 0 deletions
diff --git a/src/qmgr/qmgr_queue.c b/src/qmgr/qmgr_queue.c
new file mode 100644
index 0000000..d2ffe09
--- /dev/null
+++ b/src/qmgr/qmgr_queue.c
@@ -0,0 +1,439 @@
+/*++
+/* NAME
+/* qmgr_queue 3
+/* SUMMARY
+/* per-destination queues
+/* SYNOPSIS
+/* #include "qmgr.h"
+/*
+/* int qmgr_queue_count;
+/*
+/* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
+/* QMGR_TRANSPORT *transport;
+/* const char *name;
+/* const char *nexthop;
+/*
+/* void qmgr_queue_done(queue)
+/* QMGR_QUEUE *queue;
+/*
+/* QMGR_QUEUE *qmgr_queue_find(transport, name)
+/* QMGR_TRANSPORT *transport;
+/* const char *name;
+/*
+/* void qmgr_queue_throttle(queue, dsn)
+/* QMGR_QUEUE *queue;
+/* DSN *dsn;
+/*
+/* void qmgr_queue_unthrottle(queue)
+/* QMGR_QUEUE *queue;
+/*
+/* void qmgr_queue_suspend(queue, delay)
+/* QMGR_QUEUE *queue;
+/* int delay;
+/* DESCRIPTION
+/* These routines add/delete/manipulate per-destination queues.
+/* Each queue corresponds to a specific transport and destination.
+/* Each queue has a `todo' list of delivery requests for that
+/* destination, and a `busy' list of delivery requests in progress.
+/*
+/* qmgr_queue_count is a global counter for the total number
+/* of in-core queue structures.
+/*
+/* qmgr_queue_create() creates an empty named queue for the named
+/* transport and destination. The queue is given an initial
+/* concurrency limit as specified with the
+/* \fIinitial_destination_concurrency\fR configuration parameter,
+/* provided that it does not exceed the transport-specific
+/* concurrency limit.
+/*
+/* qmgr_queue_done() disposes of a per-destination queue after all
+/* its entries have been taken care of. It is an error to dispose
+/* of a dead queue.
+/*
+/* qmgr_queue_find() looks up the named queue for the named
+/* transport. A null result means that the queue was not found.
+/*
+/* qmgr_queue_throttle() handles a delivery error, and decrements the
+/* concurrency limit for the destination, with a lower bound of 1.
+/* When the cohort failure bound is reached, qmgr_queue_throttle()
+/* sets the concurrency limit to zero and starts a timer
+/* to re-enable delivery to the destination after a configurable delay.
+/*
+/* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
+/* The concurrency limit for the destination is incremented,
+/* provided that it does not exceed the destination concurrency
+/* limit specified for the transport. This routine implements
+/* "slow open" mode, and eliminates the "thundering herd" problem.
+/*
+/* qmgr_queue_suspend() suspends delivery for this destination
+/* briefly. This function invalidates any scheduling decisions
+/* that are based on the present queue's concurrency window.
+/* To compensate for work skipped by qmgr_entry_done(), the
+/* status of blocker jobs is re-evaluated after the queue is
+/* resumed.
+/* DIAGNOSTICS
+/* Panic: consistency check failure.
+/* LICENSE
+/* .ad
+/* .fi
+/* The Secure Mailer license must be distributed with this software.
+/* AUTHOR(S)
+/* Wietse Venema
+/* IBM T.J. Watson Research
+/* P.O. Box 704
+/* Yorktown Heights, NY 10598, USA
+/*
+/* Pre-emptive scheduler enhancements:
+/* Patrik Rak
+/* Modra 6
+/* 155 00, Prague, Czech Republic
+/*
+/* Concurrency scheduler enhancements with:
+/* Victor Duchovni
+/* Morgan Stanley
+/*--*/
+
+/* System library. */
+
+#include <sys_defs.h>
+#include <time.h>
+
+/* Utility library. */
+
+#include <msg.h>
+#include <mymalloc.h>
+#include <events.h>
+#include <htable.h>
+
+/* Global library. */
+
+#include <mail_params.h>
+#include <recipient_list.h>
+#include <mail_proto.h> /* QMGR_LOG_WINDOW */
+
+/* Application-specific. */
+
+#include "qmgr.h"
+
+int qmgr_queue_count;
+
+#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
+ (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
+ || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
+
+#define QMGR_LOG_FEEDBACK(feedback) \
+ if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
+ msg_info("%s: feedback %g", myname, feedback);
+
+#define QMGR_LOG_WINDOW(queue) \
+ if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
+ msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
+ myname, queue->name, queue->transport->dest_concurrency_limit, \
+ queue->window, queue->success, queue->failure, queue->fail_cohorts);
+
+/* qmgr_queue_resume - resume delivery to destination */
+
+static void qmgr_queue_resume(int event, void *context)
+{
+ QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
+ const char *myname = "qmgr_queue_resume";
+
+ /*
+ * Sanity checks.
+ */
+ if (!QMGR_QUEUE_SUSPENDED(queue))
+ msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
+
+ /*
+ * We can't simply force delivery on this queue: the transport's pending
+ * count may already be maxed out, and there may be other constraints
+ * that definitely should be none of our business. The best we can do is
+ * to play by the same rules as everyone else: let qmgr_active_drain()
+ * and round-robin selection take care of message selection.
+ */
+ queue->window = 1;
+
+ /*
+ * Every event handler that leaves a queue in the "ready" state should
+ * remove the queue when it is empty.
+ *
+ * XXX Do not omit the redundant test below. It is here to simplify code
+ * consistency checks. The check is trivially eliminated by the compiler
+ * optimizer. There is no need to sacrifice code clarity for the sake of
+ * performance.
+ *
+ * XXX Do not expose the blocker job logic here. Rate-limited queues are not
+ * a performance-critical feature. Here, too, there is no need to
+ * sacrifice code clarity for the sake of performance.
+ */
+ if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
+ qmgr_queue_done(queue);
+ else
+ qmgr_job_blocker_update(queue);
+}
+
+/* qmgr_queue_suspend - briefly suspend a destination */
+
+void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
+{
+ const char *myname = "qmgr_queue_suspend";
+
+ /*
+ * Sanity checks.
+ */
+ if (!QMGR_QUEUE_READY(queue))
+ msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
+ if (queue->busy_refcount > 0)
+ msg_panic("%s: queue is busy", myname);
+
+ /*
+ * Set the queue status to "suspended". No-one is supposed to remove a
+ * queue in suspended state.
+ */
+ queue->window = QMGR_QUEUE_STAT_SUSPENDED;
+ event_request_timer(qmgr_queue_resume, (void *) queue, delay);
+}
+
+/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
+
+static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
+{
+ QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
+
+ /*
+ * This routine runs when a wakeup timer goes off; it does not run in the
+ * context of some queue manipulation. Therefore, it is safe to discard
+ * this in-core queue when it is empty and when this site is not dead.
+ */
+ qmgr_queue_unthrottle(queue);
+ if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
+ qmgr_queue_done(queue);
+}
+
+/* qmgr_queue_unthrottle - give this destination another chance */
+
+void qmgr_queue_unthrottle(QMGR_QUEUE *queue)
+{
+ const char *myname = "qmgr_queue_unthrottle";
+ QMGR_TRANSPORT *transport = queue->transport;
+ double feedback;
+
+ if (msg_verbose)
+ msg_info("%s: queue %s", myname, queue->name);
+
+ /*
+ * Sanity checks.
+ */
+ if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue))
+ msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
+
+ /*
+ * Don't restart the negative feedback hysteresis cycle with every
+ * positive feedback. Restart it only when we make a positive concurrency
+ * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
+ * Otherwise negative feedback would be too aggressive: negative feedback
+ * takes effect immediately at the start of its hysteresis cycle.
+ */
+ queue->fail_cohorts = 0;
+
+ /*
+ * Special case when this site was dead.
+ */
+ if (QMGR_QUEUE_THROTTLED(queue)) {
+ event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
+ if (queue->dsn == 0)
+ msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
+ dsn_free(queue->dsn);
+ queue->dsn = 0;
+ /* Back from the almost grave, best concurrency is anyone's guess. */
+ if (queue->busy_refcount > 0)
+ queue->window = queue->busy_refcount;
+ else
+ queue->window = transport->init_dest_concurrency;
+ queue->success = queue->failure = 0;
+ QMGR_LOG_WINDOW(queue);
+ return;
+ }
+
+ /*
+ * Increase the destination's concurrency limit until we reach the
+ * transport's concurrency limit. Allow for a margin the size of the
+ * initial destination concurrency, so that we're not too gentle.
+ *
+ * Why is the concurrency increment based on preferred concurrency and not
+ * on the number of outstanding delivery requests? The latter fluctuates
+ * wildly when deliveries complete in bursts (artificial benchmark
+ * measurements), and does not account for cached connections.
+ *
+ * Keep the window within reasonable distance from actual concurrency
+ * otherwise negative feedback will be ineffective. This expression
+ * assumes that busy_refcount changes gradually. This is invalid when
+ * deliveries complete in bursts (artificial benchmark measurements).
+ */
+ if (transport->dest_concurrency_limit == 0
+ || transport->dest_concurrency_limit > queue->window)
+ if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
+ feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
+ QMGR_LOG_FEEDBACK(feedback);
+ queue->success += feedback;
+ /* Prepare for overshoot (feedback > hysteresis, rounding error). */
+ while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
+ queue->window += transport->pos_feedback.hysteresis;
+ queue->success -= transport->pos_feedback.hysteresis;
+ queue->failure = 0;
+ }
+ /* Prepare for overshoot. */
+ if (transport->dest_concurrency_limit > 0
+ && queue->window > transport->dest_concurrency_limit)
+ queue->window = transport->dest_concurrency_limit;
+ }
+ QMGR_LOG_WINDOW(queue);
+}
+
+/* qmgr_queue_throttle - handle destination delivery failure */
+
+void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
+{
+ const char *myname = "qmgr_queue_throttle";
+ QMGR_TRANSPORT *transport = queue->transport;
+ double feedback;
+
+ /*
+ * Sanity checks.
+ */
+ if (!QMGR_QUEUE_READY(queue))
+ msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
+ if (queue->dsn)
+ msg_panic("%s: queue %s: spurious reason %s",
+ myname, queue->name, queue->dsn->reason);
+ if (msg_verbose)
+ msg_info("%s: queue %s: %s %s",
+ myname, queue->name, dsn->status, dsn->reason);
+
+ /*
+ * Don't restart the positive feedback hysteresis cycle with every
+ * negative feedback. Restart it only when we make a negative concurrency
+ * adjustment (i.e. at the start of a negative feedback hysteresis
+ * cycle). Otherwise positive feedback would be too weak (positive
+ * feedback does not take effect until the end of its hysteresis cycle).
+ */
+
+ /*
+ * This queue is declared dead after a configurable number of
+ * pseudo-cohort failures.
+ */
+ if (QMGR_QUEUE_READY(queue)) {
+ queue->fail_cohorts += 1.0 / queue->window;
+ if (transport->fail_cohort_limit > 0
+ && queue->fail_cohorts >= transport->fail_cohort_limit)
+ queue->window = QMGR_QUEUE_STAT_THROTTLED;
+ }
+
+ /*
+ * Decrease the destination's concurrency limit until we reach 1. Base
+ * adjustments on the concurrency limit itself, instead of using the
+ * actual concurrency. The latter fluctuates wildly when deliveries
+ * complete in bursts (artificial benchmark measurements).
+ *
+ * Even after reaching 1, we maintain the negative hysteresis cycle so that
+ * negative feedback can cancel out positive feedback.
+ */
+ if (QMGR_QUEUE_READY(queue)) {
+ feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
+ QMGR_LOG_FEEDBACK(feedback);
+ queue->failure -= feedback;
+ /* Prepare for overshoot (feedback > hysteresis, rounding error). */
+ while (queue->failure - feedback / 2 < 0) {
+ queue->window -= transport->neg_feedback.hysteresis;
+ queue->success = 0;
+ queue->failure += transport->neg_feedback.hysteresis;
+ }
+ /* Prepare for overshoot. */
+ if (queue->window < 1)
+ queue->window = 1;
+ }
+
+ /*
+ * Special case for a site that just was declared dead.
+ */
+ if (QMGR_QUEUE_THROTTLED(queue)) {
+ queue->dsn = DSN_COPY(dsn);
+ event_request_timer(qmgr_queue_unthrottle_wrapper,
+ (void *) queue, var_min_backoff_time);
+ queue->dflags = 0;
+ }
+ QMGR_LOG_WINDOW(queue);
+}
+
+/* qmgr_queue_done - delete in-core queue for site */
+
+void qmgr_queue_done(QMGR_QUEUE *queue)
+{
+ const char *myname = "qmgr_queue_done";
+ QMGR_TRANSPORT *transport = queue->transport;
+
+ /*
+ * Sanity checks. It is an error to delete an in-core queue with pending
+ * messages or timers.
+ */
+ if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
+ msg_panic("%s: refcount: %d", myname,
+ queue->busy_refcount + queue->todo_refcount);
+ if (queue->todo.next || queue->busy.next)
+ msg_panic("%s: queue not empty: %s", myname, queue->name);
+ if (!QMGR_QUEUE_READY(queue))
+ msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
+ if (queue->dsn)
+ msg_panic("%s: queue %s: spurious reason %s",
+ myname, queue->name, queue->dsn->reason);
+
+ /*
+ * Clean up this in-core queue.
+ */
+ QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers);
+ htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
+ myfree(queue->name);
+ myfree(queue->nexthop);
+ qmgr_queue_count--;
+ myfree((void *) queue);
+}
+
+/* qmgr_queue_create - create in-core queue for site */
+
+QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
+ const char *nexthop)
+{
+ QMGR_QUEUE *queue;
+
+ /*
+ * If possible, choose an initial concurrency of > 1 so that one bad
+ * message or one bad network won't slow us down unnecessarily.
+ */
+
+ queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
+ qmgr_queue_count++;
+ queue->dflags = 0;
+ queue->last_done = 0;
+ queue->name = mystrdup(name);
+ queue->nexthop = mystrdup(nexthop);
+ queue->todo_refcount = 0;
+ queue->busy_refcount = 0;
+ queue->transport = transport;
+ queue->window = transport->init_dest_concurrency;
+ queue->success = queue->failure = queue->fail_cohorts = 0;
+ QMGR_LIST_INIT(queue->todo);
+ QMGR_LIST_INIT(queue->busy);
+ queue->dsn = 0;
+ queue->clog_time_to_warn = 0;
+ queue->blocker_tag = 0;
+ QMGR_LIST_APPEND(transport->queue_list, queue, peers);
+ htable_enter(transport->queue_byname, name, (void *) queue);
+ return (queue);
+}
+
+/* qmgr_queue_find - find in-core named queue */
+
+QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
+{
+ return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
+}