From b5896ba9f6047e7031e2bdee0622d543e11a6734 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 May 2024 03:46:30 +0200 Subject: Adding upstream version 3.4.23. Signed-off-by: Daniel Baumann --- src/qmgr/.indent.pro | 1 + src/qmgr/.printfck | 25 + src/qmgr/Makefile.in | 389 +++++++++++ src/qmgr/qmgr.c | 822 ++++++++++++++++++++++++ src/qmgr/qmgr.h | 545 ++++++++++++++++ src/qmgr/qmgr_active.c | 587 +++++++++++++++++ src/qmgr/qmgr_bounce.c | 71 ++ src/qmgr/qmgr_defer.c | 163 +++++ src/qmgr/qmgr_deliver.c | 458 +++++++++++++ src/qmgr/qmgr_enable.c | 107 +++ src/qmgr/qmgr_entry.c | 452 +++++++++++++ src/qmgr/qmgr_error.c | 121 ++++ src/qmgr/qmgr_feedback.c | 177 +++++ src/qmgr/qmgr_job.c | 978 ++++++++++++++++++++++++++++ src/qmgr/qmgr_message.c | 1570 +++++++++++++++++++++++++++++++++++++++++++++ src/qmgr/qmgr_move.c | 104 +++ src/qmgr/qmgr_peer.c | 154 +++++ src/qmgr/qmgr_queue.c | 439 +++++++++++++ src/qmgr/qmgr_scan.c | 185 ++++++ src/qmgr/qmgr_transport.c | 504 +++++++++++++++ 20 files changed, 7852 insertions(+) create mode 120000 src/qmgr/.indent.pro create mode 100644 src/qmgr/.printfck create mode 100644 src/qmgr/Makefile.in create mode 100644 src/qmgr/qmgr.c create mode 100644 src/qmgr/qmgr.h create mode 100644 src/qmgr/qmgr_active.c create mode 100644 src/qmgr/qmgr_bounce.c create mode 100644 src/qmgr/qmgr_defer.c create mode 100644 src/qmgr/qmgr_deliver.c create mode 100644 src/qmgr/qmgr_enable.c create mode 100644 src/qmgr/qmgr_entry.c create mode 100644 src/qmgr/qmgr_error.c create mode 100644 src/qmgr/qmgr_feedback.c create mode 100644 src/qmgr/qmgr_job.c create mode 100644 src/qmgr/qmgr_message.c create mode 100644 src/qmgr/qmgr_move.c create mode 100644 src/qmgr/qmgr_peer.c create mode 100644 src/qmgr/qmgr_queue.c create mode 100644 src/qmgr/qmgr_scan.c create mode 100644 src/qmgr/qmgr_transport.c (limited to 'src/qmgr') diff --git a/src/qmgr/.indent.pro b/src/qmgr/.indent.pro new file mode 120000 index 0000000..5c837ec --- /dev/null +++ b/src/qmgr/.indent.pro @@ -0,0 +1 @@ +../../.indent.pro \ No newline at end of file diff --git a/src/qmgr/.printfck b/src/qmgr/.printfck new file mode 100644 index 0000000..66016ed --- /dev/null +++ b/src/qmgr/.printfck @@ -0,0 +1,25 @@ +been_here_xt 2 0 +bounce_append 5 0 +cleanup_out_format 1 0 +defer_append 5 0 +mail_command 1 0 +mail_print 1 0 +msg_error 0 0 +msg_fatal 0 0 +msg_info 0 0 +msg_panic 0 0 +msg_warn 0 0 +opened 4 0 +post_mail_fprintf 1 0 +qmgr_message_bounce 2 0 +rec_fprintf 2 0 +sent 4 0 +smtp_cmd 1 0 +smtp_mesg_fail 2 0 +smtp_printf 1 0 +smtp_rcpt_fail 3 0 +smtp_site_fail 2 0 +udp_syslog 1 0 +vstream_fprintf 1 0 +vstream_printf 0 0 +vstring_sprintf 1 0 diff --git a/src/qmgr/Makefile.in b/src/qmgr/Makefile.in new file mode 100644 index 0000000..0f5771d --- /dev/null +++ b/src/qmgr/Makefile.in @@ -0,0 +1,389 @@ +SHELL = /bin/sh +SRCS = qmgr.c qmgr_active.c qmgr_transport.c qmgr_queue.c qmgr_entry.c \ + qmgr_message.c qmgr_deliver.c qmgr_move.c \ + qmgr_job.c qmgr_peer.c \ + qmgr_defer.c qmgr_enable.c qmgr_scan.c qmgr_bounce.c qmgr_error.c \ + qmgr_feedback.c +OBJS = qmgr.o qmgr_active.o qmgr_transport.o qmgr_queue.o qmgr_entry.o \ + qmgr_message.o qmgr_deliver.o qmgr_move.o \ + qmgr_job.o qmgr_peer.o \ + qmgr_defer.o qmgr_enable.o qmgr_scan.o qmgr_bounce.o qmgr_error.o \ + qmgr_feedback.o +HDRS = qmgr.h +TESTSRC = +DEFS = -I. -I$(INC_DIR) -D$(SYSTYPE) +CFLAGS = $(DEBUG) $(OPT) $(DEFS) +TESTPROG= +PROG = qmgr +INC_DIR = ../../include +LIBS = ../../lib/lib$(LIB_PREFIX)master$(LIB_SUFFIX) \ + ../../lib/lib$(LIB_PREFIX)global$(LIB_SUFFIX) \ + ../../lib/lib$(LIB_PREFIX)util$(LIB_SUFFIX) + +.c.o:; $(CC) $(CFLAGS) -c $*.c + +$(PROG): $(OBJS) $(LIBS) + $(CC) $(CFLAGS) $(SHLIB_RPATH) -o $@ $(OBJS) $(LIBS) $(SYSLIBS) + +$(OBJS): ../../conf/makedefs.out + +Makefile: Makefile.in + cat ../../conf/makedefs.out $? >$@ + +test: $(TESTPROG) + +tests: + +root_tests: + +update: ../../libexec/$(PROG) + +../../libexec/$(PROG): $(PROG) + cp $(PROG) ../../libexec/$(PROG) + +printfck: $(OBJS) $(PROG) + rm -rf printfck + mkdir printfck + cp *.h printfck + sed '1,/^# do not edit/!d' Makefile >printfck/Makefile + set -e; for i in *.c; do printfck -f .printfck $$i >printfck/$$i; done + cd printfck; make "INC_DIR=../../../include" `cd ..; ls *.o` + +lint: + lint $(DEFS) $(SRCS) $(LINTFIX) + +clean: + rm -f *.o *core $(PROG) $(TESTPROG) junk + rm -rf printfck + +tidy: clean + +depend: $(MAKES) + (sed '1,/^# do not edit/!d' Makefile.in; \ + set -e; for i in [a-z][a-z0-9]*.c; do \ + $(CC) -E $(DEFS) $(INCL) $$i | grep -v '[<>]' | sed -n -e '/^# *1 *"\([^"]*\)".*/{' \ + -e 's//'`echo $$i|sed 's/c$$/o/'`': \1/' \ + -e 's/o: \.\//o: /' -e p -e '}' ; \ + done | LANG=C sort -u) | grep -v '[.][o][:][ ][/]' >$$$$ && mv $$$$ Makefile.in + @$(EXPORT) make -f Makefile.in Makefile 1>&2 + +# do not edit below this line - it is generated by 'make depend' +qmgr.o: ../../include/argv.h +qmgr.o: ../../include/attr.h +qmgr.o: ../../include/check_arg.h +qmgr.o: ../../include/dict.h +qmgr.o: ../../include/dsn.h +qmgr.o: ../../include/events.h +qmgr.o: ../../include/flush_clnt.h +qmgr.o: ../../include/htable.h +qmgr.o: ../../include/iostuff.h +qmgr.o: ../../include/mail_conf.h +qmgr.o: ../../include/mail_flow.h +qmgr.o: ../../include/mail_params.h +qmgr.o: ../../include/mail_proto.h +qmgr.o: ../../include/mail_queue.h +qmgr.o: ../../include/mail_server.h +qmgr.o: ../../include/mail_version.h +qmgr.o: ../../include/master_proto.h +qmgr.o: ../../include/msg.h +qmgr.o: ../../include/myflock.h +qmgr.o: ../../include/mymalloc.h +qmgr.o: ../../include/nvtable.h +qmgr.o: ../../include/recipient_list.h +qmgr.o: ../../include/scan_dir.h +qmgr.o: ../../include/sys_defs.h +qmgr.o: ../../include/vbuf.h +qmgr.o: ../../include/vstream.h +qmgr.o: ../../include/vstring.h +qmgr.o: qmgr.c +qmgr.o: qmgr.h +qmgr_active.o: ../../include/abounce.h +qmgr_active.o: ../../include/attr.h +qmgr_active.o: ../../include/bounce.h +qmgr_active.o: ../../include/check_arg.h +qmgr_active.o: ../../include/defer.h +qmgr_active.o: ../../include/deliver_request.h +qmgr_active.o: ../../include/dsn.h +qmgr_active.o: ../../include/dsn_buf.h +qmgr_active.o: ../../include/dsn_mask.h +qmgr_active.o: ../../include/events.h +qmgr_active.o: ../../include/htable.h +qmgr_active.o: ../../include/mail_open_ok.h +qmgr_active.o: ../../include/mail_params.h +qmgr_active.o: ../../include/mail_queue.h +qmgr_active.o: ../../include/msg.h +qmgr_active.o: ../../include/msg_stats.h +qmgr_active.o: ../../include/mymalloc.h +qmgr_active.o: ../../include/nvtable.h +qmgr_active.o: ../../include/qmgr_user.h +qmgr_active.o: ../../include/rec_type.h +qmgr_active.o: ../../include/recipient_list.h +qmgr_active.o: ../../include/scan_dir.h +qmgr_active.o: ../../include/sys_defs.h +qmgr_active.o: ../../include/trace.h +qmgr_active.o: ../../include/vbuf.h +qmgr_active.o: ../../include/vstream.h +qmgr_active.o: ../../include/vstring.h +qmgr_active.o: ../../include/warn_stat.h +qmgr_active.o: qmgr.h +qmgr_active.o: qmgr_active.c +qmgr_bounce.o: ../../include/attr.h +qmgr_bounce.o: ../../include/bounce.h +qmgr_bounce.o: ../../include/check_arg.h +qmgr_bounce.o: ../../include/deliver_completed.h +qmgr_bounce.o: ../../include/deliver_request.h +qmgr_bounce.o: ../../include/dsn.h +qmgr_bounce.o: ../../include/dsn_buf.h +qmgr_bounce.o: ../../include/htable.h +qmgr_bounce.o: ../../include/msg_stats.h +qmgr_bounce.o: ../../include/mymalloc.h +qmgr_bounce.o: ../../include/nvtable.h +qmgr_bounce.o: ../../include/recipient_list.h +qmgr_bounce.o: ../../include/scan_dir.h +qmgr_bounce.o: ../../include/sys_defs.h +qmgr_bounce.o: ../../include/vbuf.h +qmgr_bounce.o: ../../include/vstream.h +qmgr_bounce.o: ../../include/vstring.h +qmgr_bounce.o: qmgr.h +qmgr_bounce.o: qmgr_bounce.c +qmgr_defer.o: ../../include/attr.h +qmgr_defer.o: ../../include/bounce.h +qmgr_defer.o: ../../include/check_arg.h +qmgr_defer.o: ../../include/defer.h +qmgr_defer.o: ../../include/deliver_request.h +qmgr_defer.o: ../../include/dsn.h +qmgr_defer.o: ../../include/dsn_buf.h +qmgr_defer.o: ../../include/htable.h +qmgr_defer.o: ../../include/iostuff.h +qmgr_defer.o: ../../include/mail_proto.h +qmgr_defer.o: ../../include/msg.h +qmgr_defer.o: ../../include/msg_stats.h +qmgr_defer.o: ../../include/mymalloc.h +qmgr_defer.o: ../../include/nvtable.h +qmgr_defer.o: ../../include/recipient_list.h +qmgr_defer.o: ../../include/scan_dir.h +qmgr_defer.o: ../../include/sys_defs.h +qmgr_defer.o: ../../include/vbuf.h +qmgr_defer.o: ../../include/vstream.h +qmgr_defer.o: ../../include/vstring.h +qmgr_defer.o: qmgr.h +qmgr_defer.o: qmgr_defer.c +qmgr_deliver.o: ../../include/attr.h +qmgr_deliver.o: ../../include/check_arg.h +qmgr_deliver.o: ../../include/deliver_request.h +qmgr_deliver.o: ../../include/dsb_scan.h +qmgr_deliver.o: ../../include/dsn.h +qmgr_deliver.o: ../../include/dsn_buf.h +qmgr_deliver.o: ../../include/dsn_util.h +qmgr_deliver.o: ../../include/events.h +qmgr_deliver.o: ../../include/htable.h +qmgr_deliver.o: ../../include/iostuff.h +qmgr_deliver.o: ../../include/mail_params.h +qmgr_deliver.o: ../../include/mail_proto.h +qmgr_deliver.o: ../../include/mail_queue.h +qmgr_deliver.o: ../../include/msg.h +qmgr_deliver.o: ../../include/msg_stats.h +qmgr_deliver.o: ../../include/mymalloc.h +qmgr_deliver.o: ../../include/nvtable.h +qmgr_deliver.o: ../../include/rcpt_print.h +qmgr_deliver.o: ../../include/recipient_list.h +qmgr_deliver.o: ../../include/scan_dir.h +qmgr_deliver.o: ../../include/smtputf8.h +qmgr_deliver.o: ../../include/stringops.h +qmgr_deliver.o: ../../include/sys_defs.h +qmgr_deliver.o: ../../include/vbuf.h +qmgr_deliver.o: ../../include/verp_sender.h +qmgr_deliver.o: ../../include/vstream.h +qmgr_deliver.o: ../../include/vstring.h +qmgr_deliver.o: ../../include/vstring_vstream.h +qmgr_deliver.o: qmgr.h +qmgr_deliver.o: qmgr_deliver.c +qmgr_enable.o: ../../include/check_arg.h +qmgr_enable.o: ../../include/dsn.h +qmgr_enable.o: ../../include/msg.h +qmgr_enable.o: ../../include/recipient_list.h +qmgr_enable.o: ../../include/scan_dir.h +qmgr_enable.o: ../../include/sys_defs.h +qmgr_enable.o: ../../include/vbuf.h +qmgr_enable.o: ../../include/vstream.h +qmgr_enable.o: qmgr.h +qmgr_enable.o: qmgr_enable.c +qmgr_entry.o: ../../include/attr.h +qmgr_entry.o: ../../include/check_arg.h +qmgr_entry.o: ../../include/deliver_request.h +qmgr_entry.o: ../../include/dsn.h +qmgr_entry.o: ../../include/events.h +qmgr_entry.o: ../../include/htable.h +qmgr_entry.o: ../../include/mail_params.h +qmgr_entry.o: ../../include/msg.h +qmgr_entry.o: ../../include/msg_stats.h +qmgr_entry.o: ../../include/mymalloc.h +qmgr_entry.o: ../../include/nvtable.h +qmgr_entry.o: ../../include/recipient_list.h +qmgr_entry.o: ../../include/scan_dir.h +qmgr_entry.o: ../../include/sys_defs.h +qmgr_entry.o: ../../include/vbuf.h +qmgr_entry.o: ../../include/vstream.h +qmgr_entry.o: ../../include/vstring.h +qmgr_entry.o: qmgr.h +qmgr_entry.o: qmgr_entry.c +qmgr_error.o: ../../include/check_arg.h +qmgr_error.o: ../../include/dsn.h +qmgr_error.o: ../../include/mymalloc.h +qmgr_error.o: ../../include/recipient_list.h +qmgr_error.o: ../../include/scan_dir.h +qmgr_error.o: ../../include/stringops.h +qmgr_error.o: ../../include/sys_defs.h +qmgr_error.o: ../../include/vbuf.h +qmgr_error.o: ../../include/vstream.h +qmgr_error.o: ../../include/vstring.h +qmgr_error.o: qmgr.h +qmgr_error.o: qmgr_error.c +qmgr_feedback.o: ../../include/check_arg.h +qmgr_feedback.o: ../../include/dsn.h +qmgr_feedback.o: ../../include/mail_conf.h +qmgr_feedback.o: ../../include/mail_params.h +qmgr_feedback.o: ../../include/msg.h +qmgr_feedback.o: ../../include/mymalloc.h +qmgr_feedback.o: ../../include/name_code.h +qmgr_feedback.o: ../../include/recipient_list.h +qmgr_feedback.o: ../../include/scan_dir.h +qmgr_feedback.o: ../../include/stringops.h +qmgr_feedback.o: ../../include/sys_defs.h +qmgr_feedback.o: ../../include/vbuf.h +qmgr_feedback.o: ../../include/vstream.h +qmgr_feedback.o: ../../include/vstring.h +qmgr_feedback.o: qmgr.h +qmgr_feedback.o: qmgr_feedback.c +qmgr_job.o: ../../include/check_arg.h +qmgr_job.o: ../../include/dsn.h +qmgr_job.o: ../../include/htable.h +qmgr_job.o: ../../include/msg.h +qmgr_job.o: ../../include/mymalloc.h +qmgr_job.o: ../../include/recipient_list.h +qmgr_job.o: ../../include/sane_time.h +qmgr_job.o: ../../include/scan_dir.h +qmgr_job.o: ../../include/sys_defs.h +qmgr_job.o: ../../include/vbuf.h +qmgr_job.o: ../../include/vstream.h +qmgr_job.o: qmgr.h +qmgr_job.o: qmgr_job.c +qmgr_message.o: ../../include/argv.h +qmgr_message.o: ../../include/attr.h +qmgr_message.o: ../../include/bounce.h +qmgr_message.o: ../../include/canon_addr.h +qmgr_message.o: ../../include/check_arg.h +qmgr_message.o: ../../include/deliver_completed.h +qmgr_message.o: ../../include/deliver_request.h +qmgr_message.o: ../../include/dict.h +qmgr_message.o: ../../include/dsn.h +qmgr_message.o: ../../include/dsn_buf.h +qmgr_message.o: ../../include/dsn_mask.h +qmgr_message.o: ../../include/htable.h +qmgr_message.o: ../../include/iostuff.h +qmgr_message.o: ../../include/mail_params.h +qmgr_message.o: ../../include/mail_proto.h +qmgr_message.o: ../../include/mail_queue.h +qmgr_message.o: ../../include/msg.h +qmgr_message.o: ../../include/msg_stats.h +qmgr_message.o: ../../include/myflock.h +qmgr_message.o: ../../include/mymalloc.h +qmgr_message.o: ../../include/nvtable.h +qmgr_message.o: ../../include/opened.h +qmgr_message.o: ../../include/qmgr_user.h +qmgr_message.o: ../../include/rec_attr_map.h +qmgr_message.o: ../../include/rec_type.h +qmgr_message.o: ../../include/recipient_list.h +qmgr_message.o: ../../include/record.h +qmgr_message.o: ../../include/resolve_clnt.h +qmgr_message.o: ../../include/rewrite_clnt.h +qmgr_message.o: ../../include/sane_time.h +qmgr_message.o: ../../include/scan_dir.h +qmgr_message.o: ../../include/sent.h +qmgr_message.o: ../../include/split_addr.h +qmgr_message.o: ../../include/split_at.h +qmgr_message.o: ../../include/stringops.h +qmgr_message.o: ../../include/sys_defs.h +qmgr_message.o: ../../include/valid_hostname.h +qmgr_message.o: ../../include/vbuf.h +qmgr_message.o: ../../include/verp_sender.h +qmgr_message.o: ../../include/vstream.h +qmgr_message.o: ../../include/vstring.h +qmgr_message.o: qmgr.h +qmgr_message.o: qmgr_message.c +qmgr_move.o: ../../include/check_arg.h +qmgr_move.o: ../../include/dsn.h +qmgr_move.o: ../../include/mail_queue.h +qmgr_move.o: ../../include/mail_scan_dir.h +qmgr_move.o: ../../include/msg.h +qmgr_move.o: ../../include/recipient_list.h +qmgr_move.o: ../../include/scan_dir.h +qmgr_move.o: ../../include/sys_defs.h +qmgr_move.o: ../../include/vbuf.h +qmgr_move.o: ../../include/vstream.h +qmgr_move.o: ../../include/vstring.h +qmgr_move.o: qmgr.h +qmgr_move.o: qmgr_move.c +qmgr_peer.o: ../../include/check_arg.h +qmgr_peer.o: ../../include/dsn.h +qmgr_peer.o: ../../include/htable.h +qmgr_peer.o: ../../include/msg.h +qmgr_peer.o: ../../include/mymalloc.h +qmgr_peer.o: ../../include/recipient_list.h +qmgr_peer.o: ../../include/scan_dir.h +qmgr_peer.o: ../../include/sys_defs.h +qmgr_peer.o: ../../include/vbuf.h +qmgr_peer.o: ../../include/vstream.h +qmgr_peer.o: qmgr.h +qmgr_peer.o: qmgr_peer.c +qmgr_queue.o: ../../include/attr.h +qmgr_queue.o: ../../include/check_arg.h +qmgr_queue.o: ../../include/dsn.h +qmgr_queue.o: ../../include/events.h +qmgr_queue.o: ../../include/htable.h +qmgr_queue.o: ../../include/iostuff.h +qmgr_queue.o: ../../include/mail_params.h +qmgr_queue.o: ../../include/mail_proto.h +qmgr_queue.o: ../../include/msg.h +qmgr_queue.o: ../../include/mymalloc.h +qmgr_queue.o: ../../include/nvtable.h +qmgr_queue.o: ../../include/recipient_list.h +qmgr_queue.o: ../../include/scan_dir.h +qmgr_queue.o: ../../include/sys_defs.h +qmgr_queue.o: ../../include/vbuf.h +qmgr_queue.o: ../../include/vstream.h +qmgr_queue.o: ../../include/vstring.h +qmgr_queue.o: qmgr.h +qmgr_queue.o: qmgr_queue.c +qmgr_scan.o: ../../include/check_arg.h +qmgr_scan.o: ../../include/dsn.h +qmgr_scan.o: ../../include/mail_scan_dir.h +qmgr_scan.o: ../../include/msg.h +qmgr_scan.o: ../../include/mymalloc.h +qmgr_scan.o: ../../include/recipient_list.h +qmgr_scan.o: ../../include/scan_dir.h +qmgr_scan.o: ../../include/sys_defs.h +qmgr_scan.o: ../../include/vbuf.h +qmgr_scan.o: ../../include/vstream.h +qmgr_scan.o: qmgr.h +qmgr_scan.o: qmgr_scan.c +qmgr_transport.o: ../../include/attr.h +qmgr_transport.o: ../../include/check_arg.h +qmgr_transport.o: ../../include/dsn.h +qmgr_transport.o: ../../include/events.h +qmgr_transport.o: ../../include/htable.h +qmgr_transport.o: ../../include/iostuff.h +qmgr_transport.o: ../../include/mail_conf.h +qmgr_transport.o: ../../include/mail_params.h +qmgr_transport.o: ../../include/mail_proto.h +qmgr_transport.o: ../../include/msg.h +qmgr_transport.o: ../../include/mymalloc.h +qmgr_transport.o: ../../include/nvtable.h +qmgr_transport.o: ../../include/recipient_list.h +qmgr_transport.o: ../../include/scan_dir.h +qmgr_transport.o: ../../include/sys_defs.h +qmgr_transport.o: ../../include/vbuf.h +qmgr_transport.o: ../../include/vstream.h +qmgr_transport.o: ../../include/vstring.h +qmgr_transport.o: qmgr.h +qmgr_transport.o: qmgr_transport.c diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c new file mode 100644 index 0000000..a48043e --- /dev/null +++ b/src/qmgr/qmgr.c @@ -0,0 +1,822 @@ +/*++ +/* NAME +/* qmgr 8 +/* SUMMARY +/* Postfix queue manager +/* SYNOPSIS +/* \fBqmgr\fR [generic Postfix daemon options] +/* DESCRIPTION +/* The \fBqmgr\fR(8) daemon awaits the arrival of incoming mail +/* and arranges for its delivery via Postfix delivery processes. +/* The actual mail routing strategy is delegated to the +/* \fBtrivial-rewrite\fR(8) daemon. +/* This program expects to be run from the \fBmaster\fR(8) process +/* manager. +/* +/* Mail addressed to the local \fBdouble-bounce\fR address is +/* logged and discarded. This stops potential loops caused by +/* undeliverable bounce notifications. +/* MAIL QUEUES +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon maintains the following queues: +/* .IP \fBincoming\fR +/* Inbound mail from the network, or mail picked up by the +/* local \fBpickup\fR(8) daemon from the \fBmaildrop\fR directory. +/* .IP \fBactive\fR +/* Messages that the queue manager has opened for delivery. Only +/* a limited number of messages is allowed to enter the \fBactive\fR +/* queue (leaky bucket strategy, for a fixed delivery rate). +/* .IP \fBdeferred\fR +/* Mail that could not be delivered upon the first attempt. The queue +/* manager implements exponential backoff by doubling the time between +/* delivery attempts. +/* .IP \fBcorrupt\fR +/* Unreadable or damaged queue files are moved here for inspection. +/* .IP \fBhold\fR +/* Messages that are kept "on hold" are kept here until someone +/* sets them free. +/* DELIVERY STATUS REPORTS +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon keeps an eye on per-message delivery status +/* reports in the following directories. Each status report file has +/* the same name as the corresponding message file: +/* .IP \fBbounce\fR +/* Per-recipient status information about why mail is bounced. +/* These files are maintained by the \fBbounce\fR(8) daemon. +/* .IP \fBdefer\fR +/* Per-recipient status information about why mail is delayed. +/* These files are maintained by the \fBdefer\fR(8) daemon. +/* .IP \fBtrace\fR +/* Per-recipient status information as requested with the +/* Postfix "\fBsendmail -v\fR" or "\fBsendmail -bv\fR" command. +/* These files are maintained by the \fBtrace\fR(8) daemon. +/* .PP +/* The \fBqmgr\fR(8) daemon is responsible for asking the +/* \fBbounce\fR(8), \fBdefer\fR(8) or \fBtrace\fR(8) daemons to +/* send delivery reports. +/* STRATEGIES +/* .ad +/* .fi +/* The queue manager implements a variety of strategies for +/* either opening queue files (input) or for message delivery (output). +/* .IP "\fBleaky bucket\fR" +/* This strategy limits the number of messages in the \fBactive\fR queue +/* and prevents the queue manager from running out of memory under +/* heavy load. +/* .IP \fBfairness\fR +/* When the \fBactive\fR queue has room, the queue manager takes one +/* message from the \fBincoming\fR queue and one from the \fBdeferred\fR +/* queue. This prevents a large mail backlog from blocking the delivery +/* of new mail. +/* .IP "\fBslow start\fR" +/* This strategy eliminates "thundering herd" problems by slowly +/* adjusting the number of parallel deliveries to the same destination. +/* .IP "\fBround robin\fR" +/* The queue manager sorts delivery requests by destination. +/* Round-robin selection prevents one destination from dominating +/* deliveries to other destinations. +/* .IP "\fBexponential backoff\fR" +/* Mail that cannot be delivered upon the first attempt is deferred. +/* The time interval between delivery attempts is doubled after each +/* attempt. +/* .IP "\fBdestination status cache\fR" +/* The queue manager avoids unnecessary delivery attempts by +/* maintaining a short-term, in-memory list of unreachable destinations. +/* .IP "\fBpreemptive message scheduling\fR" +/* The queue manager attempts to minimize the average per-recipient delay +/* while still preserving the correct per-message delays, using +/* a sophisticated preemptive message scheduling. +/* TRIGGERS +/* .ad +/* .fi +/* On an idle system, the queue manager waits for the arrival of +/* trigger events, or it waits for a timer to go off. A trigger +/* is a one-byte message. +/* Depending on the message received, the queue manager performs +/* one of the following actions (the message is followed by the +/* symbolic constant used internally by the software): +/* .IP "\fBD (QMGR_REQ_SCAN_DEFERRED)\fR" +/* Start a deferred queue scan. If a deferred queue scan is already +/* in progress, that scan will be restarted as soon as it finishes. +/* .IP "\fBI (QMGR_REQ_SCAN_INCOMING)\fR" +/* Start an incoming queue scan. If an incoming queue scan is already +/* in progress, that scan will be restarted as soon as it finishes. +/* .IP "\fBA (QMGR_REQ_SCAN_ALL)\fR" +/* Ignore deferred queue file time stamps. The request affects +/* the next deferred queue scan. +/* .IP "\fBF (QMGR_REQ_FLUSH_DEAD)\fR" +/* Purge all information about dead transports and destinations. +/* .IP "\fBW (TRIGGER_REQ_WAKEUP)\fR" +/* Wakeup call, This is used by the master server to instantiate +/* servers that should not go away forever. The action is to start +/* an incoming queue scan. +/* .PP +/* The \fBqmgr\fR(8) daemon reads an entire buffer worth of triggers. +/* Multiple identical trigger requests are collapsed into one, and +/* trigger requests are sorted so that \fBA\fR and \fBF\fR precede +/* \fBD\fR and \fBI\fR. Thus, in order to force a deferred queue run, +/* one would request \fBA F D\fR; in order to notify the queue manager +/* of the arrival of new mail one would request \fBI\fR. +/* STANDARDS +/* RFC 3463 (Enhanced status codes) +/* RFC 3464 (Delivery status notifications) +/* SECURITY +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon is not security sensitive. It reads +/* single-character messages from untrusted local users, and thus may +/* be susceptible to denial of service attacks. The \fBqmgr\fR(8) daemon +/* does not talk to the outside world, and it can be run at fixed low +/* privilege in a chrooted environment. +/* DIAGNOSTICS +/* Problems and transactions are logged to \fBsyslogd\fR(8) +/* or \fBpostlogd\fR(8). +/* Corrupted message files are saved to the \fBcorrupt\fR queue +/* for further inspection. +/* +/* Depending on the setting of the \fBnotify_classes\fR parameter, +/* the postmaster is notified of bounces and of other trouble. +/* BUGS +/* A single queue manager process has to compete for disk access with +/* multiple front-end processes such as \fBcleanup\fR(8). A sudden burst of +/* inbound mail can negatively impact outbound delivery rates. +/* CONFIGURATION PARAMETERS +/* .ad +/* .fi +/* Changes to \fBmain.cf\fR are not picked up automatically +/* as \fBqmgr\fR(8) +/* is a persistent process. Use the "\fBpostfix reload\fR" command after +/* a configuration change. +/* +/* The text below provides only a parameter summary. See +/* \fBpostconf\fR(5) for more details including examples. +/* +/* In the text below, \fItransport\fR is the first field in a +/* \fBmaster.cf\fR entry. +/* COMPATIBILITY CONTROLS +/* .ad +/* .fi +/* Available before Postfix version 2.5: +/* .IP "\fBallow_min_user (no)\fR" +/* Allow a sender or recipient address to have `-' as the first +/* character. +/* .PP +/* Available with Postfix version 2.7 and later: +/* .IP "\fBdefault_filter_nexthop (empty)\fR" +/* When a content_filter or FILTER request specifies no explicit +/* next-hop destination, use $default_filter_nexthop instead; when +/* that value is empty, use the domain in the recipient address. +/* ACTIVE QUEUE CONTROLS +/* .ad +/* .fi +/* .IP "\fBqmgr_clog_warn_time (300s)\fR" +/* The minimal delay between warnings that a specific destination is +/* clogging up the Postfix active queue. +/* .IP "\fBqmgr_message_active_limit (20000)\fR" +/* The maximal number of messages in the active queue. +/* .IP "\fBqmgr_message_recipient_limit (20000)\fR" +/* The maximal number of recipients held in memory by the Postfix +/* queue manager, and the maximal size of the short-term, +/* in-memory "dead" destination status cache. +/* .IP "\fBqmgr_message_recipient_minimum (10)\fR" +/* The minimal number of in-memory recipients for any message. +/* .IP "\fBdefault_recipient_limit (20000)\fR" +/* The default per-transport upper limit on the number of in-memory +/* recipients. +/* .IP "\fBtransport_recipient_limit ($default_recipient_limit)\fR" +/* A transport-specific override for the default_recipient_limit +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_extra_recipient_limit (1000)\fR" +/* The default value for the extra per-transport limit imposed on the +/* number of in-memory recipients. +/* .IP "\fBtransport_extra_recipient_limit ($default_extra_recipient_limit)\fR" +/* A transport-specific override for the default_extra_recipient_limit +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .PP +/* Available in Postfix version 2.4 and later: +/* .IP "\fBdefault_recipient_refill_limit (100)\fR" +/* The default per-transport limit on the number of recipients refilled at +/* once. +/* .IP "\fBtransport_recipient_refill_limit ($default_recipient_refill_limit)\fR" +/* A transport-specific override for the default_recipient_refill_limit +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_recipient_refill_delay (5s)\fR" +/* The default per-transport maximum delay between recipients refills. +/* .IP "\fBtransport_recipient_refill_delay ($default_recipient_refill_delay)\fR" +/* A transport-specific override for the default_recipient_refill_delay +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* DELIVERY CONCURRENCY CONTROLS +/* .ad +/* .fi +/* .IP "\fBinitial_destination_concurrency (5)\fR" +/* The initial per-destination concurrency level for parallel delivery +/* to the same destination. +/* .IP "\fBdefault_destination_concurrency_limit (20)\fR" +/* The default maximal number of parallel deliveries to the same +/* destination. +/* .IP "\fBtransport_destination_concurrency_limit ($default_destination_concurrency_limit)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_limit parameter value, where +/* \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .PP +/* Available in Postfix version 2.5 and later: +/* .IP "\fBtransport_initial_destination_concurrency ($initial_destination_concurrency)\fR" +/* A transport-specific override for the initial_destination_concurrency +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_destination_concurrency_failed_cohort_limit (1)\fR" +/* How many pseudo-cohorts must suffer connection or handshake +/* failure before a specific destination is considered unavailable +/* (and further delivery is suspended). +/* .IP "\fBtransport_destination_concurrency_failed_cohort_limit ($default_destination_concurrency_failed_cohort_limit)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_failed_cohort_limit parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdefault_destination_concurrency_negative_feedback (1)\fR" +/* The per-destination amount of delivery concurrency negative +/* feedback, after a delivery completes with a connection or handshake +/* failure. +/* .IP "\fBtransport_destination_concurrency_negative_feedback ($default_destination_concurrency_negative_feedback)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_negative_feedback parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdefault_destination_concurrency_positive_feedback (1)\fR" +/* The per-destination amount of delivery concurrency positive +/* feedback, after a delivery completes without connection or handshake +/* failure. +/* .IP "\fBtransport_destination_concurrency_positive_feedback ($default_destination_concurrency_positive_feedback)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_positive_feedback parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdestination_concurrency_feedback_debug (no)\fR" +/* Make the queue manager's feedback algorithm verbose for performance +/* analysis purposes. +/* RECIPIENT SCHEDULING CONTROLS +/* .ad +/* .fi +/* .IP "\fBdefault_destination_recipient_limit (50)\fR" +/* The default maximal number of recipients per message delivery. +/* .IP "\fBtransport_destination_recipient_limit ($default_destination_recipient_limit)\fR" +/* A transport-specific override for the +/* default_destination_recipient_limit parameter value, where +/* \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* MESSAGE SCHEDULING CONTROLS +/* .ad +/* .fi +/* .IP "\fBdefault_delivery_slot_cost (5)\fR" +/* How often the Postfix queue manager's scheduler is allowed to +/* preempt delivery of one message with another. +/* .IP "\fBtransport_delivery_slot_cost ($default_delivery_slot_cost)\fR" +/* A transport-specific override for the default_delivery_slot_cost +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_minimum_delivery_slots (3)\fR" +/* How many recipients a message must have in order to invoke the +/* Postfix queue manager's scheduling algorithm at all. +/* .IP "\fBtransport_minimum_delivery_slots ($default_minimum_delivery_slots)\fR" +/* A transport-specific override for the default_minimum_delivery_slots +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_delivery_slot_discount (50)\fR" +/* The default value for transport-specific _delivery_slot_discount +/* settings. +/* .IP "\fBtransport_delivery_slot_discount ($default_delivery_slot_discount)\fR" +/* A transport-specific override for the default_delivery_slot_discount +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_delivery_slot_loan (3)\fR" +/* The default value for transport-specific _delivery_slot_loan +/* settings. +/* .IP "\fBtransport_delivery_slot_loan ($default_delivery_slot_loan)\fR" +/* A transport-specific override for the default_delivery_slot_loan +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* OTHER RESOURCE AND RATE CONTROLS +/* .ad +/* .fi +/* .IP "\fBminimal_backoff_time (300s)\fR" +/* The minimal time between attempts to deliver a deferred message; +/* prior to Postfix 2.4 the default value was 1000s. +/* .IP "\fBmaximal_backoff_time (4000s)\fR" +/* The maximal time between attempts to deliver a deferred message. +/* .IP "\fBmaximal_queue_lifetime (5d)\fR" +/* Consider a message as undeliverable, when delivery fails with a +/* temporary error, and the time in the queue has reached the +/* maximal_queue_lifetime limit. +/* .IP "\fBqueue_run_delay (300s)\fR" +/* The time between deferred queue scans by the queue manager; +/* prior to Postfix 2.4 the default value was 1000s. +/* .IP "\fBtransport_retry_time (60s)\fR" +/* The time between attempts by the Postfix queue manager to contact +/* a malfunctioning message delivery transport. +/* .PP +/* Available in Postfix version 2.1 and later: +/* .IP "\fBbounce_queue_lifetime (5d)\fR" +/* Consider a bounce message as undeliverable, when delivery fails +/* with a temporary error, and the time in the queue has reached the +/* bounce_queue_lifetime limit. +/* .PP +/* Available in Postfix version 2.5 and later: +/* .IP "\fBdefault_destination_rate_delay (0s)\fR" +/* The default amount of delay that is inserted between individual +/* message deliveries to the same destination and over the same message +/* delivery transport. +/* .IP "\fBtransport_destination_rate_delay ($default_destination_rate_delay)\fR" +/* A transport-specific override for the default_destination_rate_delay +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .PP +/* Available in Postfix version 3.1 and later: +/* .IP "\fBdefault_transport_rate_delay (0s)\fR" +/* The default amount of delay that is inserted between individual +/* message deliveries over the same message delivery transport, +/* regardless of destination. +/* .IP "\fBtransport_transport_rate_delay ($default_transport_rate_delay)\fR" +/* A transport-specific override for the default_transport_rate_delay +/* parameter value, where the initial \fItransport\fR in the parameter +/* name is the master.cf name of the message delivery transport. +/* SAFETY CONTROLS +/* .ad +/* .fi +/* .IP "\fBqmgr_daemon_timeout (1000s)\fR" +/* How much time a Postfix queue manager process may take to handle +/* a request before it is terminated by a built-in watchdog timer. +/* .IP "\fBqmgr_ipc_timeout (60s)\fR" +/* The time limit for the queue manager to send or receive information +/* over an internal communication channel. +/* .PP +/* Available in Postfix version 3.1 and later: +/* .IP "\fBaddress_verify_pending_request_limit (see 'postconf -d' output)\fR" +/* A safety limit that prevents address verification requests from +/* overwhelming the Postfix queue. +/* MISCELLANEOUS CONTROLS +/* .ad +/* .fi +/* .IP "\fBconfig_directory (see 'postconf -d' output)\fR" +/* The default location of the Postfix main.cf and master.cf +/* configuration files. +/* .IP "\fBdefer_transports (empty)\fR" +/* The names of message delivery transports that should not deliver mail +/* unless someone issues "\fBsendmail -q\fR" or equivalent. +/* .IP "\fBdelay_logging_resolution_limit (2)\fR" +/* The maximal number of digits after the decimal point when logging +/* sub-second delay values. +/* .IP "\fBhelpful_warnings (yes)\fR" +/* Log warnings about problematic configuration settings, and provide +/* helpful suggestions. +/* .IP "\fBprocess_id (read-only)\fR" +/* The process ID of a Postfix command or daemon process. +/* .IP "\fBprocess_name (read-only)\fR" +/* The process name of a Postfix command or daemon process. +/* .IP "\fBqueue_directory (see 'postconf -d' output)\fR" +/* The location of the Postfix top-level queue directory. +/* .IP "\fBsyslog_facility (mail)\fR" +/* The syslog facility of Postfix logging. +/* .IP "\fBsyslog_name (see 'postconf -d' output)\fR" +/* A prefix that is prepended to the process name in syslog +/* records, so that, for example, "smtpd" becomes "prefix/smtpd". +/* .PP +/* Available in Postfix version 3.0 and later: +/* .IP "\fBconfirm_delay_cleared (no)\fR" +/* After sending a "your message is delayed" notification, inform +/* the sender when the delay clears up. +/* .PP +/* Available in Postfix 3.3 and later: +/* .IP "\fBservice_name (read-only)\fR" +/* The master.cf service name of a Postfix daemon process. +/* FILES +/* /var/spool/postfix/incoming, incoming queue +/* /var/spool/postfix/active, active queue +/* /var/spool/postfix/deferred, deferred queue +/* /var/spool/postfix/bounce, non-delivery status +/* /var/spool/postfix/defer, non-delivery status +/* /var/spool/postfix/trace, delivery status +/* SEE ALSO +/* trivial-rewrite(8), address routing +/* bounce(8), delivery status reports +/* postconf(5), configuration parameters +/* master(5), generic daemon options +/* master(8), process manager +/* postlogd(8), Postfix logging +/* syslogd(8), system logging +/* README FILES +/* .ad +/* .fi +/* Use "\fBpostconf readme_directory\fR" or +/* "\fBpostconf html_directory\fR" to locate this information. +/* .na +/* .nf +/* SCHEDULER_README, scheduling algorithm +/* QSHAPE_README, Postfix queue analysis +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include +#include +#include +#include + +/* Utility library. */ + +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include +#include +#include +#include /* QMGR_SCAN constants */ +#include +#include + +/* Master process interface */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Tunables. + */ +int var_queue_run_delay; +int var_min_backoff_time; +int var_max_backoff_time; +int var_max_queue_time; +int var_dsn_queue_time; +int var_qmgr_active_limit; +int var_qmgr_rcpt_limit; +int var_qmgr_msg_rcpt_limit; +int var_xport_rcpt_limit; +int var_stack_rcpt_limit; +int var_xport_refill_limit; +int var_xport_refill_delay; +int var_delivery_slot_cost; +int var_delivery_slot_loan; +int var_delivery_slot_discount; +int var_min_delivery_slots; +int var_init_dest_concurrency; +int var_transport_retry_time; +int var_dest_con_limit; +int var_dest_rcpt_limit; +char *var_defer_xports; +int var_local_con_lim; +int var_local_rcpt_lim; +bool var_verp_bounce_off; +int var_qmgr_clog_warn_time; +char *var_conc_pos_feedback; +char *var_conc_neg_feedback; +int var_conc_cohort_limit; +int var_conc_feedback_debug; +int var_xport_rate_delay; +int var_dest_rate_delay; +char *var_def_filter_nexthop; +int var_qmgr_daemon_timeout; +int var_qmgr_ipc_timeout; +int var_dsn_delay_cleared; +int var_vrfy_pend_limit; + +static QMGR_SCAN *qmgr_scans[2]; + +#define QMGR_SCAN_IDX_INCOMING 0 +#define QMGR_SCAN_IDX_DEFERRED 1 +#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0])) + +/* qmgr_deferred_run_event - queue manager heartbeat */ + +static void qmgr_deferred_run_event(int unused_event, void *dummy) +{ + + /* + * This routine runs when it is time for another deferred queue scan. + * Make sure this routine gets called again in the future. + */ + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START); + event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay); +} + +/* qmgr_trigger_event - respond to external trigger(s) */ + +static void qmgr_trigger_event(char *buf, ssize_t len, + char *unused_service, char **argv) +{ + int incoming_flag = 0; + int deferred_flag = 0; + int i; + + /* + * Sanity check. This service takes no command-line arguments. + */ + if (argv[0]) + msg_fatal("unexpected command-line argument: %s", argv[0]); + + /* + * Collapse identical requests that have arrived since we looked last + * time. There is no client feedback so there is no need to process each + * request in order. And as long as we don't have conflicting requests we + * are free to sort them into the most suitable order. + */ +#define QMGR_FLUSH_BEFORE (QMGR_FLUSH_ONCE | QMGR_FLUSH_DFXP) + + for (i = 0; i < len; i++) { + if (msg_verbose) + msg_info("request: %d (%c)", + buf[i], ISALNUM(buf[i]) ? buf[i] : '?'); + switch (buf[i]) { + case TRIGGER_REQ_WAKEUP: + case QMGR_REQ_SCAN_INCOMING: + incoming_flag |= QMGR_SCAN_START; + break; + case QMGR_REQ_SCAN_DEFERRED: + deferred_flag |= QMGR_SCAN_START; + break; + case QMGR_REQ_FLUSH_DEAD: + deferred_flag |= QMGR_FLUSH_BEFORE; + incoming_flag |= QMGR_FLUSH_BEFORE; + break; + case QMGR_REQ_SCAN_ALL: + deferred_flag |= QMGR_SCAN_ALL; + incoming_flag |= QMGR_SCAN_ALL; + break; + default: + if (msg_verbose) + msg_info("request ignored"); + break; + } + } + + /* + * Process each request type at most once. Modifiers take effect upon the + * next queue run. If no queue run is in progress, and a queue scan is + * requested, the request takes effect immediately. + */ + if (incoming_flag != 0) + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag); + if (deferred_flag != 0) + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag); +} + +/* qmgr_loop - queue manager main loop */ + +static int qmgr_loop(char *unused_name, char **unused_argv) +{ + char *path; + ssize_t token_count; + int feed = 0; + int scan_idx; /* Priority order scan index */ + static int first_scan_idx = QMGR_SCAN_IDX_INCOMING; + int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; + int delay; + + /* + * This routine runs as part of the event handling loop, after the event + * manager has delivered a timer or I/O event (including the completion + * of a connection to a delivery process), or after it has waited for a + * specified amount of time. The result value of qmgr_loop() specifies + * how long the event manager should wait for the next event. + */ +#define DONT_WAIT 0 +#define WAIT_FOR_EVENT (-1) + + /* + * Attempt to drain the active queue by allocating a suitable delivery + * process and by delivering mail via it. Delivery process allocation and + * mail delivery are asynchronous. + */ + qmgr_active_drain(); + + /* + * Let some new blood into the active queue when the queue size is + * smaller than some configurable limit. + * + * We import one message per interrupt, to optimally tune the input count + * for the number of delivery agent protocol wait states, as explained in + * qmgr_transport.c. + */ + delay = WAIT_FOR_EVENT; + for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit + && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) { + last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT; + if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) { + delay = DONT_WAIT; + if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0) + break; + } + } + + /* + * Round-robin the queue scans. When the active queue becomes full, + * prefer new mail over deferred mail. + */ + if (qmgr_message_count < var_qmgr_active_limit) { + first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; + } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) { + first_scan_idx = QMGR_SCAN_IDX_INCOMING; + } + + /* + * Global flow control. If enabled, slow down receiving processes that + * get ahead of the queue manager, but don't block them completely. + */ + if (var_in_flow_delay > 0) { + token_count = mail_flow_count(); + if (token_count < var_proc_limit) { + if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING) + mail_flow_put(1); + else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0) + mail_flow_put(var_proc_limit - token_count); + } else if (token_count > var_proc_limit) { + mail_flow_get(token_count - var_proc_limit); + } + } + return (delay); +} + +/* pre_accept - see if tables have changed */ + +static void pre_accept(char *unused_name, char **unused_argv) +{ + const char *table; + + if ((table = dict_changed_name()) != 0) { + msg_info("table %s has changed -- restarting", table); + exit(0); + } +} + +/* qmgr_pre_init - pre-jail initialization */ + +static void qmgr_pre_init(char *unused_name, char **unused_argv) +{ + flush_init(); +} + +/* qmgr_post_init - post-jail initialization */ + +static void qmgr_post_init(char *name, char **unused_argv) +{ + + /* + * Backwards compatibility. + */ + if (strcmp(var_procname, "nqmgr") == 0) { + msg_warn("please update the %s/%s file; the new queue manager", + var_config_dir, MASTER_CONF_FILE); + msg_warn("(old name: nqmgr) has become the standard queue manager (new name: qmgr)"); + msg_warn("support for the name old name (nqmgr) will be removed from Postfix"); + } + + /* + * Sanity check. + */ + if (var_qmgr_rcpt_limit < var_qmgr_active_limit) { + msg_warn("%s is smaller than %s - adjusting %s", + VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT); + var_qmgr_rcpt_limit = var_qmgr_active_limit; + } + if (var_dsn_queue_time > var_max_queue_time) { + msg_warn("%s is larger than %s - adjusting %s", + VAR_DSN_QUEUE_TIME, VAR_MAX_QUEUE_TIME, VAR_DSN_QUEUE_TIME); + var_dsn_queue_time = var_max_queue_time; + } + + /* + * This routine runs after the skeleton code has entered the chroot jail. + * Prevent automatic process suicide after a limited number of client + * requests or after a limited amount of idle time. Move any left-over + * entries from the active queue to the incoming queue, and give them a + * time stamp into the future, in order to allow ongoing deliveries to + * finish first. Start scanning the incoming and deferred queues. + * Left-over active queue entries are moved to the incoming queue because + * the incoming queue has priority; moving left-overs to the deferred + * queue could cause anomalous delays when "postfix reload/start" are + * issued often. Override the IPC timeout (default 3600s) so that the + * queue manager can reset a broken IPC channel before the watchdog timer + * goes off. + */ + var_ipc_timeout = var_qmgr_ipc_timeout; + var_use_limit = 0; + var_idle_limit = 0; + qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time()); + qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING); + qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START); + qmgr_deferred_run_event(0, (void *) 0); +} + +MAIL_VERSION_STAMP_DECLARE; + +/* main - the main program */ + +int main(int argc, char **argv) +{ + static const CONFIG_STR_TABLE str_table[] = { + VAR_DEFER_XPORTS, DEF_DEFER_XPORTS, &var_defer_xports, 0, 0, + VAR_CONC_POS_FDBACK, DEF_CONC_POS_FDBACK, &var_conc_pos_feedback, 1, 0, + VAR_CONC_NEG_FDBACK, DEF_CONC_NEG_FDBACK, &var_conc_neg_feedback, 1, 0, + VAR_DEF_FILTER_NEXTHOP, DEF_DEF_FILTER_NEXTHOP, &var_def_filter_nexthop, 0, 0, + 0, + }; + static const CONFIG_TIME_TABLE time_table[] = { + VAR_QUEUE_RUN_DELAY, DEF_QUEUE_RUN_DELAY, &var_queue_run_delay, 1, 0, + VAR_MIN_BACKOFF_TIME, DEF_MIN_BACKOFF_TIME, &var_min_backoff_time, 1, 0, + VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0, + VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000, + VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000, + VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0, + VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0, + VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0, + VAR_XPORT_RATE_DELAY, DEF_XPORT_RATE_DELAY, &var_xport_rate_delay, 0, 0, + VAR_DEST_RATE_DELAY, DEF_DEST_RATE_DELAY, &var_dest_rate_delay, 0, 0, + VAR_QMGR_DAEMON_TIMEOUT, DEF_QMGR_DAEMON_TIMEOUT, &var_qmgr_daemon_timeout, 1, 0, + VAR_QMGR_IPC_TIMEOUT, DEF_QMGR_IPC_TIMEOUT, &var_qmgr_ipc_timeout, 1, 0, + 0, + }; + static const CONFIG_INT_TABLE int_table[] = { + VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0, + VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0, + VAR_QMGR_MSG_RCPT_LIMIT, DEF_QMGR_MSG_RCPT_LIMIT, &var_qmgr_msg_rcpt_limit, 1, 0, + VAR_XPORT_RCPT_LIMIT, DEF_XPORT_RCPT_LIMIT, &var_xport_rcpt_limit, 0, 0, + VAR_STACK_RCPT_LIMIT, DEF_STACK_RCPT_LIMIT, &var_stack_rcpt_limit, 0, 0, + VAR_XPORT_REFILL_LIMIT, DEF_XPORT_REFILL_LIMIT, &var_xport_refill_limit, 1, 0, + VAR_DELIVERY_SLOT_COST, DEF_DELIVERY_SLOT_COST, &var_delivery_slot_cost, 0, 0, + VAR_DELIVERY_SLOT_LOAN, DEF_DELIVERY_SLOT_LOAN, &var_delivery_slot_loan, 0, 0, + VAR_DELIVERY_SLOT_DISCOUNT, DEF_DELIVERY_SLOT_DISCOUNT, &var_delivery_slot_discount, 0, 100, + VAR_MIN_DELIVERY_SLOTS, DEF_MIN_DELIVERY_SLOTS, &var_min_delivery_slots, 0, 0, + VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0, + VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0, + VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0, + VAR_LOCAL_RCPT_LIMIT, DEF_LOCAL_RCPT_LIMIT, &var_local_rcpt_lim, 0, 0, + VAR_LOCAL_CON_LIMIT, DEF_LOCAL_CON_LIMIT, &var_local_con_lim, 0, 0, + VAR_CONC_COHORT_LIM, DEF_CONC_COHORT_LIM, &var_conc_cohort_limit, 0, 0, + VAR_VRFY_PEND_LIMIT, DEF_VRFY_PEND_LIMIT, &var_vrfy_pend_limit, 1, 0, + 0, + }; + static const CONFIG_BOOL_TABLE bool_table[] = { + VAR_VERP_BOUNCE_OFF, DEF_VERP_BOUNCE_OFF, &var_verp_bounce_off, + VAR_CONC_FDBACK_DEBUG, DEF_CONC_FDBACK_DEBUG, &var_conc_feedback_debug, + VAR_DSN_DELAY_CLEARED, DEF_DSN_DELAY_CLEARED, &var_dsn_delay_cleared, + 0, + }; + + /* + * Fingerprint executables and core dumps. + */ + MAIL_VERSION_STAMP_ALLOCATE; + + /* + * Use the trigger service skeleton, because no-one else should be + * monitoring our service port while this process runs, and because we do + * not talk back to the client. + */ + trigger_server_main(argc, argv, qmgr_trigger_event, + CA_MAIL_SERVER_INT_TABLE(int_table), + CA_MAIL_SERVER_STR_TABLE(str_table), + CA_MAIL_SERVER_BOOL_TABLE(bool_table), + CA_MAIL_SERVER_TIME_TABLE(time_table), + CA_MAIL_SERVER_PRE_INIT(qmgr_pre_init), + CA_MAIL_SERVER_POST_INIT(qmgr_post_init), + CA_MAIL_SERVER_LOOP(qmgr_loop), + CA_MAIL_SERVER_PRE_ACCEPT(pre_accept), + CA_MAIL_SERVER_SOLITARY, + CA_MAIL_SERVER_WATCHDOG(&var_qmgr_daemon_timeout), + 0); +} diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h new file mode 100644 index 0000000..67d0cf5 --- /dev/null +++ b/src/qmgr/qmgr.h @@ -0,0 +1,545 @@ +/*++ +/* NAME +/* qmgr 3h +/* SUMMARY +/* queue manager data structures +/* SYNOPSIS +/* #include "qmgr.h" +/* DESCRIPTION +/* .nf + + /* + * System library. + */ +#include +#include + + /* + * Utility library. + */ +#include +#include + + /* + * Global library. + */ +#include +#include + + /* + * The queue manager is built around lots of mutually-referring structures. + * These typedefs save some typing. + */ +typedef struct QMGR_TRANSPORT QMGR_TRANSPORT; +typedef struct QMGR_QUEUE QMGR_QUEUE; +typedef struct QMGR_ENTRY QMGR_ENTRY; +typedef struct QMGR_MESSAGE QMGR_MESSAGE; +typedef struct QMGR_JOB QMGR_JOB; +typedef struct QMGR_PEER QMGR_PEER; +typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST; +typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST; +typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST; +typedef struct QMGR_JOB_LIST QMGR_JOB_LIST; +typedef struct QMGR_PEER_LIST QMGR_PEER_LIST; +typedef struct QMGR_SCAN QMGR_SCAN; +typedef struct QMGR_FEEDBACK QMGR_FEEDBACK; + + /* + * Hairy macros to update doubly-linked lists. + */ +#define QMGR_LIST_ROTATE(head, object, peers) { \ + head.next->peers.prev = head.prev; \ + head.prev->peers.next = head.next; \ + head.next = object->peers.next; \ + head.next->peers.prev = 0; \ + head.prev = object; \ + object->peers.next = 0; \ +} + +#define QMGR_LIST_UNLINK(head, type, object, peers) { \ + type _next = object->peers.next; \ + type _prev = object->peers.prev; \ + if (_prev) _prev->peers.next = _next; \ + else head.next = _next; \ + if (_next) _next->peers.prev = _prev; \ + else head.prev = _prev; \ + object->peers.next = object->peers.prev = 0; \ +} + +#define QMGR_LIST_LINK(head, pred, object, succ, peers) { \ + object->peers.prev = pred; \ + object->peers.next = succ; \ + if (pred) pred->peers.next = object; \ + else head.next = object; \ + if (succ) succ->peers.prev = object; \ + else head.prev = object; \ +} + +#define QMGR_LIST_PREPEND(head, object, peers) { \ + object->peers.next = head.next; \ + object->peers.prev = 0; \ + if (head.next) { \ + head.next->peers.prev = object; \ + } else { \ + head.prev = object; \ + } \ + head.next = object; \ +} + +#define QMGR_LIST_APPEND(head, object, peers) { \ + object->peers.prev = head.prev; \ + object->peers.next = 0; \ + if (head.prev) { \ + head.prev->peers.next = object; \ + } else { \ + head.next = object; \ + } \ + head.prev = object; \ +} + +#define QMGR_LIST_INIT(head) { \ + head.prev = 0; \ + head.next = 0; \ +} + + /* + * Transports are looked up by name (when we have resolved a message), or + * round-robin wise (when we want to distribute resources fairly). + */ +struct QMGR_TRANSPORT_LIST { + QMGR_TRANSPORT *next; + QMGR_TRANSPORT *prev; +}; + +extern struct HTABLE *qmgr_transport_byname; /* transport by name */ +extern QMGR_TRANSPORT_LIST qmgr_transport_list; /* transports, round robin */ + + /* + * Delivery agents provide feedback, as hints that Postfix should expend + * more or fewer resources on a specific destination domain. The main.cf + * file specifies how feedback affects delivery concurrency: add/subtract a + * constant, a ratio of constants, or a constant divided by the delivery + * concurrency; and it specifies how much feedback must accumulate between + * concurrency updates. + */ +struct QMGR_FEEDBACK { + int hysteresis; /* to pass, need to be this tall */ + double base; /* pre-computed from main.cf */ + int index; /* none, window, sqrt(window) */ +}; + +#define QMGR_FEEDBACK_IDX_NONE 0 /* no window dependence */ +#define QMGR_FEEDBACK_IDX_WIN 1 /* 1/window dependence */ +#if 0 +#define QMGR_FEEDBACK_IDX_SQRT_WIN 2 /* 1/sqrt(window) dependence */ +#endif + +#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN +#include +#endif + +extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *); + +#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN +#define QMGR_FEEDBACK_VAL(fb, win) \ + ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win)) +#else +#define QMGR_FEEDBACK_VAL(fb, win) \ + ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \ + (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \ + (fb).base / sqrt(win)) +#endif + + /* + * Each transport (local, smtp-out, bounce) can have one queue per next hop + * name. Queues are looked up by next hop name (when we have resolved a + * message destination), or round-robin wise (when we want to deliver + * messages fairly). + */ +struct QMGR_QUEUE_LIST { + QMGR_QUEUE *next; + QMGR_QUEUE *prev; +}; + +struct QMGR_JOB_LIST { + QMGR_JOB *next; + QMGR_JOB *prev; +}; + +struct QMGR_TRANSPORT { + int flags; /* blocked, etc. */ + int pending; /* incomplete DA connections */ + char *name; /* transport name */ + int dest_concurrency_limit; /* concurrency per domain */ + int init_dest_concurrency; /* init. per-domain concurrency */ + int recipient_limit; /* recipients per transaction */ + int rcpt_per_stack; /* extra slots reserved for jobs put + * on the job stack */ + int rcpt_unused; /* available in-core recipient slots */ + int refill_limit; /* recipient batch size for message + * refill */ + int refill_delay; /* delay before message refill */ + int slot_cost; /* cost of new preemption slot (# of + * selected entries) */ + int slot_loan; /* preemption boost offset and */ + int slot_loan_factor; /* factor, see qmgr_job_preempt() */ + int min_slots; /* when preemption can take effect at + * all */ + struct HTABLE *queue_byname; /* queues indexed by domain */ + QMGR_QUEUE_LIST queue_list; /* queues, round robin order */ + struct HTABLE *job_byname; /* jobs indexed by queue id */ + QMGR_JOB_LIST job_list; /* list of message jobs (1 per + * message) ordered by scheduler */ + QMGR_JOB_LIST job_bytime; /* jobs ordered by time since queued */ + QMGR_JOB *job_current; /* keeps track of the current job */ + QMGR_JOB *job_next_unread; /* next job with unread recipients */ + QMGR_JOB *candidate_cache; /* cached result from + * qmgr_job_candidate() */ + QMGR_JOB *candidate_cache_current; /* current job tied to the candidate */ + time_t candidate_cache_time; /* when candidate_cache was last + * updated */ + int blocker_tag; /* for marking blocker jobs */ + QMGR_TRANSPORT_LIST peers; /* linkage */ + DSN *dsn; /* why unavailable */ + QMGR_FEEDBACK pos_feedback; /* positive feedback control */ + QMGR_FEEDBACK neg_feedback; /* negative feedback control */ + int fail_cohort_limit; /* flow shutdown control */ + int xport_rate_delay; /* suspend per delivery */ + int rate_delay; /* suspend per delivery */ +}; + +#define QMGR_TRANSPORT_STAT_DEAD (1<<1) +#define QMGR_TRANSPORT_STAT_RATE_LOCK (1<<2) + +typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *); +extern QMGR_TRANSPORT *qmgr_transport_select(void); +extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY); +extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *); +extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *); +extern QMGR_TRANSPORT *qmgr_transport_create(const char *); +extern QMGR_TRANSPORT *qmgr_transport_find(const char *); + +#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) + + /* + * Each next hop (e.g., a domain name) has its own queue of pending message + * transactions. The "todo" queue contains messages that are to be delivered + * to this next hop. When a message is elected for transmission, it is moved + * from the "todo" queue to the "busy" queue. Messages are taken from the + * "todo" queue in round-robin order. + */ +struct QMGR_ENTRY_LIST { + QMGR_ENTRY *next; + QMGR_ENTRY *prev; +}; + +struct QMGR_QUEUE { + int dflags; /* delivery request options */ + time_t last_done; /* last delivery completion */ + char *name; /* domain name or address */ + char *nexthop; /* domain name */ + int todo_refcount; /* queue entries (todo list) */ + int busy_refcount; /* queue entries (busy list) */ + int window; /* slow open algorithm */ + double success; /* accumulated positive feedback */ + double failure; /* accumulated negative feedback */ + double fail_cohorts; /* pseudo-cohort failure count */ + QMGR_TRANSPORT *transport; /* transport linkage */ + QMGR_ENTRY_LIST todo; /* todo queue entries */ + QMGR_ENTRY_LIST busy; /* messages on the wire */ + QMGR_QUEUE_LIST peers; /* neighbor queues */ + DSN *dsn; /* why unavailable */ + time_t clog_time_to_warn; /* time of last warning */ + int blocker_tag; /* tagged if blocks job list */ +}; + +#define QMGR_QUEUE_TODO 1 /* waiting for service */ +#define QMGR_QUEUE_BUSY 2 /* recipients on the wire */ + +extern int qmgr_queue_count; + +extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *); +extern void qmgr_queue_done(QMGR_QUEUE *); +extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *); +extern void qmgr_queue_unthrottle(QMGR_QUEUE *); +extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *); +extern void qmgr_queue_suspend(QMGR_QUEUE *, int); + + /* + * Exclusive queue states. Originally there were only two: "throttled" and + * "not throttled". It was natural to encode these in the queue window size. + * After 10 years it's not practical to rip out all the working code and + * change representations, so we just clean up the names a little. + * + * Note: only the "ready" state can reach every state (including itself); + * non-ready states can reach only the "ready" state. Other transitions are + * forbidden, because they would result in dangling event handlers. + */ +#define QMGR_QUEUE_STAT_THROTTLED 0 /* back-off timer */ +#define QMGR_QUEUE_STAT_SUSPENDED -1 /* voluntary delay timer */ +#define QMGR_QUEUE_STAT_SAVED -2 /* delayed cleanup timer */ +#define QMGR_QUEUE_STAT_BAD -3 /* can't happen */ + +#define QMGR_QUEUE_READY(q) ((q)->window > 0) +#define QMGR_QUEUE_THROTTLED(q) ((q)->window == QMGR_QUEUE_STAT_THROTTLED) +#define QMGR_QUEUE_SUSPENDED(q) ((q)->window == QMGR_QUEUE_STAT_SUSPENDED) +#define QMGR_QUEUE_SAVED(q) ((q)->window == QMGR_QUEUE_STAT_SAVED) +#define QMGR_QUEUE_BAD(q) ((q)->window <= QMGR_QUEUE_STAT_BAD) + +#define QMGR_QUEUE_STATUS(q) ( \ + QMGR_QUEUE_READY(q) ? "ready" : \ + QMGR_QUEUE_THROTTLED(q) ? "throttled" : \ + QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \ + QMGR_QUEUE_SAVED(q) ? "saved" : \ + "invalid queue status" \ + ) + + /* + * Structure of one next-hop queue entry. In order to save some copying + * effort we allow multiple recipients per transaction. + */ +struct QMGR_ENTRY { + VSTREAM *stream; /* delivery process */ + QMGR_MESSAGE *message; /* message info */ + RECIPIENT_LIST rcpt_list; /* as many as it takes */ + QMGR_QUEUE *queue; /* parent linkage */ + QMGR_PEER *peer; /* parent linkage */ + QMGR_ENTRY_LIST queue_peers; /* per queue neighbor entries */ + QMGR_ENTRY_LIST peer_peers; /* per peer neighbor entries */ +}; + +extern QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *); +extern void qmgr_entry_unselect(QMGR_ENTRY *); +extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *); +extern void qmgr_entry_done(QMGR_ENTRY *, int); +extern QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *, QMGR_MESSAGE *); + + /* + * All common in-core information about a message is kept here. When all + * recipients have been tried the message file is linked to the "deferred" + * queue (some hosts not reachable), to the "bounce" queue (some recipients + * were rejected), and is then removed from the "active" queue. + */ +struct QMGR_MESSAGE { + int flags; /* delivery problems */ + int qflags; /* queuing flags */ + int tflags; /* tracing flags */ + long tflags_offset; /* offset for killing */ + int rflags; /* queue file read flags */ + VSTREAM *fp; /* open queue file or null */ + int refcount; /* queue entries */ + int single_rcpt; /* send one rcpt at a time */ + struct timeval arrival_time; /* start of receive transaction */ + time_t create_time; /* queue file create time */ + struct timeval active_time; /* time of entry into active queue */ + time_t queued_time; /* sanitized time when moved to the + * active queue */ + time_t refill_time; /* sanitized time of last message + * refill */ + long warn_offset; /* warning bounce flag offset */ + time_t warn_time; /* time next warning to be sent */ + long data_offset; /* data seek offset */ + char *queue_name; /* queue name */ + char *queue_id; /* queue file */ + char *encoding; /* content encoding */ + char *sender; /* complete address */ + char *dsn_envid; /* DSN envelope ID */ + int dsn_ret; /* DSN headers/full */ + int smtputf8; /* requires unicode */ + char *verp_delims; /* VERP delimiters */ + char *filter_xport; /* filtering transport */ + char *inspect_xport; /* inspecting transport */ + char *redirect_addr; /* info@spammer.tld */ + long data_size; /* data segment size */ + long cont_length; /* message content length */ + long rcpt_offset; /* more recipients here */ + char *client_name; /* client hostname */ + char *client_addr; /* client address */ + char *client_port; /* client port */ + char *client_proto; /* client protocol */ + char *client_helo; /* helo parameter */ + char *sasl_method; /* SASL method */ + char *sasl_username; /* SASL user name */ + char *sasl_sender; /* SASL sender */ + char *log_ident; /* up-stream queue ID */ + char *rewrite_context; /* address qualification */ + RECIPIENT_LIST rcpt_list; /* complete addresses */ + int rcpt_count; /* used recipient slots */ + int rcpt_limit; /* maximum read in-core */ + int rcpt_unread; /* # of recipients left in queue file */ + QMGR_JOB_LIST job_list; /* jobs delivering this message (1 + * per transport) */ +}; + + /* + * Flags 0-15 are reserved for qmgr_user.h. + */ +#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16) + +#define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1) + +extern int qmgr_message_count; +extern int qmgr_recipient_count; +extern int qmgr_vrfy_pend_count; + +extern void qmgr_message_free(QMGR_MESSAGE *); +extern void qmgr_message_update_warn(QMGR_MESSAGE *); +extern void qmgr_message_kill_record(QMGR_MESSAGE *, long); +extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t); +extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *); + +#define QMGR_MSG_STATS(stats, message) \ + MSG_STATS_INIT2(stats, \ + incoming_arrival, message->arrival_time, \ + active_arrival, message->active_time) + + /* + * Sometimes it's required to access the transport queues and entries on per + * message basis. That's what the QMGR_JOB structure is for - it groups all + * per message information within each transport using a list of QMGR_PEER + * structures. These structures in turn correspond with per message + * QMGR_QUEUE structure and list all per message QMGR_ENTRY structures. + */ +struct QMGR_PEER_LIST { + QMGR_PEER *next; + QMGR_PEER *prev; +}; + +struct QMGR_JOB { + QMGR_MESSAGE *message; /* message delivered by this job */ + QMGR_TRANSPORT *transport; /* transport this job belongs to */ + QMGR_JOB_LIST message_peers; /* per message neighbor linkage */ + QMGR_JOB_LIST transport_peers; /* per transport neighbor linkage */ + QMGR_JOB_LIST time_peers; /* by time neighbor linkage */ + QMGR_JOB *stack_parent; /* stack parent */ + QMGR_JOB_LIST stack_children; /* all stack children */ + QMGR_JOB_LIST stack_siblings; /* stack children linkage */ + int stack_level; /* job stack nesting level (-1 means + * it's not on the lists at all) */ + int blocker_tag; /* tagged if blocks the job list */ + struct HTABLE *peer_byname; /* message job peers, indexed by + * domain */ + QMGR_PEER_LIST peer_list; /* list of message job peers */ + int slots_used; /* slots used during preemption */ + int slots_available; /* slots available for preemption (in + * multiples of slot_cost) */ + int selected_entries; /* # of entries selected for delivery + * so far */ + int read_entries; /* # of entries read in-core so far */ + int rcpt_count; /* used recipient slots */ + int rcpt_limit; /* available recipient slots */ +}; + +struct QMGR_PEER { + QMGR_JOB *job; /* job handling this peer */ + QMGR_QUEUE *queue; /* queue corresponding with this peer */ + int refcount; /* peer entries */ + QMGR_ENTRY_LIST entry_list; /* todo message entries queued for + * this peer */ + QMGR_PEER_LIST peers; /* neighbor linkage */ +}; + +extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *); +extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *); +extern void qmgr_job_blocker_update(QMGR_QUEUE *); + +extern QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *, QMGR_TRANSPORT *); +extern void qmgr_job_free(QMGR_JOB *); +extern void qmgr_job_move_limits(QMGR_JOB *); + +extern QMGR_PEER *qmgr_peer_create(QMGR_JOB *, QMGR_QUEUE *); +extern QMGR_PEER *qmgr_peer_find(QMGR_JOB *, QMGR_QUEUE *); +extern QMGR_PEER *qmgr_peer_obtain(QMGR_JOB *, QMGR_QUEUE *); +extern void qmgr_peer_free(QMGR_PEER *); + + /* + * qmgr_defer.c + */ +extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *); +extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *); +extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); + + /* + * qmgr_bounce.c + */ +extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); + + /* + * qmgr_deliver.c + */ +extern int qmgr_deliver_concurrency; +extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *); + + /* + * qmgr_active.c + */ +extern int qmgr_active_feed(QMGR_SCAN *, const char *); +extern void qmgr_active_drain(void); +extern void qmgr_active_done(QMGR_MESSAGE *); + + /* + * qmgr_move.c + */ +extern void qmgr_move(const char *, const char *, time_t); + + /* + * qmgr_enable.c + */ +extern void qmgr_enable_all(void); +extern void qmgr_enable_transport(QMGR_TRANSPORT *); +extern void qmgr_enable_queue(QMGR_QUEUE *); + + /* + * Queue scan context. + */ +struct QMGR_SCAN { + char *queue; /* queue name */ + int flags; /* private, this run */ + int nflags; /* private, next run */ + struct SCAN_DIR *handle; /* scan */ +}; + + /* + * Flags that control queue scans or destination selection. These are + * similar to the QMGR_REQ_XXX request codes. + */ +#define QMGR_SCAN_START (1<<0) /* start now/restart when done */ +#define QMGR_SCAN_ALL (1<<1) /* all queue file time stamps */ +#define QMGR_FLUSH_ONCE (1<<2) /* unthrottle once */ +#define QMGR_FLUSH_DFXP (1<<3) /* override defer_transports */ +#define QMGR_FLUSH_EACH (1<<4) /* unthrottle per message */ + + /* + * qmgr_scan.c + */ +extern QMGR_SCAN *qmgr_scan_create(const char *); +extern void qmgr_scan_request(QMGR_SCAN *, int); +extern char *qmgr_scan_next(QMGR_SCAN *); + + /* + * qmgr_error.c + */ +extern QMGR_TRANSPORT *qmgr_error_transport(const char *); +extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *); +extern char *qmgr_error_nexthop(DSN *); + +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/*--*/ diff --git a/src/qmgr/qmgr_active.c b/src/qmgr/qmgr_active.c new file mode 100644 index 0000000..49f5ae7 --- /dev/null +++ b/src/qmgr/qmgr_active.c @@ -0,0 +1,587 @@ +/*++ +/* NAME +/* qmgr_active 3 +/* SUMMARY +/* active queue management +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_active_feed(scan_info, queue_id) +/* QMGR_SCAN *scan_info; +/* const char *queue_id; +/* +/* void qmgr_active_drain() +/* +/* int qmgr_active_done(message) +/* QMGR_MESSAGE *message; +/* DESCRIPTION +/* These functions maintain the active message queue: the set +/* of messages that the queue manager is actually working on. +/* The active queue is limited in size. Messages are drained +/* from the active queue by allocating a delivery process and +/* by delivering mail via that process. Messages leak into the +/* active queue only when the active queue is small enough. +/* Damaged message files are saved to the "corrupt" directory. +/* +/* qmgr_active_feed() inserts the named message file into +/* the active queue. Message files with the wrong name or +/* with other wrong properties are skipped but not removed. +/* The following queue flags are recognized, other flags being +/* ignored: +/* .IP QMGR_SCAN_ALL +/* Examine all queue files. Normally, deferred queue files with +/* future time stamps are ignored, and incoming queue files with +/* future time stamps are frowned upon. +/* .PP +/* qmgr_active_drain() allocates one delivery process. +/* Process allocation is asynchronous. Once the delivery +/* process is available, an attempt is made to deliver +/* a message via it. Message delivery is asynchronous, too. +/* +/* qmgr_active_done() deals with a message after delivery +/* has been tried for all in-core recipients. If the message +/* was bounced, a bounce message is sent to the sender, or +/* to the Errors-To: address if one was specified. +/* If there are more on-file recipients, a new batch of +/* in-core recipients is read from the queue file. Otherwise, +/* if a delivery agent marked the queue file as corrupt, +/* the queue file is moved to the "corrupt" queue (surprise); +/* if at least one delivery failed, the message is moved +/* to the deferred queue. The time stamps of a deferred queue +/* file are set to the nearest wakeup time of its recipient +/* sites (if delivery failed due to a problem with a next-hop +/* host), are set into the future by the amount of time the +/* message was queued (per-message exponential backoff), or are set +/* into the future by a minimal backoff time, whichever is more. +/* The minimal_backoff_time parameter specifies the minimal +/* amount of time between delivery attempts; maximal_backoff_time +/* specifies an upper limit. +/* DIAGNOSTICS +/* Fatal: queue file access failures, out of memory. +/* Panic: interface violations, internal consistency errors. +/* Warnings: corrupt message file. A corrupt message is saved +/* to the "corrupt" queue for further inspection. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef S_IRWXU /* What? no POSIX system? */ +#define S_IRWXU 0700 +#endif + +/* Utility library. */ + +#include +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * A bunch of call-back routines. + */ +static void qmgr_active_done_2_bounce_flush(int, void *); +static void qmgr_active_done_2_generic(QMGR_MESSAGE *); +static void qmgr_active_done_25_trace_flush(int, void *); +static void qmgr_active_done_25_generic(QMGR_MESSAGE *); +static void qmgr_active_done_3_defer_flush(int, void *); +static void qmgr_active_done_3_defer_warn(int, void *); +static void qmgr_active_done_3_generic(QMGR_MESSAGE *); + +/* qmgr_active_corrupt - move corrupted file out of the way */ + +static void qmgr_active_corrupt(const char *queue_id) +{ + const char *myname = "qmgr_active_corrupt"; + + if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) { + if (errno != ENOENT) + msg_fatal("%s: save corrupt file queue %s id %s: %m", + myname, MAIL_QUEUE_ACTIVE, queue_id); + } else { + msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", + queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT); + } +} + +/* qmgr_active_defer - defer queue file */ + +static void qmgr_active_defer(const char *queue_name, const char *queue_id, + const char *dest_queue, int delay) +{ + const char *myname = "qmgr_active_defer"; + const char *path; + struct utimbuf tbuf; + + if (msg_verbose) + msg_info("wakeup %s after %ld secs", queue_id, (long) delay); + + tbuf.actime = tbuf.modtime = event_time() + delay; + path = mail_queue_path((VSTRING *) 0, queue_name, queue_id); + if (utime(path, &tbuf) < 0 && errno != ENOENT) + msg_fatal("%s: update %s time stamps: %m", myname, path); + if (mail_queue_rename(queue_id, queue_name, dest_queue)) { + if (errno != ENOENT) + msg_fatal("%s: rename %s from %s to %s: %m", myname, + queue_id, queue_name, dest_queue); + msg_warn("%s: rename %s from %s to %s: %m", myname, + queue_id, queue_name, dest_queue); + } else if (msg_verbose) { + msg_info("%s: defer %s", myname, queue_id); + } +} + +/* qmgr_active_feed - feed one message into active queue */ + +int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) +{ + const char *myname = "qmgr_active_feed"; + QMGR_MESSAGE *message; + struct stat st; + const char *path; + + if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0) + msg_panic("%s: bad queue %s", myname, scan_info->queue); + if (msg_verbose) + msg_info("%s: queue %s", myname, scan_info->queue); + + /* + * Make sure this is something we are willing to open. + */ + if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO) + return (0); + + if (msg_verbose) + msg_info("%s: %s", myname, path); + + /* + * Skip files that have time stamps into the future. They need to cool + * down. Incoming and deferred files can have future time stamps. + */ + if ((scan_info->flags & QMGR_SCAN_ALL) == 0 + && st.st_mtime > time((time_t *) 0) + 1) { + if (msg_verbose) + msg_info("%s: skip %s (%ld seconds)", myname, queue_id, + (long) (st.st_mtime - event_time())); + return (0); + } + + /* + * Move the message to the active queue. File access errors are fatal. + */ + if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) { + if (errno != ENOENT) + msg_fatal("%s: %s: rename from %s to %s: %m", myname, + queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); + msg_warn("%s: %s: rename from %s to %s: %m", myname, + queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); + return (0); + } + + /* + * Extract envelope information: sender and recipients. At this point, + * mail addresses have been processed by the cleanup service so they + * should be in canonical form. Generate requests to deliver this + * message. + * + * Throwing away queue files seems bad, especially when they made it this + * far into the mail system. Therefore we save bad files to a separate + * directory for further inspection. + * + * After queue manager restart it is possible that a queue file is still + * being delivered. In that case (the file is locked), defer delivery by + * a minimal amount of time. + */ +#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) + + if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, + (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? + scan_info->flags | QMGR_FLUSH_AFTER : + scan_info->flags, + (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? + st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE : + 0)) == 0) { + qmgr_active_corrupt(queue_id); + return (0); + } else if (message == QMGR_MESSAGE_LOCKED) { + qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60); + return (0); + } else { + + /* + * Special case if all recipients were already delivered. Send any + * bounces and clean up. + */ + if (message->refcount == 0) + qmgr_active_done(message); + return (1); + } +} + +/* qmgr_active_done - dispose of message after recipients have been tried */ + +void qmgr_active_done(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done"; + struct stat st; + + if (msg_verbose) + msg_info("%s: %s", myname, message->queue_id); + + /* + * During a previous iteration, an attempt to bounce this message may + * have failed, so there may still be a bounce log lying around. XXX By + * groping around in the bounce queue, we're trespassing on the bounce + * service's territory. But doing so is more robust than depending on the + * bounce daemon to do the lookup for us, and for us to do the deleting + * after we have received a successful status from the bounce service. + * The bounce queue directory blocks are most likely in memory anyway. If + * these lookups become a performance problem we will have to build an + * in-core cache into the bounce daemon. + * + * Don't bounce when the bounce log is empty. The bounce process obviously + * failed, and the delivery agent will have requested that the message be + * deferred. + * + * Bounces are sent asynchronously to avoid stalling while the cleanup + * daemon waits for the qmgr to accept the "new mail" trigger. + * + * See also code in cleanup_bounce.c. + */ + if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) { + if (st.st_size == 0) { + if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id)) + msg_fatal("remove %s %s: %m", + MAIL_QUEUE_BOUNCE, message->queue_id); + } else { + if (msg_verbose) + msg_info("%s: bounce %s", myname, message->queue_id); + if (message->verp_delims == 0 || var_verp_bounce_off) + abounce_flush(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_2_bounce_flush, + (void *) message); + else + abounce_flush_verp(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + message->verp_delims, + qmgr_active_done_2_bounce_flush, + (void *) message); + return; + } + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_2_generic(message); +} + +/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */ + +static void qmgr_active_done_2_bounce_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process abounce_flush() status and continue processing. + */ + message->flags |= status; + qmgr_active_done_2_generic(message); +} + +/* qmgr_active_done_2_generic - continue processing */ + +static void qmgr_active_done_2_generic(QMGR_MESSAGE *message) +{ + const char *path; + struct stat st; + + /* + * A delivery agent marks a queue file as corrupt by changing its + * attributes, and by pretending that delivery was deferred. + */ + if (message->flags + && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) { + qmgr_active_corrupt(message->queue_id); + qmgr_message_free(message); + return; + } + + /* + * If we did not read all recipients from this file, go read some more, + * but remember whether some recipients have to be tried again. + * + * Throwing away queue files seems bad, especially when they made it this + * far into the mail system. Therefore we save bad files to a separate + * directory for further inspection by a human being. + */ + if (message->rcpt_offset > 0) { + if (qmgr_message_realloc(message) == 0) { + qmgr_active_corrupt(message->queue_id); + qmgr_message_free(message); + } else { + if (message->refcount == 0) + qmgr_active_done(message); /* recurse for consistency */ + } + return; + } + + /* + * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS + * and others not. Depending on what subset of recipients are delivered, + * a trace file may or may not be created. Even when the last partial + * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may + * still exist from a previous partial delivery attempt. So as long as + * any recipient has NOTIFY=SUCCESS we have to always look for the trace + * file and be prepared for the file not to exist. + * + * See also comments in bounce/bounce_notify_util.c. + */ + if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD + | DEL_REQ_FLAG_REC_DLY_SENT)) + || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) { + atrace_flush(message->tflags, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_25_trace_flush, + (void *) message); + return; + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_25_generic(message); +} + +/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */ + +static void qmgr_active_done_25_trace_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process atrace_flush() status and continue processing. + */ + if (status == 0 && message->tflags_offset) + qmgr_message_kill_record(message, message->tflags_offset); + message->flags |= status; + qmgr_active_done_25_generic(message); +} + +/* qmgr_active_done_25_generic - continue processing */ + +static void qmgr_active_done_25_generic(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done_25_generic"; + + /* + * If we get to this point we have tried all recipients for this message. + * If the message is too old, try to bounce it. + * + * Bounces are sent asynchronously to avoid stalling while the cleanup + * daemon waits for the qmgr to accept the "new mail" trigger. + */ + if (message->flags) { + if (event_time() >= message->create_time + + (*message->sender ? var_max_queue_time : var_dsn_queue_time)) { + msg_info("%s: from=<%s>, status=expired, returned to sender", + message->queue_id, message->sender); + if (message->verp_delims == 0 || var_verp_bounce_off) + adefer_flush(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_3_defer_flush, + (void *) message); + else + adefer_flush_verp(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + message->verp_delims, + qmgr_active_done_3_defer_flush, + (void *) message); + return; + } else if (message->warn_time > 0 + && event_time() >= message->warn_time - 1) { + if (msg_verbose) + msg_info("%s: sending defer warning for %s", myname, message->queue_id); + adefer_warn(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_3_defer_warn, + (void *) message); + return; + } + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */ + +static void qmgr_active_done_3_defer_warn(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process adefer_warn() completion status and continue processing. + */ + if (status == 0) + qmgr_message_update_warn(message); + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */ + +static void qmgr_active_done_3_defer_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process adefer_flush() status and continue processing. + */ + message->flags = status; + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_generic - continue processing */ + +static void qmgr_active_done_3_generic(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done_3_generic"; + int delay; + + /* + * Some recipients need to be tried again. Move the queue file time + * stamps into the future by the amount of time that the message is + * delayed, and move the message to the deferred queue. Impose minimal + * and maximal backoff times. + * + * Since we look at actual time in queue, not time since last delivery + * attempt, backoff times will be distributed. However, we can still see + * spikes in delivery activity because the interval between deferred + * queue scans is finite. + */ + if (message->flags) { + if (message->create_time > 0) { + delay = event_time() - message->create_time; + if (delay > var_max_backoff_time) + delay = var_max_backoff_time; + if (delay < var_min_backoff_time) + delay = var_min_backoff_time; + } else { + delay = var_min_backoff_time; + } + qmgr_active_defer(message->queue_name, message->queue_id, + MAIL_QUEUE_DEFERRED, delay); + } + + /* + * All recipients done. Remove the queue file. + */ + else { + if (mail_queue_remove(message->queue_name, message->queue_id)) { + if (errno != ENOENT) + msg_fatal("%s: remove %s from %s: %m", myname, + message->queue_id, message->queue_name); + msg_warn("%s: remove %s from %s: %m", myname, + message->queue_id, message->queue_name); + } else { + /* Same format as logged by postsuper. */ + msg_info("%s: removed", message->queue_id); + } + } + + /* + * Finally, delete the in-core message structure. + */ + qmgr_message_free(message); +} + +/* qmgr_active_drain - drain active queue by allocating a delivery process */ + +void qmgr_active_drain(void) +{ + QMGR_TRANSPORT *transport; + + /* + * Allocate one delivery process for every transport with pending mail. + * The process allocation completes asynchronously. + */ + while ((transport = qmgr_transport_select()) != 0) { + if (msg_verbose) + msg_info("qmgr_active_drain: allocate %s", transport->name); + qmgr_transport_alloc(transport, qmgr_deliver); + } +} diff --git a/src/qmgr/qmgr_bounce.c b/src/qmgr/qmgr_bounce.c new file mode 100644 index 0000000..00ba885 --- /dev/null +++ b/src/qmgr/qmgr_bounce.c @@ -0,0 +1,71 @@ +/*++ +/* NAME +/* qmgr_bounce +/* SUMMARY +/* deal with mail that will not be delivered +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_QUEUE *qmgr_bounce_recipient(message, recipient, dsn) +/* QMGR_MESSAGE *message; +/* RECIPIENT *recipient; +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_bounce_recipient() produces a bounce log record. +/* Once the bounce record is written successfully, the recipient +/* is marked as done. When the bounce record cannot be written, +/* the message structure is updated to reflect that the mail is +/* deferred. +/* +/* Arguments: +/* .IP message +/* Open queue file with the message being bounced. +/* .IP recipient +/* The recipient that will not be delivered. +/* .IP dsn +/* Delivery status information. See dsn(3). +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +/* Global library. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_bounce_recipient - bounce one message recipient */ + +void qmgr_bounce_recipient(QMGR_MESSAGE *message, RECIPIENT *recipient, + DSN *dsn) +{ + MSG_STATS stats; + int status; + + status = bounce_append(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", dsn); + + if (status == 0) + deliver_completed(message->fp, recipient->offset); + else + message->flags |= status; +} diff --git a/src/qmgr/qmgr_defer.c b/src/qmgr/qmgr_defer.c new file mode 100644 index 0000000..79615cc --- /dev/null +++ b/src/qmgr/qmgr_defer.c @@ -0,0 +1,163 @@ +/*++ +/* NAME +/* qmgr_defer +/* SUMMARY +/* deal with mail that must be delivered later +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_defer_recipient(message, recipient, dsn) +/* QMGR_MESSAGE *message; +/* RECIPIENT *recipient; +/* DSN *dsn; +/* +/* void qmgr_defer_todo(queue, dsn) +/* QMGR_QUEUE *queue; +/* DSN *dsn; +/* +/* void qmgr_defer_transport(transport, dsn) +/* QMGR_TRANSPORT *transport; +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_defer_recipient() defers delivery of the named message to +/* the named recipient. It updates the message structure and writes +/* a log entry. +/* +/* qmgr_defer_todo() iterates over all "todo" deliveries queued for +/* the named site, and calls qmgr_defer_recipient() for each recipient +/* found. Side effects caused by qmgr_entry_done(), qmgr_queue_done(), +/* and by qmgr_active_done(): in-core queue entries will disappear, +/* in-core queues may disappear, in-core and on-disk messages may +/* disappear, bounces may be sent, new in-core queues, queue entries +/* and recipients may appear. +/* +/* qmgr_defer_transport() calls qmgr_defer_todo() for each queue +/* that depends on the named transport. See there for side effects. +/* +/* Arguments: +/* .IP recipient +/* A recipient address; used for logging purposes, and for updating +/* the message-specific \fIdefer\fR log. +/* .IP queue +/* Specifies a queue with delivery requests for a specific next-hop +/* host (or local user). +/* .IP transport +/* Specifies a message delivery transport. +/* .IP dsn +/* See dsn(3). +/* BUGS +/* The side effects of calling this routine are quite dramatic. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include + +/* Global library. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_defer_transport - defer todo entries for named transport */ + +void qmgr_defer_transport(QMGR_TRANSPORT *transport, DSN *dsn) +{ + QMGR_QUEUE *queue; + QMGR_QUEUE *next; + + if (msg_verbose) + msg_info("defer transport %s: %s %s", + transport->name, dsn->status, dsn->reason); + + /* + * Proceed carefully. Queues may disappear as a side effect. + */ + for (queue = transport->queue_list.next; queue; queue = next) { + next = queue->peers.next; + qmgr_defer_todo(queue, dsn); + } +} + +/* qmgr_defer_todo - defer all todo queue entries for specific site */ + +void qmgr_defer_todo(QMGR_QUEUE *queue, DSN *dsn) +{ + QMGR_ENTRY *entry; + QMGR_ENTRY *next; + QMGR_MESSAGE *message; + RECIPIENT *recipient; + int nrcpt; + QMGR_QUEUE *retry_queue; + + /* + * Sanity checks. + */ + if (msg_verbose) + msg_info("defer site %s: %s %s", + queue->name, dsn->status, dsn->reason); + + /* + * See if we can redirect the deliveries to the retry(8) delivery agent, + * so that they can be handled asynchronously. If the retry(8) service is + * unavailable, use the synchronous defer(8) server. With a large todo + * queue, this blocks the queue manager for a significant time. + */ + retry_queue = qmgr_error_queue(MAIL_SERVICE_RETRY, dsn); + + /* + * Proceed carefully. Queue entries may disappear as a side effect. + */ + for (entry = queue->todo.next; entry != 0; entry = next) { + next = entry->queue_peers.next; + if (retry_queue != 0) { + qmgr_entry_move_todo(retry_queue, entry); + continue; + } + message = entry->message; + for (nrcpt = 0; nrcpt < entry->rcpt_list.len; nrcpt++) { + recipient = entry->rcpt_list.info + nrcpt; + qmgr_defer_recipient(message, recipient, dsn); + } + qmgr_entry_done(entry, QMGR_QUEUE_TODO); + } +} + +/* qmgr_defer_recipient - defer delivery of specific recipient */ + +void qmgr_defer_recipient(QMGR_MESSAGE *message, RECIPIENT *recipient, + DSN *dsn) +{ + MSG_STATS stats; + + /* + * Update the message structure and log the message disposition. + */ + message->flags |= defer_append(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", dsn); +} diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c new file mode 100644 index 0000000..fe4cf0c --- /dev/null +++ b/src/qmgr/qmgr_deliver.c @@ -0,0 +1,458 @@ +/*++ +/* NAME +/* qmgr_deliver 3 +/* SUMMARY +/* deliver one per-site queue entry to that site +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_deliver_concurrency; +/* +/* int qmgr_deliver(transport, fp) +/* QMGR_TRANSPORT *transport; +/* VSTREAM *fp; +/* DESCRIPTION +/* This module implements the client side of the `queue manager +/* to delivery agent' protocol. The queue manager uses +/* asynchronous I/O so that it can drive multiple delivery +/* agents in parallel. Depending on the outcome of a delivery +/* attempt, the status of messages, queues and transports is +/* updated. +/* +/* qmgr_deliver_concurrency is a global counter that says how +/* many delivery processes are in use. This can be used, for +/* example, to control the size of the `active' message queue. +/* +/* qmgr_deliver() executes when a delivery process announces its +/* availability for the named transport. It arranges for delivery +/* of a suitable queue entry. The \fIfp\fR argument specifies a +/* stream that is connected to a delivery process, or a null +/* pointer if the transport accepts no connection. Upon completion +/* of delivery (successful or not), the stream is closed, so that the +/* delivery process is released. +/* DIAGNOSTICS +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include +#include +#include + +/* Utility library. */ + +#include +#include +#include +#include +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Important note on the _transport_rate_delay implementation: after + * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all + * code paths must directly or indirectly invoke qmgr_transport_unthrottle() + * or qmgr_transport_throttle(). Otherwise, transports with non-zero + * _transport_rate_delay will become stuck. + */ + +int qmgr_deliver_concurrency; + + /* + * Message delivery status codes. + */ +#define DELIVER_STAT_OK 0 /* all recipients delivered */ +#define DELIVER_STAT_DEFER 1 /* try some recipients later */ +#define DELIVER_STAT_CRASH 2 /* mailer internal problem */ + +/* qmgr_deliver_initial_reply - retrieve initial delivery process response */ + +static int qmgr_deliver_initial_reply(VSTREAM *stream) +{ + int stat; + + if (peekfd(vstream_fileno(stream)) < 0) { + msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else if (attr_scan(stream, ATTR_FLAG_STRICT, + RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), + ATTR_TYPE_END) != 1) { + msg_warn("%s: malformed response", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else { + return (stat ? DELIVER_STAT_DEFER : 0); + } +} + +/* qmgr_deliver_final_reply - retrieve final delivery process response */ + +static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb) +{ + int stat; + + if (peekfd(vstream_fileno(stream)) < 0) { + msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else if (attr_scan(stream, ATTR_FLAG_STRICT, + RECV_ATTR_FUNC(dsb_scan, (void *) dsb), + RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), + ATTR_TYPE_END) != 2) { + msg_warn("%s: malformed response", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else { + return (stat ? DELIVER_STAT_DEFER : 0); + } +} + +/* qmgr_deliver_send_request - send delivery request to delivery process */ + +static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream) +{ + RECIPIENT_LIST list = entry->rcpt_list; + RECIPIENT *recipient; + QMGR_MESSAGE *message = entry->message; + VSTRING *sender_buf = 0; + MSG_STATS stats; + char *sender; + int flags; + int smtputf8 = message->smtputf8; + const char *addr; + + /* + * Todo: integrate with code up-stream that builds the delivery request. + */ + for (recipient = list.info; recipient < list.info + list.len; recipient++) + if (var_smtputf8_enable && (addr = recipient->address)[0] + && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) { + smtputf8 |= SMTPUTF8_FLAG_RECIPIENT; + if (message->verp_delims) + smtputf8 |= SMTPUTF8_FLAG_SENDER; + } + + /* + * If variable envelope return path is requested, change prefix+@origin + * into prefix+user=domain@origin. Note that with VERP there is only one + * recipient per delivery. + */ + if (message->verp_delims == 0) { + sender = message->sender; + } else { + sender_buf = vstring_alloc(100); + verp_sender(sender_buf, message->verp_delims, + message->sender, list.info); + sender = vstring_str(sender_buf); + } + + flags = message->tflags + | entry->queue->dflags + | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT); + (void) QMGR_MSG_STATS(&stats, message); + attr_print(stream, ATTR_FLAG_NONE, + SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags), + SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name), + SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id), + SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset), + SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length), + SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop), + SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding), + SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8), + SEND_ATTR_STR(MAIL_ATTR_SENDER, sender), + SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid), + SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret), + SEND_ATTR_FUNC(msg_stats_print, (void *) &stats), + /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name), + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr), + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port), + SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto), + SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo), + /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ + SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method), + SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username), + SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender), + /* XXX Ditto if we want to pass TLS certificate info. */ + SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident), + SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context), + SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len), + ATTR_TYPE_END); + if (sender_buf != 0) + vstring_free(sender_buf); + for (recipient = list.info; recipient < list.info + list.len; recipient++) + attr_print(stream, ATTR_FLAG_NONE, + SEND_ATTR_FUNC(rcpt_print, (void *) recipient), + ATTR_TYPE_END); + if (vstream_fflush(stream) != 0) { + msg_warn("write to process (%s): %m", entry->queue->transport->name); + return (-1); + } else { + if (msg_verbose) + msg_info("qmgr_deliver: site `%s'", entry->queue->name); + return (0); + } +} + +/* qmgr_deliver_abort - transport response watchdog */ + +static void qmgr_deliver_abort(int unused_event, void *context) +{ + QMGR_ENTRY *entry = (QMGR_ENTRY *) context; + QMGR_QUEUE *queue = entry->queue; + QMGR_TRANSPORT *transport = queue->transport; + QMGR_MESSAGE *message = entry->message; + + msg_fatal("%s: timeout receiving delivery status from transport: %s", + message->queue_id, transport->name); +} + +/* qmgr_deliver_update - process delivery status report */ + +static void qmgr_deliver_update(int unused_event, void *context) +{ + QMGR_ENTRY *entry = (QMGR_ENTRY *) context; + QMGR_QUEUE *queue = entry->queue; + QMGR_TRANSPORT *transport = queue->transport; + QMGR_MESSAGE *message = entry->message; + static DSN_BUF *dsb; + int status; + + /* + * Release the delivery agent from a "hot" queue entry. + */ +#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \ + event_disable_readwrite(vstream_fileno(entry->stream)); \ + (void) vstream_fclose(entry->stream); \ + entry->stream = 0; \ + qmgr_deliver_concurrency--; \ + } while (0) + + if (dsb == 0) + dsb = dsb_create(); + + /* + * The message transport has responded. Stop the watchdog timer. + */ + event_cancel_timer(qmgr_deliver_abort, context); + + /* + * Retrieve the delivery agent status report. The numerical status code + * indicates if delivery should be tried again. The reason text is sent + * only when a site should be avoided for a while, so that the queue + * manager can log why it does not even try to schedule delivery to the + * affected recipients. + */ + status = qmgr_deliver_final_reply(entry->stream, dsb); + + /* + * The mail delivery process failed for some reason (although delivery + * may have been successful). Back off with this transport type for a + * while. Dispose of queue entries for this transport that await + * selection (the todo lists). Stay away from queue entries that have + * been selected (the busy lists), or we would have dangling pointers. + * The queue itself won't go away before we dispose of the current queue + * entry. + */ + if (status == DELIVER_STAT_CRASH) { + message->flags |= DELIVER_STAT_DEFER; +#if 0 + whatsup = concatenate("unknown ", transport->name, + " mail transport error", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsb->dsn, "4.3.0", + "unknown mail transport error")); +#endif + msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description", + transport->name); + + /* + * Assume the worst and write a defer logfile record for each + * recipient. This omission was already present in the first queue + * manager implementation of 199703, and was fixed 200511. + * + * To avoid the synchronous qmgr_defer_recipient() operation for each + * recipient of this queue entry, release the delivery process and + * move the entry back to the todo queue. Let qmgr_defer_transport() + * log the recipient asynchronously if possible, and get out of here. + * Note: if asynchronous logging is not possible, + * qmgr_defer_transport() eventually invokes qmgr_entry_done() and + * the entry becomes a dangling pointer. + */ + QMGR_DELIVER_RELEASE_AGENT(entry); + qmgr_entry_unselect(entry); + qmgr_defer_transport(transport, &dsb->dsn); + return; + } + + /* + * This message must be tried again. + * + * If we have a problem talking to this site, back off with this site for a + * while; dispose of queue entries for this site that await selection + * (the todo list); stay away from queue entries that have been selected + * (the busy list), or we would have dangling pointers. The queue itself + * won't go away before we dispose of the current queue entry. + * + * XXX Caution: DSN_COPY() will panic on empty status or reason. + */ +#define SUSPENDED "delivery temporarily suspended: " + + if (status == DELIVER_STAT_DEFER) { + message->flags |= DELIVER_STAT_DEFER; + if (VSTRING_LEN(dsb->status)) { + /* Sanitize the DSN status/reason from the delivery agent. */ + if (!dsn_valid(vstring_str(dsb->status))) + vstring_strcpy(dsb->status, "4.0.0"); + if (VSTRING_LEN(dsb->reason) == 0) + vstring_strcpy(dsb->reason, "unknown error"); + vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1); + if (QMGR_QUEUE_READY(queue)) { + qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb)); + if (QMGR_QUEUE_THROTTLED(queue)) + qmgr_defer_todo(queue, &dsb->dsn); + } + } + } + + /* + * No problems detected. Mark the transport and queue as alive. The queue + * itself won't go away before we dispose of the current queue entry. + */ + if (status != DELIVER_STAT_CRASH) { + qmgr_transport_unthrottle(transport); + if (VSTRING_LEN(dsb->reason) == 0) + qmgr_queue_unthrottle(queue); + } + + /* + * Release the delivery process, and give some other queue entry a chance + * to be delivered. When all recipients for a message have been tried, + * decide what to do next with this message: defer, bounce, delete. + */ + QMGR_DELIVER_RELEASE_AGENT(entry); + qmgr_entry_done(entry, QMGR_QUEUE_BUSY); +} + +/* qmgr_deliver - deliver one per-site queue entry */ + +void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) +{ + QMGR_ENTRY *entry; + DSN dsn; + + /* + * Find out if this delivery process is really available. Once elected, + * the delivery process is supposed to express its happiness. If there is + * a problem, wipe the pending deliveries for this transport. This + * routine runs in response to an external event, so it does not run + * while some other queue manipulation is happening. + */ + if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) { +#if 0 + whatsup = concatenate(transport->name, + " mail transport unavailable", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", + "mail transport unavailable")); +#endif + qmgr_defer_transport(transport, &dsn); + if (stream) + (void) vstream_fclose(stream); + return; + } + + /* + * Find a suitable queue entry. Things may have changed since this + * transport was allocated. If no suitable entry is found, + * unceremoniously disconnect from the delivery process. The delivery + * agent request reading routine is prepared for the queue manager to + * change its mind for no apparent reason. + */ + if ((entry = qmgr_job_entry_select(transport)) == 0) { + (void) vstream_fclose(stream); + return; + } + + /* + * Send the queue file info and recipient info to the delivery process. + * If there is a problem, wipe the pending deliveries for this transport. + * This routine runs in response to an external event, so it does not run + * while some other queue manipulation is happening. + */ + if (qmgr_deliver_send_request(entry, stream) < 0) { + qmgr_entry_unselect(entry); +#if 0 + whatsup = concatenate(transport->name, + " mail transport unavailable", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", + "mail transport unavailable")); +#endif + qmgr_defer_transport(transport, &dsn); + /* warning: entry may be a dangling pointer here */ + (void) vstream_fclose(stream); + return; + } + + /* + * If we get this far, go wait for the delivery status report. + */ + qmgr_deliver_concurrency++; + entry->stream = stream; + event_enable_read(vstream_fileno(stream), + qmgr_deliver_update, (void *) entry); + + /* + * Guard against broken systems. + */ + event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout); +} diff --git a/src/qmgr/qmgr_enable.c b/src/qmgr/qmgr_enable.c new file mode 100644 index 0000000..a35e46e --- /dev/null +++ b/src/qmgr/qmgr_enable.c @@ -0,0 +1,107 @@ +/*++ +/* NAME +/* qmgr_enable +/* SUMMARY +/* enable dead transports or sites +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_enable_queue(queue) +/* QMGR_QUEUE *queue; +/* +/* QMGR_QUEUE *qmgr_enable_transport(transport) +/* QMGR_TRANSPORT *transport; +/* +/* void qmgr_enable_all(void) +/* DESCRIPTION +/* This module purges dead in-core state information, effectively +/* re-enabling delivery. +/* +/* qmgr_enable_queue() enables deliveries to the named dead site. +/* Empty queues are destroyed. The existed solely to indicate that +/* a site is dead. +/* +/* qmgr_enable_transport() enables deliveries via the specified +/* transport, and calls qmgr_enable_queue() for each destination +/* on that transport. Empty queues are destroyed. +/* +/* qmgr_enable_all() enables all transports and queues. +/* See above for the side effects caused by doing this. +/* BUGS +/* The side effects of calling this module can be quite dramatic. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_enable_all - enable transports and queues */ + +void qmgr_enable_all(void) +{ + QMGR_TRANSPORT *xport; + + if (msg_verbose) + msg_info("qmgr_enable_all"); + + /* + * The number of transports does not change as a side effect, so this can + * be a straightforward loop. + */ + for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) + qmgr_enable_transport(xport); +} + +/* qmgr_enable_transport - defer todo entries for named transport */ + +void qmgr_enable_transport(QMGR_TRANSPORT *transport) +{ + QMGR_QUEUE *queue; + QMGR_QUEUE *next; + + /* + * Proceed carefully. Queues may disappear as a side effect. + */ + if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) { + if (msg_verbose) + msg_info("enable transport %s", transport->name); + qmgr_transport_unthrottle(transport); + } + for (queue = transport->queue_list.next; queue; queue = next) { + next = queue->peers.next; + qmgr_enable_queue(queue); + } +} + +/* qmgr_enable_queue - enable and possibly delete queue */ + +void qmgr_enable_queue(QMGR_QUEUE *queue) +{ + if (QMGR_QUEUE_THROTTLED(queue)) { + if (msg_verbose) + msg_info("enable site %s/%s", queue->transport->name, queue->name); + qmgr_queue_unthrottle(queue); + } + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); +} diff --git a/src/qmgr/qmgr_entry.c b/src/qmgr/qmgr_entry.c new file mode 100644 index 0000000..61b3434 --- /dev/null +++ b/src/qmgr/qmgr_entry.c @@ -0,0 +1,452 @@ +/*++ +/* NAME +/* qmgr_entry 3 +/* SUMMARY +/* per-site queue entries +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_ENTRY *qmgr_entry_create(peer, message) +/* QMGR_PEER *peer; +/* QMGR_MESSAGE *message; +/* +/* void qmgr_entry_done(entry, which) +/* QMGR_ENTRY *entry; +/* int which; +/* +/* QMGR_ENTRY *qmgr_entry_select(queue) +/* QMGR_QUEUE *queue; +/* +/* void qmgr_entry_unselect(queue, entry) +/* QMGR_QUEUE *queue; +/* QMGR_ENTRY *entry; +/* +/* void qmgr_entry_move_todo(dst, entry) +/* QMGR_QUEUE *dst; +/* QMGR_ENTRY *entry; +/* DESCRIPTION +/* These routines add/delete/manipulate per-site message +/* delivery requests. +/* +/* qmgr_entry_create() creates an entry for the named peer and message, +/* and appends the entry to the peer's list and its queue's todo list. +/* Filling in and cleaning up the recipients is the responsibility +/* of the caller. +/* +/* qmgr_entry_done() discards a per-site queue entry. The +/* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry +/* of the site's `busy' list (i.e. queue entries that have been +/* selected for actual delivery), or QMGR_QUEUE_TODO for an entry +/* of the site's `todo' list (i.e. queue entries awaiting selection +/* for actual delivery). +/* +/* qmgr_entry_done() discards its peer structure when the peer +/* is not referenced anymore. +/* +/* qmgr_entry_done() triggers cleanup of the per-site queue when +/* the site has no pending deliveries, and the site is either +/* alive, or the site is dead and the number of in-core queues +/* exceeds a configurable limit (see qmgr_queue_done()). +/* +/* qmgr_entry_done() triggers special action when the last in-core +/* queue entry for a message is done with: either read more +/* recipients from the queue file, delete the queue file, or move +/* the queue file to the deferred queue; send bounce reports to the +/* message originator (see qmgr_active_done()). +/* +/* qmgr_entry_select() selects first entry from the named +/* per-site queue's `todo' list for actual delivery. The entry is +/* moved to the queue's `busy' list: the list of messages being +/* delivered. The entry is also removed from its peer list. +/* +/* qmgr_entry_unselect() takes the named entry off the named +/* per-site queue's `busy' list and moves it to the queue's +/* `todo' list. The entry is also prepended to its peer list again. +/* +/* qmgr_entry_move_todo() moves the specified "todo" queue entry +/* to the specified "todo" queue. +/* DIAGNOSTICS +/* Panic: interface violations, internal inconsistencies. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/*--*/ + +/* System library. */ + +#include +#include +#include + +/* Utility library. */ + +#include +#include +#include +#include + +/* Global library. */ + +#include +#include /* opportunistic session caching */ + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_entry_select - select queue entry for delivery */ + +QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *peer) +{ + const char *myname = "qmgr_entry_select"; + QMGR_ENTRY *entry; + QMGR_QUEUE *queue; + + if ((entry = peer->entry_list.next) != 0) { + queue = entry->queue; + QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers); + queue->todo_refcount--; + QMGR_LIST_APPEND(queue->busy, entry, queue_peers); + queue->busy_refcount++; + QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers); + peer->job->selected_entries++; + + /* + * With opportunistic session caching, the delivery agent must not + * only 1) save a session upon completion, but also 2) reuse a cached + * session upon the next delivery request. In order to not miss out + * on 2), we have to make caching sticky or else we get silly + * behavior when the in-memory queue drains. Specifically, new + * connections must not be made as long as cached connections exist. + * + * Safety: don't enable opportunistic session caching unless the queue + * manager is able to schedule concurrent or back-to-back deliveries + * (we need to recognize back-to-back deliveries for transports with + * concurrency 1). + * + * If caching has previously been enabled, but is not now, fetch any + * existing entries from the cache, but don't add new ones. + */ +#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \ + (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY()) + +#define BACK_TO_BACK_DELIVERY() \ + (queue->last_done + 1 >= event_time()) + + /* + * Turn on session caching after we get up to speed. Don't enable + * session caching just because we have concurrent deliveries. This + * prevents unnecessary session caching when we have a burst of mail + * <= the initial concurrency limit. + */ + if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) { + if (BACK_TO_BACK_DELIVERY()) { + if (msg_verbose) + msg_info("%s: allowing on-demand session caching for %s", + myname, queue->name); + queue->dflags |= DEL_REQ_FLAG_CONN_MASK; + } + } + + /* + * Turn off session caching when concurrency drops and we're running + * out of steam. This is what prevents from turning off session + * caching too early, and from making new connections while old ones + * are still cached. + */ + else { + if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) { + if (msg_verbose) + msg_info("%s: disallowing on-demand session caching for %s", + myname, queue->name); + queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE; + } + } + } + return (entry); +} + +/* qmgr_entry_unselect - unselect queue entry for delivery */ + +void qmgr_entry_unselect(QMGR_ENTRY *entry) +{ + QMGR_PEER *peer = entry->peer; + QMGR_QUEUE *queue = entry->queue; + + /* + * Move the entry back to the todo lists. In case of the peer list, put + * it back to the beginning, so the select()/unselect() does not reorder + * entries. We use this in qmgr_message_assign() to put recipients into + * existing entries when possible. + */ + QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers); + queue->busy_refcount--; + QMGR_LIST_APPEND(queue->todo, entry, queue_peers); + queue->todo_refcount++; + QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers); + peer->job->selected_entries--; +} + +/* qmgr_entry_move_todo - move entry between todo queues */ + +void qmgr_entry_move_todo(QMGR_QUEUE *dst_queue, QMGR_ENTRY *entry) +{ + const char *myname = "qmgr_entry_move_todo"; + QMGR_TRANSPORT *dst_transport = dst_queue->transport; + QMGR_MESSAGE *message = entry->message; + QMGR_QUEUE *src_queue = entry->queue; + QMGR_PEER *dst_peer, *src_peer = entry->peer; + QMGR_JOB *dst_job, *src_job = src_peer->job; + QMGR_ENTRY *new_entry; + int rcpt_count = entry->rcpt_list.len; + + if (entry->stream != 0) + msg_panic("%s: queue %s entry is busy", myname, src_queue->name); + if (QMGR_QUEUE_THROTTLED(dst_queue)) + msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name); + if (QMGR_TRANSPORT_THROTTLED(dst_transport)) + msg_panic("%s: destination transport %s is throttled", + myname, dst_transport->name); + + /* + * Create new entry, swap the recipients between the two entries, + * adjusting the job counters accordingly, then dispose of the old entry. + * + * Note that qmgr_entry_done() will also take care of adjusting the + * recipient limits of all the message jobs, so we do not have to do that + * explicitly for the new job here. + * + * XXX This does not enforce the per-entry recipient limit, but that is not + * a problem as long as qmgr_entry_move_todo() is called only to bounce + * or defer mail. + */ + dst_job = qmgr_job_obtain(message, dst_transport); + dst_peer = qmgr_peer_obtain(dst_job, dst_queue); + + new_entry = qmgr_entry_create(dst_peer, message); + + recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list); + + src_job->rcpt_count -= rcpt_count; + dst_job->rcpt_count += rcpt_count; + + qmgr_entry_done(entry, QMGR_QUEUE_TODO); +} + +/* qmgr_entry_done - dispose of queue entry */ + +void qmgr_entry_done(QMGR_ENTRY *entry, int which) +{ + const char *myname = "qmgr_entry_done"; + QMGR_QUEUE *queue = entry->queue; + QMGR_MESSAGE *message = entry->message; + QMGR_PEER *peer = entry->peer; + QMGR_JOB *sponsor, *job = peer->job; + QMGR_TRANSPORT *transport = job->transport; + + /* + * Take this entry off the in-core queue. + */ + if (entry->stream != 0) + msg_panic("%s: file is open", myname); + if (which == QMGR_QUEUE_BUSY) { + QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers); + queue->busy_refcount--; + } else if (which == QMGR_QUEUE_TODO) { + QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers); + job->selected_entries++; + QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers); + queue->todo_refcount--; + } else { + msg_panic("%s: bad queue spec: %d", myname, which); + } + + /* + * Decrease the in-core recipient counts and free the recipient list and + * the structure itself. + */ + job->rcpt_count -= entry->rcpt_list.len; + message->rcpt_count -= entry->rcpt_list.len; + qmgr_recipient_count -= entry->rcpt_list.len; + recipient_list_free(&entry->rcpt_list); + myfree((void *) entry); + + /* + * Make sure that the transport of any retired or finishing job that + * donated recipient slots to this message gets them back first. Then, if + * possible, pass the remaining unused recipient slots to the next job on + * the job list. + */ + for (sponsor = message->job_list.next; sponsor; sponsor = sponsor->message_peers.next) { + if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job) + continue; + if (sponsor->stack_level < 0 || message->rcpt_offset == 0) + qmgr_job_move_limits(sponsor); + } + if (message->rcpt_offset == 0) { + qmgr_job_move_limits(job); + } + + /* + * We implement a rate-limited queue by emulating a slow delivery + * channel. We insert the artificial delays with qmgr_queue_suspend(). + * + * When a queue is suspended, we must postpone any job scheduling decisions + * until the queue is resumed. Otherwise, we make those decisions now. + * The job scheduling decisions are made by qmgr_job_blocker_update(). + */ + if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) { + if (queue->window > 1) + msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service", + myname, transport->name, queue->name, queue->window); + if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */ + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue)) + qmgr_queue_suspend(queue, transport->rate_delay); + } + if (!QMGR_QUEUE_SUSPENDED(queue) + && queue->blocker_tag == transport->blocker_tag) + qmgr_job_blocker_update(queue); + + /* + * When there are no more entries for this peer, discard the peer + * structure. + */ + peer->refcount--; + if (peer->refcount == 0) + qmgr_peer_free(peer); + + /* + * Maintain back-to-back delivery status. + */ + if (which == QMGR_QUEUE_BUSY) + queue->last_done = event_time(); + + /* + * When the in-core queue for this site is empty and when this site is + * not dead or suspended, discard the in-core queue. When this site is + * dead, but the number of in-core queues exceeds some threshold, get rid + * of this in-core queue anyway, in order to avoid running out of memory. + */ + if (queue->todo.next == 0 && queue->busy.next == 0) { + if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit) + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue)) + qmgr_queue_done(queue); + } + + /* + * Update the in-core message reference count. When the in-core message + * structure has no more references, dispose of the message. + */ + message->refcount--; + if (message->refcount == 0) + qmgr_active_done(message); +} + +/* qmgr_entry_create - create queue todo entry */ + +QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *peer, QMGR_MESSAGE *message) +{ + QMGR_ENTRY *entry; + QMGR_QUEUE *queue = peer->queue; + + /* + * Sanity check. + */ + if (QMGR_QUEUE_THROTTLED(queue)) + msg_panic("qmgr_entry_create: dead queue: %s", queue->name); + + /* + * Create the delivery request. + */ + entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY)); + entry->stream = 0; + entry->message = message; + recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE); + message->refcount++; + entry->peer = peer; + QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers); + peer->refcount++; + entry->queue = queue; + QMGR_LIST_APPEND(queue->todo, entry, queue_peers); + queue->todo_refcount++; + peer->job->read_entries++; + + /* + * Warn if a destination is falling behind while the active queue + * contains a non-trivial amount of single-recipient email. When a + * destination takes up more and more space in the active queue, then + * other mail will not get through and delivery performance will suffer. + * + * XXX At this point in the code, the busy reference count is still less + * than the concurrency limit (otherwise this code would not be invoked + * in the first place) so we have to make make some awkward adjustments + * below. + * + * XXX The queue length test below looks at the active queue share of an + * individual destination. This catches the case where mail for one + * destination is falling behind because it has to round-robin compete + * with many other destinations. However, Postfix will also perform + * poorly when most of the active queue is tied up by a small number of + * concurrency limited destinations. The queue length test below detects + * such conditions only indirectly. + * + * XXX This code does not detect the case that the active queue is being + * starved because incoming mail is pounding the disk. + */ + if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) { + int queue_length = queue->todo_refcount + queue->busy_refcount; + time_t now; + QMGR_TRANSPORT *transport; + double active_share; + + if (queue_length > var_qmgr_active_limit / 5 + && (now = event_time()) >= queue->clog_time_to_warn) { + active_share = queue_length / (double) qmgr_message_count; + msg_warn("mail for %s is using up %d of %d active queue entries", + queue->nexthop, queue_length, qmgr_message_count); + if (active_share < 0.9) + msg_warn("this may slow down other mail deliveries"); + transport = queue->transport; + if (transport->dest_concurrency_limit > 0 + && transport->dest_concurrency_limit <= queue->busy_refcount + 1) + msg_warn("you may need to increase the main.cf %s%s from %d", + transport->name, _DEST_CON_LIMIT, + transport->dest_concurrency_limit); + else if (queue->window > var_qmgr_active_limit * active_share) + msg_warn("you may need to increase the main.cf %s from %d", + VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit); + else if (queue->peers.next != queue->peers.prev) + msg_warn("you may need a separate master.cf transport for %s", + queue->nexthop); + else { + msg_warn("you may need to reduce %s connect and helo timeouts", + transport->name); + msg_warn("so that Postfix quickly skips unavailable hosts"); + msg_warn("you may need to increase the main.cf %s and %s", + VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME); + msg_warn("so that Postfix wastes less time on undeliverable mail"); + msg_warn("you may need to increase the master.cf %s process limit", + transport->name); + } + msg_warn("please avoid flushing the whole queue when you have"); + msg_warn("lots of deferred mail, that is bad for performance"); + msg_warn("to turn off these warnings specify: %s = 0", + VAR_QMGR_CLOG_WARN_TIME); + queue->clog_time_to_warn = now + var_qmgr_clog_warn_time; + } + } + return (entry); +} diff --git a/src/qmgr/qmgr_error.c b/src/qmgr/qmgr_error.c new file mode 100644 index 0000000..6541c35 --- /dev/null +++ b/src/qmgr/qmgr_error.c @@ -0,0 +1,121 @@ +/*++ +/* NAME +/* qmgr_error 3 +/* SUMMARY +/* look up/create error/retry queue +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_TRANSPORT *qmgr_error_transport(service) +/* const char *service; +/* +/* QMGR_QUEUE *qmgr_error_queue(service, dsn) +/* const char *service; +/* DSN *dsn; +/* +/* char *qmgr_error_nexthop(dsn) +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_error_transport() looks up the error transport for the +/* specified service. The result is null if the transport is +/* not available. +/* +/* qmgr_error_queue() looks up an error queue for the specified +/* service and problem. The result is null if the queue is not +/* available. +/* +/* qmgr_error_nexthop() computes the next-hop information for +/* the specified problem. The result must be passed to myfree(). +/* +/* Arguments: +/* .IP dsn +/* See dsn(3). +/* .IP service +/* One of MAIL_SERVICE_ERROR or MAIL_SERVICE_RETRY. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include + +/* Global library. */ + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_error_transport - look up error transport for specified service */ + +QMGR_TRANSPORT *qmgr_error_transport(const char *service) +{ + QMGR_TRANSPORT *transport; + + /* + * Find or create retry transport. + */ + if ((transport = qmgr_transport_find(service)) == 0) + transport = qmgr_transport_create(service); + if (QMGR_TRANSPORT_THROTTLED(transport)) + return (0); + + /* + * Done. + */ + return (transport); +} + +/* qmgr_error_queue - look up error queue for specified service and problem */ + +QMGR_QUEUE *qmgr_error_queue(const char *service, DSN *dsn) +{ + QMGR_TRANSPORT *transport; + QMGR_QUEUE *queue; + char *nexthop; + + /* + * Find or create transport. + */ + if ((transport = qmgr_error_transport(service)) == 0) + return (0); + + /* + * Find or create queue. + */ + nexthop = qmgr_error_nexthop(dsn); + if ((queue = qmgr_queue_find(transport, nexthop)) == 0) + queue = qmgr_queue_create(transport, nexthop, nexthop); + myfree(nexthop); + if (QMGR_QUEUE_THROTTLED(queue)) + return (0); + + /* + * Done. + */ + return (queue); +} + +/* qmgr_error_nexthop - compute next-hop information from problem description */ + +char *qmgr_error_nexthop(DSN *dsn) +{ + char *nexthop; + + nexthop = concatenate(dsn->status, " ", dsn->reason, (char *) 0); + return (nexthop); +} diff --git a/src/qmgr/qmgr_feedback.c b/src/qmgr/qmgr_feedback.c new file mode 100644 index 0000000..f341b95 --- /dev/null +++ b/src/qmgr/qmgr_feedback.c @@ -0,0 +1,177 @@ +/*++ +/* NAME +/* qmgr_feedback 3 +/* SUMMARY +/* delivery agent feedback management +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_feedback_init(fbck_ctl, name_prefix, name_tail, +/* def_name, def_value) +/* QMGR_FEEDBACK *fbck_ctl; +/* const char *name_prefix; +/* const char *name_tail; +/* const char *def_name; +/* const char *def_value; +/* +/* double QMGR_FEEDBACK_VAL(fbck_ctl, concurrency) +/* QMGR_FEEDBACK *fbck_ctl; +/* const int concurrency; +/* DESCRIPTION +/* Upon completion of a delivery request, a delivery agent +/* provides a hint that the scheduler should dedicate fewer or +/* more resources to a specific destination. +/* +/* qmgr_feedback_init() looks up transport-dependent positive +/* or negative concurrency feedback control information from +/* main.cf, and converts it to internal form. +/* +/* QMGR_FEEDBACK_VAL() computes a concurrency adjustment based +/* on a preprocessed feedback control information and the +/* current concurrency window. This is an "unsafe" macro that +/* evaluates some arguments multiple times. +/* +/* Arguments: +/* .IP fbck_ctl +/* Pointer to QMGR_FEEDBACK structure where the result will +/* be stored. +/* .IP name_prefix +/* Mail delivery transport name, used as the initial portion +/* of a transport-dependent concurrency feedback parameter +/* name. +/* .IP name_tail +/* The second, and fixed, portion of a transport-dependent +/* concurrency feedback parameter. +/* .IP def_name +/* The name of a default feedback parameter. +/* .IP def_val +/* The value of the default feedback parameter. +/* .IP concurrency +/* Delivery concurrency for concurrency-dependent feedback calculation. +/* DIAGNOSTICS +/* Warning: configuration error or unreasonable input. The program +/* uses name_tail feedback instead. +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include +#include +#include /* INT_MAX */ +#include /* sscanf() */ +#include + +/* Utility library. */ + +#include +#include +#include +#include + +/* Global library. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Lookup tables for main.cf feedback method names. + */ +const NAME_CODE qmgr_feedback_map[] = { + CONC_FDBACK_NAME_WIN, QMGR_FEEDBACK_IDX_WIN, +#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN + CONC_FDBACK_NAME_SQRT_WIN, QMGR_FEEDBACK_IDX_SQRT_WIN, +#endif + 0, QMGR_FEEDBACK_IDX_NONE, +}; + +/* qmgr_feedback_init - initialize feedback control */ + +void qmgr_feedback_init(QMGR_FEEDBACK *fb, + const char *name_prefix, + const char *name_tail, + const char *def_name, + const char *def_val) +{ + double enum_val; + char denom_str[30 + 1]; + double denom_val; + char slash; + char junk; + char *fbck_name; + char *fbck_val; + + /* + * Look up the transport-dependent feedback value. + */ + fbck_name = concatenate(name_prefix, name_tail, (char *) 0); + fbck_val = get_mail_conf_str(fbck_name, def_val, 1, 0); + + /* + * We allow users to express feedback as 1/8, as a more user-friendly + * alternative to 0.125 (or worse, having users specify the number of + * events in a feedback hysteresis cycle). + * + * We use some sscanf() fu to parse the value into numerator and optional + * "/" followed by denominator. We're doing this only a few times during + * the process life time, so we strive for convenience instead of speed. + */ +#define INCLUSIVE_BOUNDS(val, low, high) ((val) >= (low) && (val) <= (high)) + + fb->hysteresis = 1; /* legacy */ + fb->base = -1; /* assume error */ + + switch (sscanf(fbck_val, "%lf %1[/] %30s%c", + &enum_val, &slash, denom_str, &junk)) { + case 1: + fb->index = QMGR_FEEDBACK_IDX_NONE; + fb->base = enum_val; + break; + case 3: + if ((fb->index = name_code(qmgr_feedback_map, NAME_CODE_FLAG_NONE, + denom_str)) != QMGR_FEEDBACK_IDX_NONE) { + fb->base = enum_val; + } else if (INCLUSIVE_BOUNDS(enum_val, 0, INT_MAX) + && sscanf(denom_str, "%lf%c", &denom_val, &junk) == 1 + && INCLUSIVE_BOUNDS(denom_val, 1.0 / INT_MAX, INT_MAX)) { + fb->base = enum_val / denom_val; + } + break; + } + + /* + * Sanity check. If input is bad, we just warn and use a reasonable + * default. + */ + if (!INCLUSIVE_BOUNDS(fb->base, 0, 1)) { + msg_warn("%s: ignoring malformed or unreasonable feedback: %s", + strcmp(fbck_val, def_val) ? fbck_name : def_name, fbck_val); + fb->index = QMGR_FEEDBACK_IDX_NONE; + fb->base = 1; + } + + /* + * Performance debugging/analysis. + */ + if (var_conc_feedback_debug) + msg_info("%s: %s feedback type %d value at %d: %g", + name_prefix, strcmp(fbck_val, def_val) ? + fbck_name : def_name, fb->index, var_init_dest_concurrency, + QMGR_FEEDBACK_VAL(*fb, var_init_dest_concurrency)); + + myfree(fbck_name); + myfree(fbck_val); +} diff --git a/src/qmgr/qmgr_job.c b/src/qmgr/qmgr_job.c new file mode 100644 index 0000000..24065f3 --- /dev/null +++ b/src/qmgr/qmgr_job.c @@ -0,0 +1,978 @@ +/*++ +/* NAME +/* qmgr_job 3 +/* SUMMARY +/* per-transport jobs +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_JOB *qmgr_job_obtain(message, transport) +/* QMGR_MESSAGE *message; +/* QMGR_TRANSPORT *transport; +/* +/* void qmgr_job_free(job) +/* QMGR_JOB *job; +/* +/* void qmgr_job_move_limits(job) +/* QMGR_JOB *job; +/* +/* QMGR_ENTRY *qmgr_job_entry_select(transport) +/* QMGR_TRANSPORT *transport; +/* +/* void qmgr_job_blocker_update(queue) +/* QMGR_QUEUE *queue; +/* DESCRIPTION +/* These routines add/delete/manipulate per-transport jobs. +/* Each job corresponds to a specific transport and message. +/* Each job has a peer list containing all pending delivery +/* requests for that message. +/* +/* qmgr_job_obtain() finds an existing job for named message and +/* transport combination. New empty job is created if no existing can +/* be found. In either case, the job is prepared for assignment of +/* (more) message recipients. +/* +/* qmgr_job_free() disposes of a per-transport job after all +/* its entries have been taken care of. It is an error to dispose +/* of a job that is still in use. +/* +/* qmgr_job_entry_select() attempts to find the next entry suitable +/* for delivery. The job preempting algorithm is also exercised. +/* If necessary, an attempt to read more recipients into core is made. +/* This can result in creation of more job, queue and entry structures. +/* +/* qmgr_job_blocker_update() updates the status of blocked +/* jobs after a decrease in the queue's concurrency level, +/* after the queue is throttled, or after the queue is resumed +/* from suspension. +/* +/* qmgr_job_move_limits() takes care of proper distribution of the +/* per-transport recipients limit among the per-transport jobs. +/* Should be called whenever a job's recipient slot becomes available. +/* DIAGNOSTICS +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Patrik Rak +/* patrik@raxoft.cz +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* Forward declarations */ + +static void qmgr_job_pop(QMGR_JOB *); + +/* Helper macros */ + +#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries) + +/* + * The MIN_ENTRIES macro may underestimate a lot but we can't use message->rcpt_unread + * because we don't know if all those unread recipients go to our transport yet. + */ + +#define MIN_ENTRIES(job) ((job)->read_entries) +#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread) + +#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0) + +#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag) + +/* qmgr_job_create - create and initialize message job structure */ + +static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) +{ + QMGR_JOB *job; + + job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB)); + job->message = message; + QMGR_LIST_APPEND(message->job_list, job, message_peers); + htable_enter(transport->job_byname, message->queue_id, (void *) job); + job->transport = transport; + QMGR_LIST_INIT(job->transport_peers); + QMGR_LIST_INIT(job->time_peers); + job->stack_parent = 0; + QMGR_LIST_INIT(job->stack_children); + QMGR_LIST_INIT(job->stack_siblings); + job->stack_level = -1; + job->blocker_tag = 0; + job->peer_byname = htable_create(0); + QMGR_LIST_INIT(job->peer_list); + job->slots_used = 0; + job->slots_available = 0; + job->selected_entries = 0; + job->read_entries = 0; + job->rcpt_count = 0; + job->rcpt_limit = 0; + return (job); +} + +/* qmgr_job_link - append the job to the job lists based on the time it was queued */ + +static void qmgr_job_link(QMGR_JOB *job) +{ + QMGR_TRANSPORT *transport = job->transport; + QMGR_MESSAGE *message = job->message; + QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current; + int delay; + + /* + * Sanity checks. + */ + if (job->stack_level >= 0) + msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level); + + /* + * Traverse the time list and the scheduler list from the end and stop + * when we found job older than the one being linked. + * + * During the traversals keep track if we have come across either the + * current job or the first unread job on the job list. If this is the + * case, these pointers will be adjusted below as required. + * + * Although both lists are exactly the same when only jobs on the stack + * level zero are considered, it's easier to traverse them separately. + * Otherwise it's impossible to keep track of the current job pointer + * effectively. + * + * This may look inefficient but under normal operation it is expected that + * the loops will stop right away, resulting in normal list appends + * below. However, this code is necessary for reviving retired jobs and + * for jobs which are created long after the first chunk of recipients + * was read in-core (either of these can happen only for multi-transport + * messages). + * + * XXX Note that we test stack_parent rather than stack_level below. This + * subtle difference allows us to enqueue the job in correct time order + * with respect to orphaned children even after their original parent on + * level zero is gone. Consequently, the early loop stop in candidate + * selection works reliably, too. These are the reasons why we care to + * bother with children adoption at all. + */ + current = transport->job_current; + for (next = 0, prev = transport->job_list.prev; prev; + next = prev, prev = prev->transport_peers.prev) { + if (prev->stack_parent == 0) { + delay = message->queued_time - prev->message->queued_time; + if (delay >= 0) + break; + } + if (current == prev) + current = 0; + } + list_prev = prev; + list_next = next; + + unread = transport->job_next_unread; + for (next = 0, prev = transport->job_bytime.prev; prev; + next = prev, prev = prev->time_peers.prev) { + delay = message->queued_time - prev->message->queued_time; + if (delay >= 0) + break; + if (unread == prev) + unread = 0; + } + + /* + * Link the job into the proper place on the job lists and mark it so we + * know it has been linked. + */ + job->stack_level = 0; + QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers); + QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers); + + /* + * Update the current job pointer if necessary. + */ + if (current == 0) + transport->job_current = job; + + /* + * Update the pointer to the first unread job on the job list and steal + * the unused recipient slots from the old one. + */ + if (unread == 0) { + unread = transport->job_next_unread; + transport->job_next_unread = job; + if (unread != 0) + qmgr_job_move_limits(unread); + } + + /* + * Get as much recipient slots as possible. The excess will be returned + * to the transport pool as soon as the exact amount required is known + * (which is usually after all recipients have been read in core). + */ + if (transport->rcpt_unused > 0) { + job->rcpt_limit += transport->rcpt_unused; + message->rcpt_limit += transport->rcpt_unused; + transport->rcpt_unused = 0; + } +} + +/* qmgr_job_find - lookup job associated with named message and transport */ + +static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) +{ + + /* + * Instead of traversing the message job list, we use single per + * transport hash table. This is better (at least with respect to memory + * usage) than having single hash table (usually almost empty) for each + * message. + */ + return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id)); +} + +/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */ + +QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) +{ + QMGR_JOB *job; + + /* + * Try finding an existing job, reviving it if it was already retired. + * Create a new job for this transport/message combination otherwise. In + * either case, the job ends linked on the job lists. + */ + if ((job = qmgr_job_find(message, transport)) == 0) + job = qmgr_job_create(message, transport); + if (job->stack_level < 0) + qmgr_job_link(job); + + /* + * Reset the candidate cache because of the new expected recipients. Make + * sure the job is not marked as a blocker for the same reason. Note that + * this can result in having a non-blocker followed by more blockers. + * Consequently, we can't just update the current job pointer, we have to + * reset it. Fortunately qmgr_job_entry_select() will easily deal with + * this and will lookup the real current job for us. + */ + RESET_CANDIDATE_CACHE(transport); + if (IS_BLOCKER(job, transport)) { + job->blocker_tag = 0; + transport->job_current = transport->job_list.next; + } + return (job); +} + +/* qmgr_job_move_limits - move unused recipient slots to the next unread job */ + +void qmgr_job_move_limits(QMGR_JOB *job) +{ + QMGR_TRANSPORT *transport = job->transport; + QMGR_MESSAGE *message = job->message; + QMGR_JOB *next = transport->job_next_unread; + int rcpt_unused, msg_rcpt_unused; + + /* + * Find next unread job on the job list if necessary. Cache it for later. + * This makes the amortized efficiency of this routine O(1) per job. Note + * that we use the time list whose ordering doesn't change over time. + */ + if (job == next) { + for (next = next->time_peers.next; next; next = next->time_peers.next) + if (next->message->rcpt_offset != 0) + break; + transport->job_next_unread = next; + } + + /* + * Calculate the number of available unused slots. + */ + rcpt_unused = job->rcpt_limit - job->rcpt_count; + msg_rcpt_unused = message->rcpt_limit - message->rcpt_count; + if (msg_rcpt_unused < rcpt_unused) + rcpt_unused = msg_rcpt_unused; + + /* + * Transfer the unused recipient slots back to the transport pool and to + * the next not-fully-read job. Job's message limits are adjusted + * accordingly. Note that the transport pool can be negative if we used + * some of the rcpt_per_stack slots. + */ + if (rcpt_unused > 0) { + job->rcpt_limit -= rcpt_unused; + message->rcpt_limit -= rcpt_unused; + transport->rcpt_unused += rcpt_unused; + if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) { + next->rcpt_limit += rcpt_unused; + next->message->rcpt_limit += rcpt_unused; + transport->rcpt_unused = 0; + } + } +} + +/* qmgr_job_parent_gone - take care of orphaned stack children */ + +static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent) +{ + QMGR_JOB *child; + + while ((child = job->stack_children.next) != 0) { + QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings); + if (parent != 0) + QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings); + child->stack_parent = parent; + } +} + +/* qmgr_job_unlink - unlink the job from the job lists */ + +static void qmgr_job_unlink(QMGR_JOB *job) +{ + const char *myname = "qmgr_job_unlink"; + QMGR_TRANSPORT *transport = job->transport; + + /* + * Sanity checks. + */ + if (job->stack_level != 0) + msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level); + if (job->stack_parent != 0) + msg_panic("%s: parent present", myname); + if (job->stack_siblings.next != 0) + msg_panic("%s: siblings present", myname); + + /* + * Make sure that children of job on zero stack level are informed that + * their parent is gone too. + */ + qmgr_job_parent_gone(job, 0); + + /* + * Update the current job pointer if necessary. + */ + if (transport->job_current == job) + transport->job_current = job->transport_peers.next; + + /* + * Invalidate the candidate selection cache if necessary. + */ + if (job == transport->candidate_cache + || job == transport->candidate_cache_current) + RESET_CANDIDATE_CACHE(transport); + + /* + * Remove the job from the job lists and mark it as unlinked. + */ + QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); + QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers); + job->stack_level = -1; +} + +/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */ + +static void qmgr_job_retire(QMGR_JOB *job) +{ + if (msg_verbose) + msg_info("qmgr_job_retire: %s", job->message->queue_id); + + /* + * Pop the job from the job stack if necessary. + */ + if (job->stack_level > 0) + qmgr_job_pop(job); + + /* + * Make sure this job is not cached as the next unread job for this + * transport. The qmgr_entry_done() will make sure that the slots donated + * by this job are moved back to the transport pool as soon as possible. + */ + qmgr_job_move_limits(job); + + /* + * Remove the job from the job lists. Note that it remains on the message + * job list, though, and that it can be revived by using + * qmgr_job_obtain(). Also note that the available slot counter is left + * intact. + */ + qmgr_job_unlink(job); +} + +/* qmgr_job_free - release the job structure */ + +void qmgr_job_free(QMGR_JOB *job) +{ + const char *myname = "qmgr_job_free"; + QMGR_MESSAGE *message = job->message; + QMGR_TRANSPORT *transport = job->transport; + + if (msg_verbose) + msg_info("%s: %s %s", myname, message->queue_id, transport->name); + + /* + * Sanity checks. + */ + if (job->rcpt_count) + msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count); + + /* + * Pop the job from the job stack if necessary. + */ + if (job->stack_level > 0) + qmgr_job_pop(job); + + /* + * Return any remaining recipient slots back to the recipient slots pool. + */ + qmgr_job_move_limits(job); + if (job->rcpt_limit) + msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit); + + /* + * Unlink and discard the structure. Check if the job is still linked on + * the job lists or if it was already retired before unlinking it. + */ + if (job->stack_level >= 0) + qmgr_job_unlink(job); + QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers); + htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0); + htable_free(job->peer_byname, (void (*) (void *)) 0); + myfree((void *) job); +} + +/* qmgr_job_count_slots - maintain the delivery slot counters */ + +static void qmgr_job_count_slots(QMGR_JOB *job) +{ + + /* + * Count the number of delivery slots used during the delivery of the + * selected job. Also count the number of delivery slots available for + * its preemption. + * + * Despite its trivial look, this is one of the key parts of the theory + * behind this preempting scheduler. + */ + job->slots_available++; + job->slots_used++; + + /* + * If the selected job is not the original current job, reset the + * candidate cache because the change above have slightly increased the + * chance of this job becoming a candidate next time. + * + * Don't expect that the change of the current jobs this turn will render + * the candidate cache invalid the next turn - it can happen that the + * next turn the original current job will be selected again and the + * cache would be considered valid in such case. + */ + if (job != job->transport->candidate_cache_current) + RESET_CANDIDATE_CACHE(job->transport); +} + +/* qmgr_job_candidate - find best job candidate for preempting given job */ + +static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current) +{ + QMGR_TRANSPORT *transport = current->transport; + QMGR_JOB *job, *best_job = 0; + double score, best_score = 0.0; + int max_slots, max_needed_entries, max_total_entries; + int delay; + time_t now = sane_time(); + + /* + * Fetch the result directly from the cache if the cache is still valid. + * + * Note that we cache negative results too, so the cache must be invalidated + * by resetting the cached current job pointer, not the candidate pointer + * itself. + * + * In case the cache is valid and contains no candidate, we can ignore the + * time change, as it affects only which candidate is the best, not if + * one exists. However, this feature requires that we no longer relax the + * cache resetting rules, depending on the automatic cache timeout. + */ + if (transport->candidate_cache_current == current + && (transport->candidate_cache_time == now + || transport->candidate_cache == 0)) + return (transport->candidate_cache); + + /* + * Estimate the minimum amount of delivery slots that can ever be + * accumulated for the given job. All jobs that won't fit into these + * slots are excluded from the candidate selection. + */ + max_slots = (MIN_ENTRIES(current) - current->selected_entries + + current->slots_available) / transport->slot_cost; + + /* + * Select the candidate with best time_since_queued/total_recipients + * score. In addition to jobs which don't meet the max_slots limit, skip + * also jobs which don't have any selectable entries at the moment. + * + * Instead of traversing the whole job list we traverse it just from the + * current job forward. This has several advantages. First, we skip some + * of the blocker jobs and the current job itself right away. But the + * really important advantage is that we are sure that we don't consider + * any jobs that are already stack children of the current job. Thanks to + * this we can easily include all encountered jobs which are leaf + * children of some of the preempting stacks as valid candidates. All we + * need to do is to make sure we do not include any of the stack parents. + * And, because the leaf children are not ordered by the time since + * queued, we have to exclude them from the early loop end test. + * + * However, don't bother searching if we can't find anything suitable + * anyway. + */ + if (max_slots > 0) { + for (job = current->transport_peers.next; job; job = job->transport_peers.next) { + if (job->stack_children.next != 0 || IS_BLOCKER(job, transport)) + continue; + max_total_entries = MAX_ENTRIES(job); + max_needed_entries = max_total_entries - job->selected_entries; + delay = now - job->message->queued_time + 1; + if (max_needed_entries > 0 && max_needed_entries <= max_slots) { + score = (double) delay / max_total_entries; + if (score > best_score) { + best_score = score; + best_job = job; + } + } + + /* + * Stop early if the best score is as good as it can get. + */ + if (delay <= best_score && job->stack_level == 0) + break; + } + } + + /* + * Cache the result for later use. + */ + transport->candidate_cache = best_job; + transport->candidate_cache_current = current; + transport->candidate_cache_time = now; + + return (best_job); +} + +/* qmgr_job_preempt - preempt large message with smaller one */ + +static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current) +{ + const char *myname = "qmgr_job_preempt"; + QMGR_TRANSPORT *transport = current->transport; + QMGR_JOB *job, *prev; + int expected_slots; + int rcpt_slots; + + /* + * Suppress preempting completely if the current job is not big enough to + * accumulate even the minimal number of slots required. + * + * Also, don't look for better job candidate if there are no available slots + * yet (the count can get negative due to the slot loans below). + */ + if (current->slots_available <= 0 + || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost) + return (current); + + /* + * Find best candidate for preempting the current job. + * + * Note that the function also takes care that the candidate fits within the + * number of delivery slots which the current job is still able to + * accumulate. + */ + if ((job = qmgr_job_candidate(current)) == 0) + return (current); + + /* + * Sanity checks. + */ + if (job == current) + msg_panic("%s: attempt to preempt itself", myname); + if (job->stack_children.next != 0) + msg_panic("%s: already on the job stack (%d)", myname, job->stack_level); + if (job->stack_level < 0) + msg_panic("%s: not on the job list (%d)", myname, job->stack_level); + + /* + * Check if there is enough available delivery slots accumulated to + * preempt the current job. + * + * The slot loaning scheme improves the average message response time. Note + * that the loan only allows the preemption happen earlier, though. It + * doesn't affect how many slots have to be "paid" - in either case the + * full number of slots required has to be accumulated later before the + * current job can be preempted again. + */ + expected_slots = MAX_ENTRIES(job) - job->selected_entries; + if (current->slots_available / transport->slot_cost + transport->slot_loan + < expected_slots * transport->slot_loan_factor / 100.0) + return (current); + + /* + * Preempt the current job. + * + * This involves placing the selected candidate in front of the current job + * on the job list and updating the stack parent/child/sibling pointers + * appropriately. But first we need to make sure that the candidate is + * taken from its previous job stack which it might be top of. + */ + if (job->stack_level > 0) + qmgr_job_pop(job); + QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); + prev = current->transport_peers.prev; + QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers); + job->stack_parent = current; + QMGR_LIST_APPEND(current->stack_children, job, stack_siblings); + job->stack_level = current->stack_level + 1; + + /* + * Update the current job pointer and explicitly reset the candidate + * cache. + */ + transport->job_current = job; + RESET_CANDIDATE_CACHE(transport); + + /* + * Since the single job can be preempted by several jobs at the same + * time, we have to adjust the available slot count now to prevent using + * the same slots multiple times. To do that we subtract the number of + * slots the preempting job will supposedly use. This number will be + * corrected later when that job is popped from the stack to reflect the + * number of slots really used. + * + * As long as we don't need to keep track of how many slots were really + * used, we can (ab)use the slots_used counter for counting the + * difference between the real and expected amounts instead of the + * absolute amount. + */ + current->slots_available -= expected_slots * transport->slot_cost; + job->slots_used = -expected_slots; + + /* + * Add part of extra recipient slots reserved for preempting jobs to the + * new current job if necessary. + * + * Note that transport->rcpt_unused is within <-rcpt_per_stack,0> in such + * case. + */ + if (job->message->rcpt_offset != 0) { + rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2; + job->rcpt_limit += rcpt_slots; + job->message->rcpt_limit += rcpt_slots; + transport->rcpt_unused -= rcpt_slots; + } + if (msg_verbose) + msg_info("%s: %s by %s, level %d", myname, current->message->queue_id, + job->message->queue_id, job->stack_level); + + return (job); +} + +/* qmgr_job_pop - remove the job from its job preemption stack */ + +static void qmgr_job_pop(QMGR_JOB *job) +{ + const char *myname = "qmgr_job_pop"; + QMGR_TRANSPORT *transport = job->transport; + QMGR_JOB *parent; + + if (msg_verbose) + msg_info("%s: %s", myname, job->message->queue_id); + + /* + * Sanity checks. + */ + if (job->stack_level <= 0) + msg_panic("%s: not on the job stack (%d)", myname, job->stack_level); + + /* + * Adjust the number of delivery slots available to preempt job's parent. + * Note that the -= actually adds back any unused slots, as we have + * already subtracted the expected amount of slots from both counters + * when we did the preemption. + * + * Note that we intentionally do not adjust slots_used of the parent. Doing + * so would decrease the maximum per message inflation factor if the + * preemption appeared near the end of parent delivery. + * + * For the same reason we do not adjust parent's slots_available if the + * parent is not the original parent that was preempted by this job + * (i.e., the original parent job has already completed). + * + * This is another key part of the theory behind this preempting scheduler. + */ + if ((parent = job->stack_parent) != 0 + && job->stack_level == parent->stack_level + 1) + parent->slots_available -= job->slots_used * transport->slot_cost; + + /* + * Remove the job from its parent's children list. + */ + if (parent != 0) { + QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings); + job->stack_parent = 0; + } + + /* + * If there is a parent, let it adopt all those orphaned children. + * Otherwise at least notify the children that their parent is gone. + */ + qmgr_job_parent_gone(job, parent); + + /* + * Put the job back to stack level zero. + */ + job->stack_level = 0; + + /* + * Explicitly reset the candidate cache. It's not worth trying to skip + * this under some complicated conditions - in most cases the popped job + * is the current job so we would have to reset it anyway. + */ + RESET_CANDIDATE_CACHE(transport); + + /* + * Here we leave the remaining work involving the proper placement on the + * job list to the caller. The most important reason for this is that it + * allows us not to look up where exactly to place the job. + * + * The caller is also made responsible for invalidating the current job + * cache if necessary. + */ +#if 0 + QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); + QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers); + + if (transport->job_current == job) + transport->job_current = job->transport_peers.next; +#endif +} + +/* qmgr_job_peer_select - select next peer suitable for delivery */ + +static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job) +{ + QMGR_PEER *peer; + QMGR_MESSAGE *message = job->message; + + /* + * Try reading in more recipients. We do that as soon as possible + * (almost, see below), to make sure there is enough new blood pouring + * in. Otherwise single recipient for slow destination might starve the + * entire message delivery, leaving lot of fast destination recipients + * sitting idle in the queue file. + * + * Ideally we would like to read in recipients whenever there is a space, + * but to prevent excessive I/O, we read them only when enough time has + * passed or we can read enough of them at once. + * + * Note that even if we read the recipients few at a time, the message + * loading code tries to put them to existing recipient entries whenever + * possible, so the per-destination recipient grouping is not grossly + * affected. + * + * XXX Workaround for logic mismatch. The message->refcount test needs + * explanation. If the refcount is zero, it means that qmgr_active_done() + * is being completed asynchronously. In such case, we can't read in + * more recipients as bad things would happen after qmgr_active_done() + * continues processing. Note that this results in the given job being + * stalled for some time, but fortunately this particular situation is so + * rare that it is not critical. Still we seek for better solution. + */ + if (message->rcpt_offset != 0 + && message->refcount > 0 + && (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit + || (message->rcpt_limit > message->rcpt_count + && sane_time() - message->refill_time >= job->transport->refill_delay))) + qmgr_message_realloc(message); + + /* + * Get the next suitable peer, if there is any. + */ + if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0) + return (peer); + + /* + * There is no suitable peer in-core, so try reading in more recipients + * if possible. This is our last chance to get suitable peer before + * giving up on this job for now. + * + * XXX For message->refcount, see above. + */ + if (message->rcpt_offset != 0 + && message->refcount > 0 + && message->rcpt_limit > message->rcpt_count) { + qmgr_message_realloc(message); + if (HAS_ENTRIES(job)) + return (qmgr_peer_select(job)); + } + return (0); +} + +/* qmgr_job_entry_select - select next entry suitable for delivery */ + +QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport) +{ + QMGR_JOB *job, *next; + QMGR_PEER *peer; + QMGR_ENTRY *entry; + + /* + * Get the current job if there is one. + */ + if ((job = transport->job_current) == 0) + return (0); + + /* + * Exercise the preempting algorithm if enabled. + * + * The slot_cost equal to 1 causes the algorithm to degenerate and is + * therefore disabled too. + */ + if (transport->slot_cost >= 2) + job = qmgr_job_preempt(job); + + /* + * Select next entry suitable for delivery. In case the current job can't + * provide one because of the per-destination concurrency limits, we mark + * it as a "blocker" job and continue with the next job on the job list. + * + * Note that the loop also takes care of getting the "stall" jobs (job with + * no entries currently available) out of the way if necessary. Stall + * jobs can appear in case of multi-transport messages whose recipients + * don't fit in-core at once. Some jobs created by such message may have + * only few recipients and would stay on the job list until all other + * jobs of that message are delivered, blocking precious recipient slots + * available to this transport. Or it can happen that the job has some + * more entries but suddenly they all get deferred. Whatever the reason, + * we retire such jobs below if we happen to come across some. + */ + for ( /* empty */ ; job; job = next) { + next = job->transport_peers.next; + + /* + * Don't bother if the job is known to have no available entries + * because of the per-destination concurrency limits. + */ + if (IS_BLOCKER(job, transport)) + continue; + + if ((peer = qmgr_job_peer_select(job)) != 0) { + + /* + * We have found a suitable peer. Select one of its entries and + * adjust the delivery slot counters. + */ + entry = qmgr_entry_select(peer); + qmgr_job_count_slots(job); + + /* + * Remember the current job for the next time so we don't have to + * crawl over all those blockers again. They will be reconsidered + * when the concurrency limit permits. + */ + transport->job_current = job; + + /* + * In case we selected the very last job entry, remove the job + * from the job lists right now. + * + * This action uses the assumption that once the job entry has been + * selected, it can be unselected only before the message ifself + * is deferred. Thus the job with all entries selected can't + * re-appear with more entries available for selection again + * (without reading in more entries from the queue file, which in + * turn invokes qmgr_job_obtain() which re-links the job back on + * the lists if necessary). + * + * Note that qmgr_job_move_limits() transfers the recipients slots + * correctly even if the job is unlinked from the job list thanks + * to the job_next_unread caching. + */ + if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0) + qmgr_job_retire(job); + + /* + * Finally. Hand back the fruit of our tedious effort. + */ + return (entry); + } else if (HAS_ENTRIES(job)) { + + /* + * The job can't be selected due the concurrency limits. Mark it + * together with its queues so we know they are blocking the job + * list and they get the appropriate treatment. In particular, + * all blockers will be reconsidered when one of the problematic + * queues will accept more deliveries. And the job itself will be + * reconsidered if it is assigned some more entries. + */ + job->blocker_tag = transport->blocker_tag; + for (peer = job->peer_list.next; peer; peer = peer->peers.next) + if (peer->entry_list.next != 0) + peer->queue->blocker_tag = transport->blocker_tag; + } else { + + /* + * The job is "stalled". Retire it until it either gets freed or + * gets more entries later. + */ + qmgr_job_retire(job); + } + } + + /* + * We have not found any entry we could use for delivery. Well, things + * must have changed since this transport was selected for asynchronous + * allocation. Never mind. Clear the current job pointer and reluctantly + * report back that we have failed in our task. + */ + transport->job_current = 0; + return (0); +} + +/* qmgr_job_blocker_update - update "blocked job" status */ + +void qmgr_job_blocker_update(QMGR_QUEUE *queue) +{ + QMGR_TRANSPORT *transport = queue->transport; + + /* + * If the queue was blocking some of the jobs on the job list, check if + * the concurrency limit has lifted. If there are still some pending + * deliveries, give it a try and unmark all transport blockers at once. + * The qmgr_job_entry_select() will do the rest. In either case make sure + * the queue is not marked as a blocker anymore, with extra handling of + * queues which were declared dead. + * + * Note that changing the blocker status also affects the candidate cache. + * Most of the cases would be automatically recognized by the current job + * change, but we play safe and reset the cache explicitly below. + * + * Keeping the transport blocker tag odd is an easy way to make sure the tag + * never matches jobs that are not explicitly marked as blockers. + */ + if (queue->blocker_tag == transport->blocker_tag) { + if (queue->window > queue->busy_refcount && queue->todo.next != 0) { + transport->blocker_tag += 2; + transport->job_current = transport->job_list.next; + transport->candidate_cache_current = 0; + } + if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue)) + queue->blocker_tag = 0; + } +} diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c new file mode 100644 index 0000000..7d7464c --- /dev/null +++ b/src/qmgr/qmgr_message.c @@ -0,0 +1,1570 @@ +/*++ +/* NAME +/* qmgr_message 3 +/* SUMMARY +/* in-core message structures +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_message_count; +/* int qmgr_recipient_count; +/* int qmgr_vrfy_pend_count; +/* +/* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) +/* const char *class; +/* const char *name; +/* int qflags; +/* mode_t mode; +/* +/* QMGR_MESSAGE *qmgr_message_realloc(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_free(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_update_warn(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_kill_record(message, offset) +/* QMGR_MESSAGE *message; +/* long offset; +/* DESCRIPTION +/* This module performs en-gross operations on queue messages. +/* +/* qmgr_message_count is a global counter for the total number +/* of in-core message structures (i.e. the total size of the +/* `active' message queue). +/* +/* qmgr_recipient_count is a global counter for the total number +/* of in-core recipient structures (i.e. the sum of all recipients +/* in all in-core message structures). +/* +/* qmgr_vrfy_pend_count is a global counter for the total +/* number of in-core message structures that are associated +/* with an address verification request. Requests that exceed +/* the address_verify_pending_limit are deferred immediately. +/* This is a backup mechanism for a more refined enforcement +/* mechanism in the verify(8) daemon. +/* +/* qmgr_message_alloc() creates an in-core message structure +/* with sender and recipient information taken from the named queue +/* file. A null result means the queue file could not be read or +/* that the queue file contained incorrect information. A result +/* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number +/* of recipients read from a queue file is limited by the global +/* var_qmgr_rcpt_limit configuration parameter. When the limit +/* is reached, the \fIrcpt_offset\fR structure member is set to +/* the position where the read was terminated. Recipients are +/* run through the resolver, and are assigned to destination +/* queues. Recipients that cannot be assigned are deferred or +/* bounced. Mail that has bounced twice is silently absorbed. +/* A non-zero mode means change the queue file permissions. +/* +/* qmgr_message_realloc() resumes reading recipients from the queue +/* file, and updates the recipient list and \fIrcpt_offset\fR message +/* structure members. A null result means that the file could not be +/* read or that the file contained incorrect information. Recipient +/* limit imposed this time is based on the position of the message +/* job(s) on corresponding transport job list(s). It's considered +/* an error to call this when the recipient slots can't be allocated. +/* +/* qmgr_message_free() destroys an in-core message structure and makes +/* the resources available for reuse. It is an error to destroy +/* a message structure that is still referenced by queue entry structures. +/* +/* qmgr_message_update_warn() takes a closed message, opens it, updates +/* the warning field, and closes it again. +/* +/* qmgr_message_kill_record() takes a closed message, opens it, updates +/* the record type at the given offset to "killed", and closes the file. +/* A killed envelope record is ignored. Killed records are not allowed +/* inside the message content. +/* DIAGNOSTICS +/* Warnings: malformed message file. Fatal errors: out of memory. +/* SEE ALSO +/* envelope(3) message envelope parser +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/*--*/ + +/* System library. */ + +#include +#include +#include +#include /* sscanf() */ +#include +#include +#include +#include +#include + +/* Utility library. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Client stubs. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +int qmgr_message_count; +int qmgr_recipient_count; +int qmgr_vrfy_pend_count; + +/* qmgr_message_create - create in-core message structure */ + +static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, + const char *queue_id, int qflags) +{ + QMGR_MESSAGE *message; + + message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); + qmgr_message_count++; + message->flags = 0; + message->qflags = qflags; + message->tflags = 0; + message->tflags_offset = 0; + message->rflags = QMGR_READ_FLAG_DEFAULT; + message->fp = 0; + message->refcount = 0; + message->single_rcpt = 0; + message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; + message->create_time = 0; + GETTIMEOFDAY(&message->active_time); + message->queued_time = sane_time(); + message->refill_time = 0; + message->data_offset = 0; + message->queue_id = mystrdup(queue_id); + message->queue_name = mystrdup(queue_name); + message->encoding = 0; + message->sender = 0; + message->dsn_envid = 0; + message->dsn_ret = 0; + message->smtputf8 = 0; + message->filter_xport = 0; + message->inspect_xport = 0; + message->redirect_addr = 0; + message->data_size = 0; + message->cont_length = 0; + message->warn_offset = 0; + message->warn_time = 0; + message->rcpt_offset = 0; + message->verp_delims = 0; + message->client_name = 0; + message->client_addr = 0; + message->client_port = 0; + message->client_proto = 0; + message->client_helo = 0; + message->sasl_method = 0; + message->sasl_username = 0; + message->sasl_sender = 0; + message->log_ident = 0; + message->rewrite_context = 0; + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + message->rcpt_count = 0; + message->rcpt_limit = var_qmgr_msg_rcpt_limit; + message->rcpt_unread = 0; + QMGR_LIST_INIT(message->job_list); + return (message); +} + +/* qmgr_message_close - close queue file */ + +static void qmgr_message_close(QMGR_MESSAGE *message) +{ + vstream_fclose(message->fp); + message->fp = 0; +} + +/* qmgr_message_open - open queue file */ + +static int qmgr_message_open(QMGR_MESSAGE *message) +{ + + /* + * Sanity check. + */ + if (message->fp) + msg_panic("%s: queue file is open", message->queue_id); + + /* + * Open this queue file. Skip files that we cannot open. Back off when + * the system appears to be running out of resources. + */ + if ((message->fp = mail_queue_open(message->queue_name, + message->queue_id, + O_RDWR, 0)) == 0) { + if (errno != ENOENT) + msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); + msg_warn("open %s %s: %m", message->queue_name, message->queue_id); + return (-1); + } + return (0); +} + +/* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ + +static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) +{ + VSTRING *buf; + long orig_offset, extra_offset; + int rec_type; + char *start; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + if ((orig_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Rewind to the very beginning to make sure we see all records. + */ + if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Scan through the old style queue file. Count the total number of + * recipients and find the data/extra sections offsets. Note that the new + * queue files require that data_size equals extra_offset - data_offset, + * so we set data_size to this as well and ignore the size record itself + * completely. + */ + message->rcpt_unread = 0; + for (;;) { + rec_type = rec_get(message->fp, buf, 0); + if (rec_type <= 0) + /* Report missing end record later. */ + break; + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("old-style scan record %c %s", rec_type, start); + if (rec_type == REC_TYPE_END) + break; + if (rec_type == REC_TYPE_DONE + || rec_type == REC_TYPE_RCPT + || rec_type == REC_TYPE_DRCP) { + message->rcpt_unread++; + continue; + } + if (rec_type == REC_TYPE_MESG) { + if (message->data_offset == 0) { + if ((message->data_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if ((extra_offset = atol(start)) <= message->data_offset) + msg_fatal("bad extra offset %s file %s", + start, VSTREAM_PATH(message->fp)); + if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->data_size = extra_offset - message->data_offset; + } + continue; + } + } + + /* + * Clean up. + */ + if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (message->data_offset == 0 || rec_type != REC_TYPE_END) + msg_fatal("%s: envelope records out of order", message->queue_id); +} + +/* qmgr_message_read - read envelope records */ + +static int qmgr_message_read(QMGR_MESSAGE *message) +{ + VSTRING *buf; + int rec_type; + long curr_offset; + long save_offset = message->rcpt_offset; /* save a flag */ + int save_unread = message->rcpt_unread; /* save a count */ + char *start; + int recipient_limit; + const char *error_text; + char *name; + char *value; + char *orig_rcpt = 0; + int count; + int dsn_notify = 0; + char *dsn_orcpt = 0; + int n; + int have_log_client_attr = 0; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + + /* + * If we re-open this file, skip over on-file recipient records that we + * already looked at, and refill the in-core recipient address list. + * + * For the first time, the message recipient limit is calculated from the + * global recipient limit. This is to avoid reading little recipients + * when the active queue is near empty. When the queue becomes full, only + * the necessary amount is read in core. Such priming is necessary + * because there are no message jobs yet. + * + * For the next time, the recipient limit is based solely on the message + * jobs' positions in the job lists and/or job stacks. + */ + if (message->rcpt_offset) { + if (message->rcpt_list.len) + msg_panic("%s: recipient list not empty on recipient reload", + message->queue_id); + if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->rcpt_offset = 0; + recipient_limit = message->rcpt_limit - message->rcpt_count; + } else { + recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count; + if (recipient_limit < message->rcpt_limit) + recipient_limit = message->rcpt_limit; + } + /* Keep interrupt latency in check. */ + if (recipient_limit > 5000) + recipient_limit = 5000; + if (recipient_limit <= 0) + msg_panic("%s: no recipient slots available", message->queue_id); + if (msg_verbose) + msg_info("%s: recipient limit %d", message->queue_id, recipient_limit); + + /* + * Read envelope records. XXX Rely on the front-end programs to enforce + * record size limits. Read up to recipient_limit recipients from the + * queue file, to protect against memory exhaustion. Recipient records + * may appear before or after the message content, so we keep reading + * from the queue file until we have enough recipients (rcpt_offset != 0) + * and until we know all the non-recipient information. + * + * Note that the total recipient count record is accurate only for fresh + * queue files. After some of the recipients are marked as done and the + * queue file is deferred, it can be used as upper bound estimate only. + * Fortunately, this poses no major problem on the scheduling algorithm, + * as the only impact is that the already deferred messages are not + * chosen by qmgr_job_candidate() as often as they could. + * + * On the first open, we must examine all non-recipient records. + * + * Optimization: when we know that recipient records are not mixed with + * non-recipient records, as is typical with mailing list mail, then we + * can avoid having to examine all the queue file records before we can + * start deliveries. This avoids some file system thrashing with huge + * mailing lists. + */ + for (;;) { + if ((curr_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if (curr_offset == message->data_offset && curr_offset > 0) { + if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + curr_offset += message->data_size; + } + rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("record %c %s", rec_type, start); + if (rec_type == REC_TYPE_PTR) { + if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) + break; + /* Need to update curr_offset after pointer jump. */ + continue; + } + if (rec_type <= 0) { + msg_warn("%s: message rejected: missing end record", + message->queue_id); + break; + } + if (rec_type == REC_TYPE_END) { + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + + /* + * Map named attributes to pseudo record types, so that we don't have + * to pollute the queue file with records that are incompatible with + * past Postfix versions. Preferably, people should be able to back + * out from an upgrade without losing mail. + */ + if (rec_type == REC_TYPE_ATTR) { + if ((error_text = split_nameval(start, &name, &value)) != 0) { + msg_warn("%s: ignoring bad attribute: %s: %.200s", + message->queue_id, error_text, start); + rec_type = REC_TYPE_ERROR; + break; + } + if ((n = rec_attr_map(name)) != 0) { + start = value; + rec_type = n; + } + } + + /* + * Process recipient records. + */ + if (rec_type == REC_TYPE_RCPT) { + /* See also below for code setting orig_rcpt etc. */ + if (message->rcpt_offset == 0) { + message->rcpt_unread--; + recipient_list_add(&message->rcpt_list, curr_offset, + dsn_orcpt ? dsn_orcpt : "", + dsn_notify ? dsn_notify : 0, + orig_rcpt ? orig_rcpt : "", start); + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + if (message->rcpt_list.len >= recipient_limit) { + if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", + VSTREAM_PATH(message->fp)); + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + break; + if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) + /* Examine all remaining non-recipient records. */ + continue; + /* Optimizations for "pure recipient" record sections. */ + if (curr_offset > message->data_offset) { + /* We already examined all non-recipient records. */ + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + /* Examine non-recipient records in extracted segment. */ + if (vstream_fseek(message->fp, message->data_offset + + message->data_size, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + continue; + } + } + continue; + } + if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { + if (message->rcpt_offset == 0) { + message->rcpt_unread--; + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + } + continue; + } + if (rec_type == REC_TYPE_DSN_ORCPT) { + /* See also above for code clearing dsn_orcpt. */ + if (dsn_orcpt != 0) { + msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (message->rcpt_offset == 0) + dsn_orcpt = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_DSN_NOTIFY) { + /* See also above for code clearing dsn_notify. */ + if (dsn_notify != 0) { + msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", + message->queue_id, dsn_notify); + dsn_notify = 0; + } + if (message->rcpt_offset == 0) { + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) + msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", + message->queue_id, start); + else + dsn_notify = n; + continue; + } + } + if (rec_type == REC_TYPE_ORCP) { + /* See also above for code clearing orig_rcpt. */ + if (orig_rcpt != 0) { + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (message->rcpt_offset == 0) + orig_rcpt = mystrdup(start); + continue; + } + + /* + * Process non-recipient records. + */ + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + continue; + if (rec_type == REC_TYPE_SIZE) { + if (message->data_offset == 0) { + if ((count = sscanf(start, "%ld %ld %d %d %ld %d", + &message->data_size, &message->data_offset, + &message->rcpt_unread, &message->rflags, + &message->cont_length, + &message->smtputf8)) >= 3) { + /* Postfix >= 1.0 (a.k.a. 20010228). */ + if (message->data_offset <= 0 || message->data_size <= 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + if (message->rflags & ~QMGR_READ_FLAG_USER) { + msg_warn("%s: invalid flags in size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + } else if (count == 1) { + /* Postfix < 1.0 (a.k.a. 20010228). */ + qmgr_message_oldstyle_scan(message); + } else { + /* Can't happen. */ + msg_warn("%s: message rejected: weird size record", + message->queue_id); + rec_type = REC_TYPE_ERROR; + break; + } + } + /* Postfix < 2.4 compatibility. */ + if (message->cont_length == 0) { + message->cont_length = message->data_size; + } else if (message->cont_length < 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + continue; + } + if (rec_type == REC_TYPE_TIME) { + if (message->arrival_time.tv_sec == 0) + REC_TYPE_TIME_SCAN(start, message->arrival_time); + continue; + } + if (rec_type == REC_TYPE_CTIME) { + if (message->create_time == 0) + message->create_time = atol(start); + continue; + } + if (rec_type == REC_TYPE_FILT) { + if (message->filter_xport != 0) + myfree(message->filter_xport); + message->filter_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_INSP) { + if (message->inspect_xport != 0) + myfree(message->inspect_xport); + message->inspect_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_RDR) { + if (message->redirect_addr != 0) + myfree(message->redirect_addr); + message->redirect_addr = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_FROM) { + if (message->sender == 0) { + message->sender = mystrdup(start); + opened(message->queue_id, message->sender, + message->cont_length, message->rcpt_unread, + "queue %s", message->queue_name); + } + continue; + } + if (rec_type == REC_TYPE_DSN_ENVID) { + /* Allow Milter override. */ + if (message->dsn_envid != 0) + myfree(message->dsn_envid); + message->dsn_envid = mystrdup(start); + } + if (rec_type == REC_TYPE_DSN_RET) { + /* Allow Milter override. */ + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) + msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", + message->queue_id, start); + else + message->dsn_ret = n; + } + if (rec_type == REC_TYPE_ATTR) { + /* Allow extra segment to override envelope segment info. */ + if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { + if (message->encoding != 0) + myfree(message->encoding); + message->encoding = mystrdup(value); + } + + /* + * Backwards compatibility. Before Postfix 2.3, the logging + * attributes were called client_name, etc. Now they are called + * log_client_name. etc., and client_name is used for the actual + * client information. To support old queue files we accept both + * names for the purpose of logging; the new name overrides the + * old one. + * + * XXX Do not use the "legacy" client_name etc. attribute values for + * initializing the logging attributes, when this file already + * contains the "modern" log_client_name etc. logging attributes. + * Otherwise, logging attributes that are not present in the + * queue file would be set with information from the real client. + */ + else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_name == 0) + message->client_name = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { + if (have_log_client_attr == 0 && message->client_addr == 0) + message->client_addr = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { + if (have_log_client_attr == 0 && message->client_port == 0) + message->client_port = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_proto == 0) + message->client_proto = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_helo == 0) + message->client_helo = mystrdup(value); + } + /* Original client attributes. */ + else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { + if (message->client_name != 0) + myfree(message->client_name); + message->client_name = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { + if (message->client_addr != 0) + myfree(message->client_addr); + message->client_addr = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { + if (message->client_port != 0) + myfree(message->client_port); + message->client_port = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { + if (message->client_proto != 0) + myfree(message->client_proto); + message->client_proto = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { + if (message->client_helo != 0) + myfree(message->client_helo); + message->client_helo = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { + if (message->sasl_method == 0) + message->sasl_method = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_METHOD, value); + } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { + if (message->sasl_username == 0) + message->sasl_username = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_USERNAME, value); + } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_SENDER, value); + } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { + if (message->log_ident == 0) + message->log_ident = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_LOG_IDENT, value); + } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); + } + + /* + * Optional tracing flags (verify, sendmail -v, sendmail -bv). + * This record is killed after a trace logfile report is sent and + * after the logfile is deleted. + */ + else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { + if (message->tflags == 0) { + message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); + if (message->tflags == DEL_REQ_FLAG_RECORD) + message->tflags_offset = curr_offset; + else + message->tflags_offset = 0; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count++; + } + } + continue; + } + if (rec_type == REC_TYPE_WARN) { + if (message->warn_offset == 0) { + message->warn_offset = curr_offset; + REC_TYPE_WARN_SCAN(start, message->warn_time); + } + continue; + } + if (rec_type == REC_TYPE_VERP) { + if (message->verp_delims == 0) { + if (message->sender == 0 || message->sender[0] == 0) { + msg_warn("%s: ignoring VERP request for null sender", + message->queue_id); + } else if (verp_delims_verify(start) != 0) { + msg_warn("%s: ignoring bad VERP request: \"%.100s\"", + message->queue_id, start); + } else { + if (msg_verbose) + msg_info("%s: enabling VERP for sender \"%.100s\"", + message->queue_id, message->sender); + message->single_rcpt = 1; + message->verp_delims = mystrdup(start); + } + } + continue; + } + } + + /* + * Grr. + */ + if (dsn_orcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + } + if (orig_rcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + } + + /* + * After sending a "delayed" warning, request sender notification when + * message delivery is completed. While "mail delayed" notifications are + * bad enough because they multiply the amount of email traffic, "delay + * cleared" notifications are even worse because they come in a sudden + * burst when the queue drains after a network outage. + */ + if (var_dsn_delay_cleared && message->warn_time < 0) + message->tflags |= DEL_REQ_FLAG_REC_DLY_SENT; + + /* + * Remember when we have read the last recipient batch. Note that we do + * it here after reading as reading might have used considerable amount + * of time. + */ + message->refill_time = sane_time(); + + /* + * Avoid clumsiness elsewhere in the program. When sending data across an + * IPC channel, sending an empty string is more convenient than sending a + * null pointer. + */ + if (message->dsn_envid == 0) + message->dsn_envid = mystrdup(""); + if (message->encoding == 0) + message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); + if (message->client_name == 0) + message->client_name = mystrdup(""); + if (message->client_addr == 0) + message->client_addr = mystrdup(""); + if (message->client_port == 0) + message->client_port = mystrdup(""); + if (message->client_proto == 0) + message->client_proto = mystrdup(""); + if (message->client_helo == 0) + message->client_helo = mystrdup(""); + if (message->sasl_method == 0) + message->sasl_method = mystrdup(""); + if (message->sasl_username == 0) + message->sasl_username = mystrdup(""); + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(""); + if (message->log_ident == 0) + message->log_ident = mystrdup(""); + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); + /* Postfix < 2.3 compatibility. */ + if (message->create_time == 0) + message->create_time = message->arrival_time.tv_sec; + + /* + * Clean up. + */ + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (message->rcpt_unread < 0 + || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) { + msg_warn("%s: rcpt count mismatch (%d)", + message->queue_id, message->rcpt_unread); + message->rcpt_unread = 0; + } + if (rec_type <= 0) { + /* Already logged warning. */ + } else if (message->arrival_time.tv_sec == 0) { + msg_warn("%s: message rejected: missing arrival time record", + message->queue_id); + } else if (message->sender == 0) { + msg_warn("%s: message rejected: missing sender record", + message->queue_id); + } else if (message->data_offset == 0) { + msg_warn("%s: message rejected: missing size record", + message->queue_id); + } else { + return (0); + } + message->rcpt_offset = save_offset; /* restore flag */ + message->rcpt_unread = save_unread; /* restore count */ + recipient_list_free(&message->rcpt_list); + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + return (-1); +} + +/* qmgr_message_update_warn - update the time of next delay warning */ + +void qmgr_message_update_warn(QMGR_MESSAGE *message) +{ + + /* + * After the "mail delayed" warning, optionally send a "delay cleared" + * notification. + */ + if (qmgr_message_open(message) + || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 + || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, + REC_TYPE_WARN_ARG(-1)) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_kill_record - mark one message record as killed */ + +void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) +{ + if (offset <= 0) + msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); + if (qmgr_message_open(message) + || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_sort_compare - compare recipient information */ + +static int qmgr_message_sort_compare(const void *p1, const void *p2) +{ + RECIPIENT *rcpt1 = (RECIPIENT *) p1; + RECIPIENT *rcpt2 = (RECIPIENT *) p2; + QMGR_QUEUE *queue1; + QMGR_QUEUE *queue2; + char *at1; + char *at2; + int result; + + /* + * Compare most significant to least significant recipient attributes. + * The comparison function must be transitive, so NULL values need to be + * assigned an ordinal (we set NULL last). + */ + + queue1 = rcpt1->u.queue; + queue2 = rcpt2->u.queue; + if (queue1 != 0 && queue2 == 0) + return (-1); + if (queue1 == 0 && queue2 != 0) + return (1); + if (queue1 != 0 && queue2 != 0) { + + /* + * Compare message transport. + */ + if ((result = strcmp(queue1->transport->name, + queue2->transport->name)) != 0) + return (result); + + /* + * Compare queue name (nexthop or recipient@nexthop). + */ + if ((result = strcmp(queue1->name, queue2->name)) != 0) + return (result); + } + + /* + * Compare recipient domain. + */ + at1 = strrchr(rcpt1->address, '@'); + at2 = strrchr(rcpt2->address, '@'); + if (at1 == 0 && at2 != 0) + return (1); + if (at1 != 0 && at2 == 0) + return (-1); + if (at1 != 0 && at2 != 0 + && (result = strcasecmp_utf8(at1, at2)) != 0) + return (result); + + /* + * Compare recipient address. + */ + return (strcmp(rcpt1->address, rcpt2->address)); +} + +/* qmgr_message_sort - sort message recipient addresses by domain */ + +static void qmgr_message_sort(QMGR_MESSAGE *message) +{ + qsort((void *) message->rcpt_list.info, message->rcpt_list.len, + sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); + if (msg_verbose) { + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *rcpt; + + msg_info("start sorted recipient list"); + for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) + msg_info("qmgr_message_sort: %s", rcpt->address); + msg_info("end sorted recipient list"); + } +} + +/* qmgr_resolve_one - resolve or skip one recipient */ + +static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, + const char *addr, RESOLVE_REPLY *reply) +{ +#define QMGR_REDIRECT(rp, tp, np) do { \ + (rp)->flags = 0; \ + vstring_strcpy((rp)->transport, (tp)); \ + vstring_strcpy((rp)->nexthop, (np)); \ + } while (0) + + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) + resolve_clnt_query_from(message->sender, addr, reply); + else + resolve_clnt_verify_from(message->sender, addr, reply); + if (reply->flags & RESOLVE_FLAG_FAIL) { + QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, + "4.3.0 address resolver failure"); + return (0); + } else if (reply->flags & RESOLVE_FLAG_ERROR) { + QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, + "5.1.3 bad address syntax"); + return (0); + } else { + return (0); + } +} + +/* qmgr_message_resolve - resolve recipients */ + +static void qmgr_message_resolve(QMGR_MESSAGE *message) +{ + static ARGV *defer_xport_argv; + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_TRANSPORT *transport = 0; + QMGR_QUEUE *queue = 0; + RESOLVE_REPLY reply; + VSTRING *queue_name; + char *at; + char **cpp; + char *nexthop; + ssize_t len; + int status; + DSN dsn; + MSG_STATS stats; + DSN *saved_dsn; + +#define STREQ(x,y) (strcmp(x,y) == 0) +#define STR vstring_str +#define LEN VSTRING_LEN + + resolve_clnt_init(&reply); + queue_name = vstring_alloc(1); + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + + /* + * Redirect overrides all else. But only once (per entire message). + * For consistency with the remainder of Postfix, rewrite the address + * to canonical form before resolving it. + */ + if (message->redirect_addr) { + if (recipient > list.info) { + recipient->u.queue = 0; + continue; + } + message->rcpt_offset = 0; + message->rcpt_unread = 0; + + rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, + reply.recipient); + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Content filtering overrides the address resolver. + * + * XXX Bypass content_filter inspection for user-generated probes + * (sendmail -bv). MTA-generated probes never have the "please filter + * me" bits turned on, but we handle them here anyway for the sake of + * future proofing. + */ +#define FILTER_WITHOUT_NEXTHOP(filter, next) \ + (((next) = split_at((filter), ':')) == 0 || *(next) == 0) + +#define RCPT_WITHOUT_DOMAIN(rcpt, next) \ + ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) + + else if (message->filter_xport + && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { + reply.flags = 0; + vstring_strcpy(reply.transport, message->filter_xport); + if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) + && *(nexthop = var_def_filter_nexthop) == 0 + && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) + nexthop = var_myhostname; + vstring_strcpy(reply.nexthop, nexthop); + vstring_strcpy(reply.recipient, recipient->address); + } + + /* + * Resolve the destination to (transport, nexthop, address). The + * result address may differ from the one specified by the sender. + */ + else { + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Bounce null recipients. This should never happen, but is most + * likely the result of a fault in a different program, so aborting + * the queue manager process does not help. + */ + if (recipient->address[0] == 0) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, + "5.1.3 null recipient address"); + } + + /* + * Discard mail to the local double bounce address here, so this + * system can run without a local delivery agent. They'd still have + * to configure something for mail directed to the local postmaster, + * though, but that is an RFC requirement anyway. + * + * XXX This lookup should be done in the resolver, and the mail should + * be directed to a general-purpose null delivery agent. + */ + if (reply.flags & RESOLVE_CLASS_LOCAL) { + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + if (strncasecmp_utf8(STR(reply.recipient), + var_double_bounce_sender, len) == 0 + && !var_double_bounce_sender[len]) { + status = sent(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", DSN_SIMPLE(&dsn, "2.0.0", + "undeliverable postmaster notification discarded")); + if (status == 0) { + deliver_completed(message->fp, recipient->offset); +#if 0 + /* It's the default verification probe sender address. */ + msg_warn("%s: undeliverable postmaster notification discarded", + message->queue_id); +#endif + } else + message->flags |= status; + continue; + } + } + + /* + * Optionally defer deliveries over specific transports, unless the + * restriction is lifted temporarily. + */ + if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { + if (defer_xport_argv == 0) + defer_xport_argv = argv_split(var_defer_xports, CHARS_COMMA_SP); + for (cpp = defer_xport_argv->argv; *cpp; cpp++) + if (strcmp(*cpp, STR(reply.transport)) == 0) + break; + if (*cpp) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 deferred transport"); + } + } + + /* + * Safety: defer excess address verification requests. + */ + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0 + && qmgr_vrfy_pend_count > var_vrfy_pend_limit) + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 Too many address verification requests"); + + /* + * Look up or instantiate the proper transport. + */ + if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { + if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) + transport = qmgr_transport_create(STR(reply.transport)); + queue = 0; + } + + /* + * This message is being flushed. If need-be unthrottle the + * transport. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_TRANSPORT_THROTTLED(transport)) + qmgr_transport_unthrottle(transport); + + /* + * This transport is dead. Defer delivery to this recipient. + */ + if (QMGR_TRANSPORT_THROTTLED(transport)) { + saved_dsn = transport->dsn; + if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { + nexthop = qmgr_error_nexthop(saved_dsn); + vstring_strcpy(reply.nexthop, nexthop); + myfree(nexthop); + queue = 0; + } else { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * The nexthop destination provides the default name for the + * per-destination queue. When the delivery agent accepts only one + * recipient per delivery, give each recipient its own queue, so that + * deliveries to different recipients of the same message can happen + * in parallel, and so that we can enforce per-recipient concurrency + * limits and prevent one recipient from tying up all the delivery + * agent resources. We use recipient@nexthop as queue name rather + * than the actual recipient domain name, so that one recipient in + * multiple equivalent domains cannot evade the per-recipient + * concurrency limit. Split the address on the recipient delimiter if + * one is defined, so that extended addresses don't get extra + * delivery slots. + * + * Fold the result to lower case so that we don't have multiple queues + * for the same name. + * + * Important! All recipients in a queue must have the same nexthop + * value. It is OK to have multiple queues with the same nexthop + * value, but only when those queues are named after recipients. + * + * The single-recipient code below was written for local(8) like + * delivery agents, and assumes that all domains that deliver to the + * same (transport + nexthop) are aliases for $nexthop. Delivery + * concurrency is changed from per-domain into per-recipient, by + * changing the queue name from nexthop into localpart@nexthop. + * + * XXX This assumption is incorrect when different destinations share + * the same (transport + nexthop). In reality, such transports are + * rarely configured to use single-recipient deliveries. The fix is + * to decouple the per-destination recipient limit from the + * per-destination concurrency. + */ + vstring_strcpy(queue_name, STR(reply.nexthop)); + if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 + && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 + && transport->recipient_limit == 1) { + /* Copy the recipient localpart. */ + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + vstring_strncpy(queue_name, STR(reply.recipient), len); + /* Remove the address extension from the recipient localpart. */ + if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim)) + vstring_truncate(queue_name, strlen(STR(queue_name))); + /* Assume the recipient domain is equivalent to nexthop. */ + vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); + } + lowercase(STR(queue_name)); + + /* + * This transport is alive. Find or instantiate a queue for this + * recipient. + */ + if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { + if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) + queue = qmgr_queue_create(transport, STR(queue_name), + STR(reply.nexthop)); + } + + /* + * This message is being flushed. If need-be unthrottle the queue. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_QUEUE_THROTTLED(queue)) + qmgr_queue_unthrottle(queue); + + /* + * This queue is dead. Defer delivery to this recipient. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + saved_dsn = queue->dsn; + if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * This queue is alive. Bind this recipient to this queue instance. + */ + recipient->u.queue = queue; + } + resolve_clnt_free(&reply); + vstring_free(queue_name); +} + +/* qmgr_message_assign - assign recipients to specific delivery requests */ + +static void qmgr_message_assign(QMGR_MESSAGE *message) +{ + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_ENTRY *entry = 0; + QMGR_QUEUE *queue; + QMGR_JOB *job = 0; + QMGR_PEER *peer = 0; + + /* + * Try to bundle as many recipients in a delivery request as we can. When + * the recipient resolves to the same site and transport as an existing + * recipient, do not create a new queue entry, just move that recipient + * to the recipient list of the existing queue entry. All this provided + * that we do not exceed the transport-specific limit on the number of + * recipients per transaction. + */ +#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) + + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + + /* + * Skip recipients with a dead transport or destination. + */ + if ((queue = recipient->u.queue) == 0) + continue; + + /* + * Lookup or instantiate the message job if necessary. + */ + if (job == 0 || queue->transport != job->transport) { + job = qmgr_job_obtain(message, queue->transport); + peer = 0; + } + + /* + * Lookup or instantiate job peer if necessary. + */ + if (peer == 0 || queue != peer->queue) + peer = qmgr_peer_obtain(job, queue); + + /* + * Lookup old or instantiate new recipient entry. We try to reuse the + * last existing entry whenever the recipient limit permits. + */ + entry = peer->entry_list.prev; + if (message->single_rcpt || entry == 0 + || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len)) + entry = qmgr_entry_create(peer, message); + + /* + * Add the recipient to the current entry and increase all those + * recipient counters accordingly. + */ + recipient_list_add(&entry->rcpt_list, recipient->offset, + recipient->dsn_orcpt, recipient->dsn_notify, + recipient->orig_addr, recipient->address); + job->rcpt_count++; + message->rcpt_count++; + qmgr_recipient_count++; + } + + /* + * Release the message recipient list and reinitialize it for the next + * time. + */ + recipient_list_free(&message->rcpt_list); + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + + /* + * Note that even if qmgr_job_obtain() reset the job candidate cache of + * all transports to which we assigned new recipients, this message may + * have other jobs which we didn't touch at all this time. But the number + * of unread recipients affecting the candidate selection might have + * changed considerably, so we must invalidate the caches if it might be + * of some use. + */ + for (job = message->job_list.next; job; job = job->message_peers.next) + if (job->selected_entries < job->read_entries + && job->blocker_tag != job->transport->blocker_tag) + job->transport->candidate_cache_current = 0; +} + +/* qmgr_message_move_limits - recycle unused recipient slots */ + +static void qmgr_message_move_limits(QMGR_MESSAGE *message) +{ + QMGR_JOB *job; + + for (job = message->job_list.next; job; job = job->message_peers.next) + qmgr_job_move_limits(job); +} + +/* qmgr_message_free - release memory for in-core message structure */ + +void qmgr_message_free(QMGR_MESSAGE *message) +{ + QMGR_JOB *job; + + if (message->refcount != 0) + msg_panic("qmgr_message_free: reference len: %d", message->refcount); + if (message->fp) + msg_panic("qmgr_message_free: queue file is open"); + while ((job = message->job_list.next) != 0) + qmgr_job_free(job); + myfree(message->queue_id); + myfree(message->queue_name); + if (message->dsn_envid) + myfree(message->dsn_envid); + if (message->encoding) + myfree(message->encoding); + if (message->sender) + myfree(message->sender); + if (message->verp_delims) + myfree(message->verp_delims); + if (message->filter_xport) + myfree(message->filter_xport); + if (message->inspect_xport) + myfree(message->inspect_xport); + if (message->redirect_addr) + myfree(message->redirect_addr); + if (message->client_name) + myfree(message->client_name); + if (message->client_addr) + myfree(message->client_addr); + if (message->client_port) + myfree(message->client_port); + if (message->client_proto) + myfree(message->client_proto); + if (message->client_helo) + myfree(message->client_helo); + if (message->sasl_method) + myfree(message->sasl_method); + if (message->sasl_username) + myfree(message->sasl_username); + if (message->sasl_sender) + myfree(message->sasl_sender); + if (message->log_ident) + myfree(message->log_ident); + if (message->rewrite_context) + myfree(message->rewrite_context); + recipient_list_free(&message->rcpt_list); + qmgr_message_count--; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count--; + myfree((void *) message); +} + +/* qmgr_message_alloc - create in-core message structure */ + +QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, + int qflags, mode_t mode) +{ + const char *myname = "qmgr_message_alloc"; + QMGR_MESSAGE *message; + + if (msg_verbose) + msg_info("%s: %s %s", myname, queue_name, queue_id); + + /* + * Create an in-core message structure. + */ + message = qmgr_message_create(queue_name, queue_id, qflags); + + /* + * Extract message envelope information: time of arrival, sender address, + * recipient addresses. Skip files with malformed envelope information. + */ +#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) + + if (qmgr_message_open(message) < 0) { + qmgr_message_free(message); + return (0); + } + if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { + msg_info("%s: skipped, still being delivered", queue_id); + qmgr_message_close(message); + qmgr_message_free(message); + return (QMGR_MESSAGE_LOCKED); + } + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + qmgr_message_free(message); + return (0); + } else { + + /* + * We have validated the queue file content, so it is safe to modify + * the file properties now. + */ + if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) + msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Reset the defer log. This code should not be here, but we must + * reset the defer log *after* acquiring the exclusive lock on the + * queue file and *before* resolving new recipients. Since all those + * operations are encapsulated so nicely by this routine, the defer + * log reset has to be done here as well. + * + * Note: it is safe to remove the defer logfile from a previous queue + * run of this queue file, because the defer log contains information + * about recipients that still exist in this queue file. + */ + if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) + msg_fatal("%s: %s: remove %s %s: %m", myname, + queue_id, MAIL_QUEUE_DEFER, queue_id); + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + if (message->rcpt_offset == 0) + qmgr_message_move_limits(message); + return (message); + } +} + +/* qmgr_message_realloc - refresh in-core message structure */ + +QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_message_realloc"; + + /* + * Sanity checks. + */ + if (message->rcpt_offset <= 0) + msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); + if (msg_verbose) + msg_info("%s: %s %s offset %ld", myname, message->queue_name, + message->queue_id, message->rcpt_offset); + + /* + * Extract recipient addresses. Skip files with malformed envelope + * information. + */ + if (qmgr_message_open(message) < 0) + return (0); + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + return (0); + } else { + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + if (message->rcpt_offset == 0) + qmgr_message_move_limits(message); + return (message); + } +} diff --git a/src/qmgr/qmgr_move.c b/src/qmgr/qmgr_move.c new file mode 100644 index 0000000..e68f803 --- /dev/null +++ b/src/qmgr/qmgr_move.c @@ -0,0 +1,104 @@ +/*++ +/* NAME +/* qmgr_move 3 +/* SUMMARY +/* move queue entries to another queue +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_move(from, to, time_stamp) +/* const char *from; +/* const char *to; +/* time_t time_stamp; +/* DESCRIPTION +/* The \fBqmgr_move\fR routine scans the \fIfrom\fR queue for entries +/* with valid queue names and moves them to the \fIto\fR queue. +/* If \fItime_stamp\fR is non-zero, the queue file time stamps are +/* set to the specified value. +/* Entries with invalid names are left alone. No attempt is made to +/* look for other badness such as multiple links or weird file types. +/* These issues are dealt with when a queue file is actually opened. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include +#include +#include +#include +#include + +/* Utility library. */ + +#include +#include +#include + +/* Global library. */ + +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_move - move queue entries to another queue, leave bad files alone */ + +void qmgr_move(const char *src_queue, const char *dst_queue, + time_t time_stamp) +{ + const char *myname = "qmgr_move"; + SCAN_DIR *queue_dir; + char *queue_id; + struct utimbuf tbuf; + const char *path; + + if (strcmp(src_queue, dst_queue) == 0) + msg_panic("%s: source queue %s is destination", myname, src_queue); + if (msg_verbose) + msg_info("start move queue %s -> %s", src_queue, dst_queue); + + queue_dir = scan_dir_open(src_queue); + while ((queue_id = mail_scan_dir_next(queue_dir)) != 0) { + if (mail_queue_id_ok(queue_id)) { + if (time_stamp > 0) { + tbuf.actime = tbuf.modtime = time_stamp; + path = mail_queue_path((VSTRING *) 0, src_queue, queue_id); + if (utime(path, &tbuf) < 0) { + if (errno != ENOENT) + msg_fatal("%s: update %s time stamps: %m", myname, path); + msg_warn("%s: update %s time stamps: %m", myname, path); + continue; + } + } + if (mail_queue_rename(queue_id, src_queue, dst_queue)) { + if (errno != ENOENT) + msg_fatal("%s: rename %s from %s to %s: %m", + myname, queue_id, src_queue, dst_queue); + msg_warn("%s: rename %s from %s to %s: %m", + myname, queue_id, src_queue, dst_queue); + continue; + } + if (msg_verbose) + msg_info("%s: moved %s from %s to %s", + myname, queue_id, src_queue, dst_queue); + } else { + msg_warn("%s: ignored: queue %s id %s", + myname, src_queue, queue_id); + } + } + scan_dir_close(queue_dir); + + if (msg_verbose) + msg_info("end move queue %s -> %s", src_queue, dst_queue); +} diff --git a/src/qmgr/qmgr_peer.c b/src/qmgr/qmgr_peer.c new file mode 100644 index 0000000..ebf9632 --- /dev/null +++ b/src/qmgr/qmgr_peer.c @@ -0,0 +1,154 @@ +/*++ +/* NAME +/* qmgr_peer 3 +/* SUMMARY +/* per-job peers +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_PEER *qmgr_peer_create(job, queue) +/* QMGR_JOB *job; +/* QMGR_QUEUE *queue; +/* +/* QMGR_PEER *qmgr_peer_find(job, queue) +/* QMGR_JOB *job; +/* QMGR_QUEUE *queue; +/* +/* QMGR_PEER *qmgr_peer_obtain(job, queue) +/* QMGR_JOB *job; +/* QMGR_QUEUE *queue; +/* +/* void qmgr_peer_free(peer) +/* QMGR_PEER *peer; +/* +/* QMGR_PEER *qmgr_peer_select(job) +/* QMGR_JOB *job; +/* +/* DESCRIPTION +/* These routines add/delete/manipulate per-job peers. +/* Each peer corresponds to a specific job and destination. +/* It is similar to per-transport queue structure, but groups +/* only the entries of the given job. +/* +/* qmgr_peer_create() creates an empty peer structure for the named +/* job and destination. It is an error to call this function +/* if a peer for given combination already exists. +/* +/* qmgr_peer_find() looks up the peer for the named destination +/* for the named job. A null result means that the peer +/* was not found. +/* +/* qmgr_peer_obtain() looks up the peer for the named destination +/* for the named job. If it doesn't exist yet, it creates it. +/* +/* qmgr_peer_free() disposes of a per-job peer after all +/* its entries have been taken care of. It is an error to dispose +/* of a peer still in use. +/* +/* qmgr_peer_select() attempts to find a peer of named job that +/* has messages pending delivery. This routine implements +/* round-robin search among job's peers. +/* DIAGNOSTICS +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Patrik Rak +/* patrik@raxoft.cz +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_peer_create - create and initialize message peer structure */ + +QMGR_PEER *qmgr_peer_create(QMGR_JOB *job, QMGR_QUEUE *queue) +{ + QMGR_PEER *peer; + + peer = (QMGR_PEER *) mymalloc(sizeof(QMGR_PEER)); + peer->queue = queue; + peer->job = job; + QMGR_LIST_APPEND(job->peer_list, peer, peers); + htable_enter(job->peer_byname, queue->name, (void *) peer); + peer->refcount = 0; + QMGR_LIST_INIT(peer->entry_list); + return (peer); +} + +/* qmgr_peer_free - release peer structure */ + +void qmgr_peer_free(QMGR_PEER *peer) +{ + const char *myname = "qmgr_peer_free"; + QMGR_JOB *job = peer->job; + QMGR_QUEUE *queue = peer->queue; + + /* + * Sanity checks. It is an error to delete a referenced peer structure. + */ + if (peer->refcount != 0) + msg_panic("%s: refcount: %d", myname, peer->refcount); + if (peer->entry_list.next != 0) + msg_panic("%s: entry list not empty: %s", myname, queue->name); + + QMGR_LIST_UNLINK(job->peer_list, QMGR_PEER *, peer, peers); + htable_delete(job->peer_byname, queue->name, (void (*) (void *)) 0); + myfree((void *) peer); +} + +/* qmgr_peer_find - lookup peer associated with given job and queue */ + +QMGR_PEER *qmgr_peer_find(QMGR_JOB *job, QMGR_QUEUE *queue) +{ + return ((QMGR_PEER *) htable_find(job->peer_byname, queue->name)); +} + +/* qmgr_peer_obtain - find/create peer associated with given job and queue */ + +QMGR_PEER *qmgr_peer_obtain(QMGR_JOB *job, QMGR_QUEUE *queue) +{ + QMGR_PEER *peer; + + if ((peer = qmgr_peer_find(job, queue)) == 0) + peer = qmgr_peer_create(job, queue); + return (peer); +} + +/* qmgr_peer_select - select next peer suitable for delivery within given job */ + +QMGR_PEER *qmgr_peer_select(QMGR_JOB *job) +{ + QMGR_PEER *peer; + QMGR_QUEUE *queue; + + /* + * If we find a suitable site, rotate the list to enforce round-robin + * selection. See similar selection code in qmgr_transport_select(). + */ + for (peer = job->peer_list.next; peer; peer = peer->peers.next) { + queue = peer->queue; + if (queue->window > queue->busy_refcount && peer->entry_list.next != 0) { + QMGR_LIST_ROTATE(job->peer_list, peer, peers); + if (msg_verbose) + msg_info("qmgr_peer_select: %s %s %s (%d of %d)", + job->message->queue_id, queue->transport->name, queue->name, + queue->busy_refcount + 1, queue->window); + return (peer); + } + } + return (0); +} diff --git a/src/qmgr/qmgr_queue.c b/src/qmgr/qmgr_queue.c new file mode 100644 index 0000000..d2ffe09 --- /dev/null +++ b/src/qmgr/qmgr_queue.c @@ -0,0 +1,439 @@ +/*++ +/* NAME +/* qmgr_queue 3 +/* SUMMARY +/* per-destination queues +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_queue_count; +/* +/* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop) +/* QMGR_TRANSPORT *transport; +/* const char *name; +/* const char *nexthop; +/* +/* void qmgr_queue_done(queue) +/* QMGR_QUEUE *queue; +/* +/* QMGR_QUEUE *qmgr_queue_find(transport, name) +/* QMGR_TRANSPORT *transport; +/* const char *name; +/* +/* void qmgr_queue_throttle(queue, dsn) +/* QMGR_QUEUE *queue; +/* DSN *dsn; +/* +/* void qmgr_queue_unthrottle(queue) +/* QMGR_QUEUE *queue; +/* +/* void qmgr_queue_suspend(queue, delay) +/* QMGR_QUEUE *queue; +/* int delay; +/* DESCRIPTION +/* These routines add/delete/manipulate per-destination queues. +/* Each queue corresponds to a specific transport and destination. +/* Each queue has a `todo' list of delivery requests for that +/* destination, and a `busy' list of delivery requests in progress. +/* +/* qmgr_queue_count is a global counter for the total number +/* of in-core queue structures. +/* +/* qmgr_queue_create() creates an empty named queue for the named +/* transport and destination. The queue is given an initial +/* concurrency limit as specified with the +/* \fIinitial_destination_concurrency\fR configuration parameter, +/* provided that it does not exceed the transport-specific +/* concurrency limit. +/* +/* qmgr_queue_done() disposes of a per-destination queue after all +/* its entries have been taken care of. It is an error to dispose +/* of a dead queue. +/* +/* qmgr_queue_find() looks up the named queue for the named +/* transport. A null result means that the queue was not found. +/* +/* qmgr_queue_throttle() handles a delivery error, and decrements the +/* concurrency limit for the destination, with a lower bound of 1. +/* When the cohort failure bound is reached, qmgr_queue_throttle() +/* sets the concurrency limit to zero and starts a timer +/* to re-enable delivery to the destination after a configurable delay. +/* +/* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects. +/* The concurrency limit for the destination is incremented, +/* provided that it does not exceed the destination concurrency +/* limit specified for the transport. This routine implements +/* "slow open" mode, and eliminates the "thundering herd" problem. +/* +/* qmgr_queue_suspend() suspends delivery for this destination +/* briefly. This function invalidates any scheduling decisions +/* that are based on the present queue's concurrency window. +/* To compensate for work skipped by qmgr_entry_done(), the +/* status of blocker jobs is re-evaluated after the queue is +/* resumed. +/* DIAGNOSTICS +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Pre-emptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/* +/* Concurrency scheduler enhancements with: +/* Victor Duchovni +/* Morgan Stanley +/*--*/ + +/* System library. */ + +#include +#include + +/* Utility library. */ + +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include /* QMGR_LOG_WINDOW */ + +/* Application-specific. */ + +#include "qmgr.h" + +int qmgr_queue_count; + +#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ + (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \ + || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0) + +#define QMGR_LOG_FEEDBACK(feedback) \ + if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ + msg_info("%s: feedback %g", myname, feedback); + +#define QMGR_LOG_WINDOW(queue) \ + if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ + msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \ + myname, queue->name, queue->transport->dest_concurrency_limit, \ + queue->window, queue->success, queue->failure, queue->fail_cohorts); + +/* qmgr_queue_resume - resume delivery to destination */ + +static void qmgr_queue_resume(int event, void *context) +{ + QMGR_QUEUE *queue = (QMGR_QUEUE *) context; + const char *myname = "qmgr_queue_resume"; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_SUSPENDED(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + + /* + * We can't simply force delivery on this queue: the transport's pending + * count may already be maxed out, and there may be other constraints + * that definitely should be none of our business. The best we can do is + * to play by the same rules as everyone else: let qmgr_active_drain() + * and round-robin selection take care of message selection. + */ + queue->window = 1; + + /* + * Every event handler that leaves a queue in the "ready" state should + * remove the queue when it is empty. + * + * XXX Do not omit the redundant test below. It is here to simplify code + * consistency checks. The check is trivially eliminated by the compiler + * optimizer. There is no need to sacrifice code clarity for the sake of + * performance. + * + * XXX Do not expose the blocker job logic here. Rate-limited queues are not + * a performance-critical feature. Here, too, there is no need to + * sacrifice code clarity for the sake of performance. + */ + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); + else + qmgr_job_blocker_update(queue); +} + +/* qmgr_queue_suspend - briefly suspend a destination */ + +void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) +{ + const char *myname = "qmgr_queue_suspend"; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->busy_refcount > 0) + msg_panic("%s: queue is busy", myname); + + /* + * Set the queue status to "suspended". No-one is supposed to remove a + * queue in suspended state. + */ + queue->window = QMGR_QUEUE_STAT_SUSPENDED; + event_request_timer(qmgr_queue_resume, (void *) queue, delay); +} + +/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */ + +static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context) +{ + QMGR_QUEUE *queue = (QMGR_QUEUE *) context; + + /* + * This routine runs when a wakeup timer goes off; it does not run in the + * context of some queue manipulation. Therefore, it is safe to discard + * this in-core queue when it is empty and when this site is not dead. + */ + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); +} + +/* qmgr_queue_unthrottle - give this destination another chance */ + +void qmgr_queue_unthrottle(QMGR_QUEUE *queue) +{ + const char *myname = "qmgr_queue_unthrottle"; + QMGR_TRANSPORT *transport = queue->transport; + double feedback; + + if (msg_verbose) + msg_info("%s: queue %s", myname, queue->name); + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + + /* + * Don't restart the negative feedback hysteresis cycle with every + * positive feedback. Restart it only when we make a positive concurrency + * adjustment (i.e. at the end of a positive feedback hysteresis cycle). + * Otherwise negative feedback would be too aggressive: negative feedback + * takes effect immediately at the start of its hysteresis cycle. + */ + queue->fail_cohorts = 0; + + /* + * Special case when this site was dead. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); + if (queue->dsn == 0) + msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); + dsn_free(queue->dsn); + queue->dsn = 0; + /* Back from the almost grave, best concurrency is anyone's guess. */ + if (queue->busy_refcount > 0) + queue->window = queue->busy_refcount; + else + queue->window = transport->init_dest_concurrency; + queue->success = queue->failure = 0; + QMGR_LOG_WINDOW(queue); + return; + } + + /* + * Increase the destination's concurrency limit until we reach the + * transport's concurrency limit. Allow for a margin the size of the + * initial destination concurrency, so that we're not too gentle. + * + * Why is the concurrency increment based on preferred concurrency and not + * on the number of outstanding delivery requests? The latter fluctuates + * wildly when deliveries complete in bursts (artificial benchmark + * measurements), and does not account for cached connections. + * + * Keep the window within reasonable distance from actual concurrency + * otherwise negative feedback will be ineffective. This expression + * assumes that busy_refcount changes gradually. This is invalid when + * deliveries complete in bursts (artificial benchmark measurements). + */ + if (transport->dest_concurrency_limit == 0 + || transport->dest_concurrency_limit > queue->window) + if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { + feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); + QMGR_LOG_FEEDBACK(feedback); + queue->success += feedback; + /* Prepare for overshoot (feedback > hysteresis, rounding error). */ + while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { + queue->window += transport->pos_feedback.hysteresis; + queue->success -= transport->pos_feedback.hysteresis; + queue->failure = 0; + } + /* Prepare for overshoot. */ + if (transport->dest_concurrency_limit > 0 + && queue->window > transport->dest_concurrency_limit) + queue->window = transport->dest_concurrency_limit; + } + QMGR_LOG_WINDOW(queue); +} + +/* qmgr_queue_throttle - handle destination delivery failure */ + +void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) +{ + const char *myname = "qmgr_queue_throttle"; + QMGR_TRANSPORT *transport = queue->transport; + double feedback; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->dsn) + msg_panic("%s: queue %s: spurious reason %s", + myname, queue->name, queue->dsn->reason); + if (msg_verbose) + msg_info("%s: queue %s: %s %s", + myname, queue->name, dsn->status, dsn->reason); + + /* + * Don't restart the positive feedback hysteresis cycle with every + * negative feedback. Restart it only when we make a negative concurrency + * adjustment (i.e. at the start of a negative feedback hysteresis + * cycle). Otherwise positive feedback would be too weak (positive + * feedback does not take effect until the end of its hysteresis cycle). + */ + + /* + * This queue is declared dead after a configurable number of + * pseudo-cohort failures. + */ + if (QMGR_QUEUE_READY(queue)) { + queue->fail_cohorts += 1.0 / queue->window; + if (transport->fail_cohort_limit > 0 + && queue->fail_cohorts >= transport->fail_cohort_limit) + queue->window = QMGR_QUEUE_STAT_THROTTLED; + } + + /* + * Decrease the destination's concurrency limit until we reach 1. Base + * adjustments on the concurrency limit itself, instead of using the + * actual concurrency. The latter fluctuates wildly when deliveries + * complete in bursts (artificial benchmark measurements). + * + * Even after reaching 1, we maintain the negative hysteresis cycle so that + * negative feedback can cancel out positive feedback. + */ + if (QMGR_QUEUE_READY(queue)) { + feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); + QMGR_LOG_FEEDBACK(feedback); + queue->failure -= feedback; + /* Prepare for overshoot (feedback > hysteresis, rounding error). */ + while (queue->failure - feedback / 2 < 0) { + queue->window -= transport->neg_feedback.hysteresis; + queue->success = 0; + queue->failure += transport->neg_feedback.hysteresis; + } + /* Prepare for overshoot. */ + if (queue->window < 1) + queue->window = 1; + } + + /* + * Special case for a site that just was declared dead. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + queue->dsn = DSN_COPY(dsn); + event_request_timer(qmgr_queue_unthrottle_wrapper, + (void *) queue, var_min_backoff_time); + queue->dflags = 0; + } + QMGR_LOG_WINDOW(queue); +} + +/* qmgr_queue_done - delete in-core queue for site */ + +void qmgr_queue_done(QMGR_QUEUE *queue) +{ + const char *myname = "qmgr_queue_done"; + QMGR_TRANSPORT *transport = queue->transport; + + /* + * Sanity checks. It is an error to delete an in-core queue with pending + * messages or timers. + */ + if (queue->busy_refcount != 0 || queue->todo_refcount != 0) + msg_panic("%s: refcount: %d", myname, + queue->busy_refcount + queue->todo_refcount); + if (queue->todo.next || queue->busy.next) + msg_panic("%s: queue not empty: %s", myname, queue->name); + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->dsn) + msg_panic("%s: queue %s: spurious reason %s", + myname, queue->name, queue->dsn->reason); + + /* + * Clean up this in-core queue. + */ + QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers); + htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); + myfree(queue->name); + myfree(queue->nexthop); + qmgr_queue_count--; + myfree((void *) queue); +} + +/* qmgr_queue_create - create in-core queue for site */ + +QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, + const char *nexthop) +{ + QMGR_QUEUE *queue; + + /* + * If possible, choose an initial concurrency of > 1 so that one bad + * message or one bad network won't slow us down unnecessarily. + */ + + queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); + qmgr_queue_count++; + queue->dflags = 0; + queue->last_done = 0; + queue->name = mystrdup(name); + queue->nexthop = mystrdup(nexthop); + queue->todo_refcount = 0; + queue->busy_refcount = 0; + queue->transport = transport; + queue->window = transport->init_dest_concurrency; + queue->success = queue->failure = queue->fail_cohorts = 0; + QMGR_LIST_INIT(queue->todo); + QMGR_LIST_INIT(queue->busy); + queue->dsn = 0; + queue->clog_time_to_warn = 0; + queue->blocker_tag = 0; + QMGR_LIST_APPEND(transport->queue_list, queue, peers); + htable_enter(transport->queue_byname, name, (void *) queue); + return (queue); +} + +/* qmgr_queue_find - find in-core named queue */ + +QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name) +{ + return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name)); +} diff --git a/src/qmgr/qmgr_scan.c b/src/qmgr/qmgr_scan.c new file mode 100644 index 0000000..0665a23 --- /dev/null +++ b/src/qmgr/qmgr_scan.c @@ -0,0 +1,185 @@ +/*++ +/* NAME +/* qmgr_scan 3 +/* SUMMARY +/* queue scanning +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_SCAN *qmgr_scan_create(queue_name) +/* const char *queue_name; +/* +/* char *qmgr_scan_next(scan_info) +/* QMGR_SCAN *scan_info; +/* +/* void qmgr_scan_request(scan_info, flags) +/* QMGR_SCAN *scan_info; +/* int flags; +/* DESCRIPTION +/* This module implements queue scans. A queue scan always runs +/* to completion, so that all files get a fair chance. The caller +/* can request that a queue scan be restarted once it completes. +/* +/* qmgr_scan_create() creates a context for scanning the named queue, +/* but does not start a queue scan. +/* +/* qmgr_scan_next() returns the base name of the next queue file. +/* A null pointer means that no file was found. qmgr_scan_next() +/* automagically restarts a queue scan when a scan request had +/* arrived while the scan was in progress. +/* +/* qmgr_scan_request() records a request for the next queue scan. The +/* flags argument is the bit-wise OR of zero or more of the following, +/* unrecognized flags being ignored: +/* .IP QMGR_FLUSH_ONCE +/* Forget state information about dead hosts or transports. +/* This request takes effect immediately. +/* .IP QMGR_FLUSH_DFXP +/* Override the defer_transports setting. This takes effect +/* immediately when a queue scan is in progress, and affects +/* the next queue scan. +/* .IP QMGR_SCAN_ALL +/* Ignore queue file time stamps. This takes effect immediately +/* when a queue scan is in progress, and affects the next queue +/* scan. +/* .IP QMGR_SCAN_START +/* Start a queue scan when none is in progress, or restart the +/* current scan upon completion. +/* DIAGNOSTICS +/* Fatal: out of memory. +/* Panic: interface violations, internal consistency errors. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include + +/* Utility library. */ + +#include +#include +#include + +/* Global library. */ + +#include + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_scan_start - start queue scan */ + +static void qmgr_scan_start(QMGR_SCAN *scan_info) +{ + const char *myname = "qmgr_scan_start"; + + /* + * Sanity check. + */ + if (scan_info->handle) + msg_panic("%s: %s queue scan in progress", + myname, scan_info->queue); + + /* + * Give the poor tester a clue. + */ + if (msg_verbose) + msg_info("%s: %sstart %s queue scan", + myname, + scan_info->nflags & QMGR_SCAN_START ? "re" : "", + scan_info->queue); + + /* + * Start or restart the scan. + */ + scan_info->flags = scan_info->nflags; + scan_info->nflags = 0; + scan_info->handle = scan_dir_open(scan_info->queue); +} + +/* qmgr_scan_request - request for future scan */ + +void qmgr_scan_request(QMGR_SCAN *scan_info, int flags) +{ + + /* + * Apply "forget all dead destinations" requests immediately. Throttle + * dead transports and queues at the earliest opportunity: preferably + * during an already ongoing queue scan, otherwise the throttling will + * have to wait until a "start scan" trigger arrives. + * + * The QMGR_FLUSH_ONCE request always comes with QMGR_FLUSH_DFXP, and + * sometimes it also comes with QMGR_SCAN_ALL. It becomes a completely + * different story when a flush request is encoded in file permissions. + */ + if (flags & QMGR_FLUSH_ONCE) + qmgr_enable_all(); + + /* + * Apply "ignore time stamp" requests also towards the scan that is + * already in progress. + */ + if (scan_info->handle != 0 && (flags & QMGR_SCAN_ALL)) + scan_info->flags |= QMGR_SCAN_ALL; + + /* + * Apply "override defer_transports" requests also towards the scan that + * is already in progress. + */ + if (scan_info->handle != 0 && (flags & QMGR_FLUSH_DFXP)) + scan_info->flags |= QMGR_FLUSH_DFXP; + + /* + * If a scan is in progress, just record the request. + */ + scan_info->nflags |= flags; + if (scan_info->handle == 0 && (flags & QMGR_SCAN_START) != 0) { + scan_info->nflags &= ~QMGR_SCAN_START; + qmgr_scan_start(scan_info); + } +} + +/* qmgr_scan_next - look for next queue file */ + +char *qmgr_scan_next(QMGR_SCAN *scan_info) +{ + char *path = 0; + + /* + * Restart the scan if we reach the end and a queue scan request has + * arrived in the mean time. + */ + if (scan_info->handle && (path = mail_scan_dir_next(scan_info->handle)) == 0) { + scan_info->handle = scan_dir_close(scan_info->handle); + if (msg_verbose && (scan_info->nflags & QMGR_SCAN_START) == 0) + msg_info("done %s queue scan", scan_info->queue); + } + if (!scan_info->handle && (scan_info->nflags & QMGR_SCAN_START)) { + qmgr_scan_start(scan_info); + path = mail_scan_dir_next(scan_info->handle); + } + return (path); +} + +/* qmgr_scan_create - create queue scan context */ + +QMGR_SCAN *qmgr_scan_create(const char *queue) +{ + QMGR_SCAN *scan_info; + + scan_info = (QMGR_SCAN *) mymalloc(sizeof(*scan_info)); + scan_info->queue = mystrdup(queue); + scan_info->flags = scan_info->nflags = 0; + scan_info->handle = 0; + return (scan_info); +} diff --git a/src/qmgr/qmgr_transport.c b/src/qmgr/qmgr_transport.c new file mode 100644 index 0000000..fe99806 --- /dev/null +++ b/src/qmgr/qmgr_transport.c @@ -0,0 +1,504 @@ +/*++ +/* NAME +/* qmgr_transport 3 +/* SUMMARY +/* per-transport data structures +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_TRANSPORT *qmgr_transport_create(name) +/* const char *name; +/* +/* QMGR_TRANSPORT *qmgr_transport_find(name) +/* const char *name; +/* +/* QMGR_TRANSPORT *qmgr_transport_select() +/* +/* void qmgr_transport_alloc(transport, notify) +/* QMGR_TRANSPORT *transport; +/* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp); +/* +/* void qmgr_transport_throttle(transport, dsn) +/* QMGR_TRANSPORT *transport; +/* DSN *dsn; +/* +/* void qmgr_transport_unthrottle(transport) +/* QMGR_TRANSPORT *transport; +/* DESCRIPTION +/* This module organizes the world by message transport type. +/* Each transport can have zero or more destination queues +/* associated with it. +/* +/* qmgr_transport_create() instantiates a data structure for the +/* named transport type. +/* +/* qmgr_transport_find() looks up an existing message transport +/* data structure. +/* +/* qmgr_transport_select() attempts to find a transport that +/* has messages pending delivery. This routine implements +/* round-robin search among transports. +/* +/* qmgr_transport_alloc() allocates a delivery process for the +/* specified transport type. Allocation is performed asynchronously. +/* When a process becomes available, the application callback routine +/* is invoked with as arguments the transport and a stream that +/* is connected to a delivery process. It is an error to call +/* qmgr_transport_alloc() while delivery process allocation for +/* the same transport is in progress. +/* +/* qmgr_transport_throttle blocks further allocation of delivery +/* processes for the named transport. Attempts to throttle a +/* throttled transport are ignored. +/* +/* qmgr_transport_unthrottle() undoes qmgr_transport_throttle(). +/* Attempts to unthrottle a non-throttled transport are ignored. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Preemptive scheduler enhancements: +/* Patrik Rak +/* Modra 6 +/* 155 00, Prague, Czech Republic +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include +#include + +#include /* FD_SETSIZE */ +#include /* FD_SETSIZE */ +#include /* FD_SETSIZE */ + +#ifdef USE_SYS_SELECT_H +#include /* FD_SETSIZE */ +#endif + +/* Utility library. */ + +#include +#include +#include +#include +#include +#include + +/* Global library. */ + +#include +#include +#include +#include + +/* Application-specific. */ + +#include "qmgr.h" + +HTABLE *qmgr_transport_byname; /* transport by name */ +QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */ + + /* + * A local structure to remember a delivery process allocation request. + */ +typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC; + +struct QMGR_TRANSPORT_ALLOC { + QMGR_TRANSPORT *transport; /* transport context */ + VSTREAM *stream; /* delivery service stream */ + QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ +}; + + /* + * Connections to delivery agents are managed asynchronously. Each delivery + * agent connection goes through multiple wait states: + * + * - With Linux/Solaris and old queue manager implementations only, wait for + * the server to invoke accept(). + * + * - Wait for the delivery agent's announcement that it is ready to receive a + * delivery request. + * + * - Wait for the delivery request completion status. + * + * Older queue manager implementations had only one pending delivery agent + * connection per transport. With low-latency destinations, the output rates + * were reduced on Linux/Solaris systems that had the extra wait state. + * + * To maximize delivery agent output rates with low-latency destinations, the + * following changes were made to the queue manager by the end of the 2.4 + * development cycle: + * + * - The Linux/Solaris accept() wait state was eliminated. + * + * - A pipeline was implemented for pending delivery agent connections. The + * number of pending delivery agent connections was increased from one to + * two: the number of before-delivery wait states, plus one extra pipeline + * slot to prevent the pipeline from stalling easily. Increasing the + * pipeline much further actually hurt performance. + * + * - To reduce queue manager disk competition with delivery agents, the queue + * scanning algorithm was modified to import only one message per interrupt. + * The incoming and deferred queue scans now happen on alternate interrupts. + * + * Simplistically reasoned, a non-zero (incoming + active) queue length is + * equivalent to a time shift for mail deliveries; this is undesirable when + * delivery agents are not fully utilized. + * + * On the other hand a non-empty active queue is what allows us to do clever + * things such as queue file prefetch, concurrency windows, and connection + * caching; the idea is that such "thinking time" is affordable only after + * the output channels are maxed out. + */ +#ifndef QMGR_TRANSPORT_MAX_PEND +#define QMGR_TRANSPORT_MAX_PEND 2 +#endif + + /* + * Important note on the _transport_rate_delay implementation: after + * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all + * code paths must directly or indirectly invoke qmgr_transport_unthrottle() + * or qmgr_transport_throttle(). Otherwise, transports with non-zero + * _transport_rate_delay will become stuck. + */ + +/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ + +static void qmgr_transport_unthrottle_wrapper(int unused_event, void *context) +{ + qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); +} + +/* qmgr_transport_unthrottle - open the throttle */ + +void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) +{ + const char *myname = "qmgr_transport_unthrottle"; + + /* + * This routine runs after expiration of the timer set by + * qmgr_transport_throttle(), or whenever a delivery transport has been + * used without malfunction. In either case, we enable delivery again if + * the transport was throttled. We always reset the transport rate lock. + */ + if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { + if (msg_verbose) + msg_info("%s: transport %s", myname, transport->name); + transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; + if (transport->dsn == 0) + msg_panic("%s: transport %s: null reason", + myname, transport->name); + dsn_free(transport->dsn); + transport->dsn = 0; + event_cancel_timer(qmgr_transport_unthrottle_wrapper, + (void *) transport); + } + if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) + transport->flags &= ~QMGR_TRANSPORT_STAT_RATE_LOCK; +} + +/* qmgr_transport_throttle - disable delivery process allocation */ + +void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) +{ + const char *myname = "qmgr_transport_throttle"; + + /* + * We are unable to connect to a deliver process for this type of message + * transport. Instead of hosing the system by retrying in a tight loop, + * back off and disable this transport type for a while. + */ + if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { + if (msg_verbose) + msg_info("%s: transport %s: status: %s reason: %s", + myname, transport->name, dsn->status, dsn->reason); + transport->flags |= QMGR_TRANSPORT_STAT_DEAD; + if (transport->dsn) + msg_panic("%s: transport %s: spurious reason: %s", + myname, transport->name, transport->dsn->reason); + transport->dsn = DSN_COPY(dsn); + event_request_timer(qmgr_transport_unthrottle_wrapper, + (void *) transport, var_transport_retry_time); + } +} + +/* qmgr_transport_abort - transport connect watchdog */ + +static void qmgr_transport_abort(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + msg_fatal("timeout connecting to transport: %s", alloc->transport->name); +} + +/* qmgr_transport_rate_event - delivery process availability notice */ + +static void qmgr_transport_rate_event(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + alloc->notify(alloc->transport, alloc->stream); + myfree((void *) alloc); +} + +/* qmgr_transport_event - delivery process availability notice */ + +static void qmgr_transport_event(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + /* + * This routine notifies the application when the request given to + * qmgr_transport_alloc() completes. + */ + if (msg_verbose) + msg_info("transport_event: %s", alloc->transport->name); + + /* + * Connection request completed. Stop the watchdog timer. + */ + event_cancel_timer(qmgr_transport_abort, context); + + /* + * Disable further read events that end up calling this function, and + * free up this pending connection pipeline slot. + */ + if (alloc->stream) { + event_disable_readwrite(vstream_fileno(alloc->stream)); + non_blocking(vstream_fileno(alloc->stream), BLOCKING); + } + alloc->transport->pending -= 1; + + /* + * Notify the requestor. + */ + if (alloc->transport->xport_rate_delay > 0) { + if ((alloc->transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) == 0) + msg_panic("transport_event: missing rate lock for transport %s", + alloc->transport->name); + event_request_timer(qmgr_transport_rate_event, (void *) alloc, + alloc->transport->xport_rate_delay); + } else { + alloc->notify(alloc->transport, alloc->stream); + myfree((void *) alloc); + } +} + +/* qmgr_transport_select - select transport for allocation */ + +QMGR_TRANSPORT *qmgr_transport_select(void) +{ + QMGR_TRANSPORT *xport; + QMGR_QUEUE *queue; + int need; + + /* + * If we find a suitable transport, rotate the list of transports to + * effectuate round-robin selection. See similar selection code in + * qmgr_peer_select(). + * + * This function is called repeatedly until all transports have maxed out + * the number of pending delivery agent connections, until all delivery + * agent concurrency windows are maxed out, or until we run out of "todo" + * queue entries. + */ +#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) + + for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { + if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 + || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0 + || xport->pending >= QMGR_TRANSPORT_MAX_PEND) + continue; + need = xport->pending + 1; + for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { + if (QMGR_QUEUE_READY(queue) == 0) + continue; + if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, + queue->todo_refcount)) <= 0) { + QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers); + if (msg_verbose) + msg_info("qmgr_transport_select: %s", xport->name); + return (xport); + } + } + } + return (0); +} + +/* qmgr_transport_alloc - allocate delivery process */ + +void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) +{ + QMGR_TRANSPORT_ALLOC *alloc; + + /* + * Sanity checks. + */ + if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) + msg_panic("qmgr_transport: dead transport: %s", transport->name); + if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) + msg_panic("qmgr_transport: rate-locked transport: %s", transport->name); + if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) + msg_panic("qmgr_transport: excess allocation: %s", transport->name); + + /* + * When this message delivery transport is rate-limited, do not select it + * again before the end of a message delivery transaction. + */ + if (transport->xport_rate_delay > 0) + transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK; + + /* + * Connect to the well-known port for this delivery service, and wake up + * when a process announces its availability. Allow only a limited number + * of delivery process allocation attempts for this transport. In case of + * problems, back off. Do not hose the system when it is in trouble + * already. + * + * Use non-blocking connect(), so that Linux won't block the queue manager + * until the delivery agent calls accept(). + * + * When the connection to delivery agent cannot be completed, notify the + * event handler so that it can throttle the transport and defer the todo + * queues, just like it does when communication fails *after* connection + * completion. + * + * Before Postfix 2.4, the event handler was not invoked after connect() + * error, and mail was not deferred. Because of this, mail would be stuck + * in the active queue after triggering a "connection refused" condition. + */ + alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); + alloc->transport = transport; + alloc->notify = notify; + transport->pending += 1; + if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, + NON_BLOCKING)) == 0) { + msg_warn("connect to transport %s/%s: %m", + MAIL_CLASS_PRIVATE, transport->name); + event_request_timer(qmgr_transport_event, (void *) alloc, 0); + return; + } +#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(CA_VSTREAM_CTL_DUPFD) +#ifndef THRESHOLD_FD_WORKAROUND +#define THRESHOLD_FD_WORKAROUND 128 +#endif + vstream_control(alloc->stream, + CA_VSTREAM_CTL_DUPFD(THRESHOLD_FD_WORKAROUND), + CA_VSTREAM_CTL_END); +#endif + event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, + (void *) alloc); + + /* + * Guard against broken systems. + */ + event_request_timer(qmgr_transport_abort, (void *) alloc, + var_daemon_timeout); +} + +/* qmgr_transport_create - create transport instance */ + +QMGR_TRANSPORT *qmgr_transport_create(const char *name) +{ + QMGR_TRANSPORT *transport; + + if (htable_find(qmgr_transport_byname, name) != 0) + msg_panic("qmgr_transport_create: transport exists: %s", name); + transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); + transport->flags = 0; + transport->pending = 0; + transport->name = mystrdup(name); + + /* + * Use global configuration settings or transport-specific settings. + */ + transport->dest_concurrency_limit = + get_mail_conf_int2(name, _DEST_CON_LIMIT, + var_dest_con_limit, 0, 0); + transport->recipient_limit = + get_mail_conf_int2(name, _DEST_RCPT_LIMIT, + var_dest_rcpt_limit, 0, 0); + transport->init_dest_concurrency = + get_mail_conf_int2(name, _INIT_DEST_CON, + var_init_dest_concurrency, 1, 0); + transport->xport_rate_delay = get_mail_conf_time2(name, _XPORT_RATE_DELAY, + var_xport_rate_delay, + 's', 0, 0); + transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, + var_dest_rate_delay, + 's', 0, 0); + + if (transport->rate_delay > 0) + transport->dest_concurrency_limit = 1; + if (transport->dest_concurrency_limit != 0 + && transport->dest_concurrency_limit < transport->init_dest_concurrency) + transport->init_dest_concurrency = transport->dest_concurrency_limit; + + transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST, + var_delivery_slot_cost, 0, 0); + transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN, + var_delivery_slot_loan, 0, 0); + transport->slot_loan_factor = + 100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT, + var_delivery_slot_discount, 0, 100); + transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS, + var_min_delivery_slots, 0, 0); + transport->rcpt_unused = get_mail_conf_int2(name, _XPORT_RCPT_LIMIT, + var_xport_rcpt_limit, 0, 0); + transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT, + var_stack_rcpt_limit, 0, 0); + transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT, + var_xport_refill_limit, 1, 0); + transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY, + var_xport_refill_delay, 's', 1, 0); + + transport->queue_byname = htable_create(0); + QMGR_LIST_INIT(transport->queue_list); + transport->job_byname = htable_create(0); + QMGR_LIST_INIT(transport->job_list); + QMGR_LIST_INIT(transport->job_bytime); + transport->job_current = 0; + transport->job_next_unread = 0; + transport->candidate_cache = 0; + transport->candidate_cache_current = 0; + transport->candidate_cache_time = (time_t) 0; + transport->blocker_tag = 1; + transport->dsn = 0; + qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, + VAR_CONC_POS_FDBACK, var_conc_pos_feedback); + qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, + VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); + transport->fail_cohort_limit = + get_mail_conf_int2(name, _CONC_COHORT_LIM, + var_conc_cohort_limit, 0, 0); + if (qmgr_transport_byname == 0) + qmgr_transport_byname = htable_create(10); + htable_enter(qmgr_transport_byname, name, (void *) transport); + QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers); + if (msg_verbose) + msg_info("qmgr_transport_create: %s concurrency %d recipients %d", + transport->name, transport->dest_concurrency_limit, + transport->recipient_limit); + return (transport); +} + +/* qmgr_transport_find - find transport instance */ + +QMGR_TRANSPORT *qmgr_transport_find(const char *name) +{ + return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); +} -- cgit v1.2.3