diff options
Diffstat (limited to '')
l--------- | src/oqmgr/.indent.pro | 1 | ||||
-rw-r--r-- | src/oqmgr/.printfck | 25 | ||||
-rw-r--r-- | src/oqmgr/Makefile.in | 361 | ||||
-rw-r--r-- | src/oqmgr/qmgr.c | 731 | ||||
-rw-r--r-- | src/oqmgr/qmgr.h | 431 | ||||
-rw-r--r-- | src/oqmgr/qmgr_active.c | 587 | ||||
-rw-r--r-- | src/oqmgr/qmgr_bounce.c | 71 | ||||
-rw-r--r-- | src/oqmgr/qmgr_defer.c | 158 | ||||
-rw-r--r-- | src/oqmgr/qmgr_deliver.c | 455 | ||||
-rw-r--r-- | src/oqmgr/qmgr_enable.c | 107 | ||||
-rw-r--r-- | src/oqmgr/qmgr_entry.c | 391 | ||||
-rw-r--r-- | src/oqmgr/qmgr_error.c | 121 | ||||
-rw-r--r-- | src/oqmgr/qmgr_feedback.c | 177 | ||||
-rw-r--r-- | src/oqmgr/qmgr_message.c | 1445 | ||||
-rw-r--r-- | src/oqmgr/qmgr_move.c | 104 | ||||
-rw-r--r-- | src/oqmgr/qmgr_queue.c | 442 | ||||
-rw-r--r-- | src/oqmgr/qmgr_scan.c | 185 | ||||
-rw-r--r-- | src/oqmgr/qmgr_transport.c | 472 |
18 files changed, 6264 insertions, 0 deletions
diff --git a/src/oqmgr/.indent.pro b/src/oqmgr/.indent.pro new file mode 120000 index 0000000..5c837ec --- /dev/null +++ b/src/oqmgr/.indent.pro @@ -0,0 +1 @@ +../../.indent.pro
\ No newline at end of file diff --git a/src/oqmgr/.printfck b/src/oqmgr/.printfck new file mode 100644 index 0000000..66016ed --- /dev/null +++ b/src/oqmgr/.printfck @@ -0,0 +1,25 @@ +been_here_xt 2 0 +bounce_append 5 0 +cleanup_out_format 1 0 +defer_append 5 0 +mail_command 1 0 +mail_print 1 0 +msg_error 0 0 +msg_fatal 0 0 +msg_info 0 0 +msg_panic 0 0 +msg_warn 0 0 +opened 4 0 +post_mail_fprintf 1 0 +qmgr_message_bounce 2 0 +rec_fprintf 2 0 +sent 4 0 +smtp_cmd 1 0 +smtp_mesg_fail 2 0 +smtp_printf 1 0 +smtp_rcpt_fail 3 0 +smtp_site_fail 2 0 +udp_syslog 1 0 +vstream_fprintf 1 0 +vstream_printf 0 0 +vstring_sprintf 1 0 diff --git a/src/oqmgr/Makefile.in b/src/oqmgr/Makefile.in new file mode 100644 index 0000000..93054ec --- /dev/null +++ b/src/oqmgr/Makefile.in @@ -0,0 +1,361 @@ +SHELL = /bin/sh +SRCS = qmgr.c qmgr_active.c qmgr_transport.c qmgr_queue.c qmgr_entry.c \ + qmgr_message.c qmgr_deliver.c qmgr_move.c \ + qmgr_defer.c qmgr_enable.c qmgr_scan.c qmgr_bounce.c qmgr_error.c \ + qmgr_feedback.c +OBJS = qmgr.o qmgr_active.o qmgr_transport.o qmgr_queue.o qmgr_entry.o \ + qmgr_message.o qmgr_deliver.o qmgr_move.o \ + qmgr_defer.o qmgr_enable.o qmgr_scan.o qmgr_bounce.o qmgr_error.o \ + qmgr_feedback.o +HDRS = qmgr.h +TESTSRC = +DEFS = -I. -I$(INC_DIR) -D$(SYSTYPE) +CFLAGS = $(DEBUG) $(OPT) $(DEFS) +TESTPROG= +PROG = qmgr +INC_DIR = ../../include +LIBS = ../../lib/lib$(LIB_PREFIX)master$(LIB_SUFFIX) \ + ../../lib/lib$(LIB_PREFIX)global$(LIB_SUFFIX) \ + ../../lib/lib$(LIB_PREFIX)util$(LIB_SUFFIX) + +.c.o:; $(CC) $(CFLAGS) -c $*.c + +$(PROG): $(OBJS) $(LIBS) + $(CC) $(CFLAGS) $(SHLIB_RPATH) -o $@ $(OBJS) $(LIBS) $(SYSLIBS) + +$(OBJS): ../../conf/makedefs.out + +Makefile: Makefile.in + cat ../../conf/makedefs.out $? >$@ + +test: $(TESTPROG) + +tests: + +root_tests: + +update: ../../libexec/o$(PROG) + +../../libexec/o$(PROG): $(PROG) + cp $(PROG) ../../libexec/o$(PROG) + +printfck: $(OBJS) $(PROG) + rm -rf printfck + mkdir printfck + cp *.h printfck + sed '1,/^# do not edit/!d' Makefile >printfck/Makefile + set -e; for i in *.c; do printfck -f .printfck $$i >printfck/$$i; done + cd printfck; make "INC_DIR=../../../include" `cd ..; ls *.o` + +lint: + lint $(DEFS) $(SRCS) $(LINTFIX) + +clean: + rm -f *.o *core $(PROG) $(TESTPROG) junk + rm -rf printfck + +tidy: clean + +depend: $(MAKES) + (sed '1,/^# do not edit/!d' Makefile.in; \ + set -e; for i in [a-z][a-z0-9]*.c; do \ + $(CC) -E $(DEFS) $(INCL) $$i | grep -v '[<>]' | sed -n -e '/^# *1 *"\([^"]*\)".*/{' \ + -e 's//'`echo $$i|sed 's/c$$/o/'`': \1/' \ + -e 's/o: \.\//o: /' -e p -e '}' ; \ + done | LANG=C sort -u) | grep -v '[.][o][:][ ][/]' >$$$$ && mv $$$$ Makefile.in + @$(EXPORT) make -f Makefile.in Makefile 1>&2 + +# do not edit below this line - it is generated by 'make depend' +qmgr.o: ../../include/argv.h +qmgr.o: ../../include/attr.h +qmgr.o: ../../include/check_arg.h +qmgr.o: ../../include/dict.h +qmgr.o: ../../include/dsn.h +qmgr.o: ../../include/events.h +qmgr.o: ../../include/flush_clnt.h +qmgr.o: ../../include/htable.h +qmgr.o: ../../include/iostuff.h +qmgr.o: ../../include/mail_conf.h +qmgr.o: ../../include/mail_flow.h +qmgr.o: ../../include/mail_params.h +qmgr.o: ../../include/mail_proto.h +qmgr.o: ../../include/mail_queue.h +qmgr.o: ../../include/mail_server.h +qmgr.o: ../../include/mail_version.h +qmgr.o: ../../include/master_proto.h +qmgr.o: ../../include/msg.h +qmgr.o: ../../include/myflock.h +qmgr.o: ../../include/mymalloc.h +qmgr.o: ../../include/nvtable.h +qmgr.o: ../../include/recipient_list.h +qmgr.o: ../../include/scan_dir.h +qmgr.o: ../../include/sys_defs.h +qmgr.o: ../../include/vbuf.h +qmgr.o: ../../include/vstream.h +qmgr.o: ../../include/vstring.h +qmgr.o: qmgr.c +qmgr.o: qmgr.h +qmgr_active.o: ../../include/abounce.h +qmgr_active.o: ../../include/attr.h +qmgr_active.o: ../../include/bounce.h +qmgr_active.o: ../../include/check_arg.h +qmgr_active.o: ../../include/defer.h +qmgr_active.o: ../../include/deliver_request.h +qmgr_active.o: ../../include/dsn.h +qmgr_active.o: ../../include/dsn_buf.h +qmgr_active.o: ../../include/dsn_mask.h +qmgr_active.o: ../../include/events.h +qmgr_active.o: ../../include/htable.h +qmgr_active.o: ../../include/mail_open_ok.h +qmgr_active.o: ../../include/mail_params.h +qmgr_active.o: ../../include/mail_queue.h +qmgr_active.o: ../../include/msg.h +qmgr_active.o: ../../include/msg_stats.h +qmgr_active.o: ../../include/mymalloc.h +qmgr_active.o: ../../include/nvtable.h +qmgr_active.o: ../../include/qmgr_user.h +qmgr_active.o: ../../include/rec_type.h +qmgr_active.o: ../../include/recipient_list.h +qmgr_active.o: ../../include/scan_dir.h +qmgr_active.o: ../../include/sys_defs.h +qmgr_active.o: ../../include/trace.h +qmgr_active.o: ../../include/vbuf.h +qmgr_active.o: ../../include/vstream.h +qmgr_active.o: ../../include/vstring.h +qmgr_active.o: ../../include/warn_stat.h +qmgr_active.o: qmgr.h +qmgr_active.o: qmgr_active.c +qmgr_bounce.o: ../../include/attr.h +qmgr_bounce.o: ../../include/bounce.h +qmgr_bounce.o: ../../include/check_arg.h +qmgr_bounce.o: ../../include/deliver_completed.h +qmgr_bounce.o: ../../include/deliver_request.h +qmgr_bounce.o: ../../include/dsn.h +qmgr_bounce.o: ../../include/dsn_buf.h +qmgr_bounce.o: ../../include/htable.h +qmgr_bounce.o: ../../include/msg_stats.h +qmgr_bounce.o: ../../include/mymalloc.h +qmgr_bounce.o: ../../include/nvtable.h +qmgr_bounce.o: ../../include/recipient_list.h +qmgr_bounce.o: ../../include/scan_dir.h +qmgr_bounce.o: ../../include/sys_defs.h +qmgr_bounce.o: ../../include/vbuf.h +qmgr_bounce.o: ../../include/vstream.h +qmgr_bounce.o: ../../include/vstring.h +qmgr_bounce.o: qmgr.h +qmgr_bounce.o: qmgr_bounce.c +qmgr_defer.o: ../../include/attr.h +qmgr_defer.o: ../../include/bounce.h +qmgr_defer.o: ../../include/check_arg.h +qmgr_defer.o: ../../include/defer.h +qmgr_defer.o: ../../include/deliver_request.h +qmgr_defer.o: ../../include/dsn.h +qmgr_defer.o: ../../include/dsn_buf.h +qmgr_defer.o: ../../include/htable.h +qmgr_defer.o: ../../include/iostuff.h +qmgr_defer.o: ../../include/mail_proto.h +qmgr_defer.o: ../../include/msg.h +qmgr_defer.o: ../../include/msg_stats.h +qmgr_defer.o: ../../include/mymalloc.h +qmgr_defer.o: ../../include/nvtable.h +qmgr_defer.o: ../../include/recipient_list.h +qmgr_defer.o: ../../include/scan_dir.h +qmgr_defer.o: ../../include/sys_defs.h +qmgr_defer.o: ../../include/vbuf.h +qmgr_defer.o: ../../include/vstream.h +qmgr_defer.o: ../../include/vstring.h +qmgr_defer.o: qmgr.h +qmgr_defer.o: qmgr_defer.c +qmgr_deliver.o: ../../include/attr.h +qmgr_deliver.o: ../../include/check_arg.h +qmgr_deliver.o: ../../include/deliver_request.h +qmgr_deliver.o: ../../include/dsb_scan.h +qmgr_deliver.o: ../../include/dsn.h +qmgr_deliver.o: ../../include/dsn_buf.h +qmgr_deliver.o: ../../include/dsn_util.h +qmgr_deliver.o: ../../include/events.h +qmgr_deliver.o: ../../include/htable.h +qmgr_deliver.o: ../../include/iostuff.h +qmgr_deliver.o: ../../include/mail_params.h +qmgr_deliver.o: ../../include/mail_proto.h +qmgr_deliver.o: ../../include/mail_queue.h +qmgr_deliver.o: ../../include/msg.h +qmgr_deliver.o: ../../include/msg_stats.h +qmgr_deliver.o: ../../include/mymalloc.h +qmgr_deliver.o: ../../include/nvtable.h +qmgr_deliver.o: ../../include/rcpt_print.h +qmgr_deliver.o: ../../include/recipient_list.h +qmgr_deliver.o: ../../include/scan_dir.h +qmgr_deliver.o: ../../include/smtputf8.h +qmgr_deliver.o: ../../include/stringops.h +qmgr_deliver.o: ../../include/sys_defs.h +qmgr_deliver.o: ../../include/vbuf.h +qmgr_deliver.o: ../../include/verp_sender.h +qmgr_deliver.o: ../../include/vstream.h +qmgr_deliver.o: ../../include/vstring.h +qmgr_deliver.o: ../../include/vstring_vstream.h +qmgr_deliver.o: qmgr.h +qmgr_deliver.o: qmgr_deliver.c +qmgr_enable.o: ../../include/check_arg.h +qmgr_enable.o: ../../include/dsn.h +qmgr_enable.o: ../../include/msg.h +qmgr_enable.o: ../../include/recipient_list.h +qmgr_enable.o: ../../include/scan_dir.h +qmgr_enable.o: ../../include/sys_defs.h +qmgr_enable.o: ../../include/vbuf.h +qmgr_enable.o: ../../include/vstream.h +qmgr_enable.o: qmgr.h +qmgr_enable.o: qmgr_enable.c +qmgr_entry.o: ../../include/attr.h +qmgr_entry.o: ../../include/check_arg.h +qmgr_entry.o: ../../include/deliver_request.h +qmgr_entry.o: ../../include/dsn.h +qmgr_entry.o: ../../include/events.h +qmgr_entry.o: ../../include/htable.h +qmgr_entry.o: ../../include/mail_params.h +qmgr_entry.o: ../../include/msg.h +qmgr_entry.o: ../../include/msg_stats.h +qmgr_entry.o: ../../include/mymalloc.h +qmgr_entry.o: ../../include/nvtable.h +qmgr_entry.o: ../../include/recipient_list.h +qmgr_entry.o: ../../include/scan_dir.h +qmgr_entry.o: ../../include/sys_defs.h +qmgr_entry.o: ../../include/vbuf.h +qmgr_entry.o: ../../include/vstream.h +qmgr_entry.o: ../../include/vstring.h +qmgr_entry.o: qmgr.h +qmgr_entry.o: qmgr_entry.c +qmgr_error.o: ../../include/check_arg.h +qmgr_error.o: ../../include/dsn.h +qmgr_error.o: ../../include/mymalloc.h +qmgr_error.o: ../../include/recipient_list.h +qmgr_error.o: ../../include/scan_dir.h +qmgr_error.o: ../../include/stringops.h +qmgr_error.o: ../../include/sys_defs.h +qmgr_error.o: ../../include/vbuf.h +qmgr_error.o: ../../include/vstream.h +qmgr_error.o: ../../include/vstring.h +qmgr_error.o: qmgr.h +qmgr_error.o: qmgr_error.c +qmgr_feedback.o: ../../include/check_arg.h +qmgr_feedback.o: ../../include/dsn.h +qmgr_feedback.o: ../../include/mail_conf.h +qmgr_feedback.o: ../../include/mail_params.h +qmgr_feedback.o: ../../include/msg.h +qmgr_feedback.o: ../../include/mymalloc.h +qmgr_feedback.o: ../../include/name_code.h +qmgr_feedback.o: ../../include/recipient_list.h +qmgr_feedback.o: ../../include/scan_dir.h +qmgr_feedback.o: ../../include/stringops.h +qmgr_feedback.o: ../../include/sys_defs.h +qmgr_feedback.o: ../../include/vbuf.h +qmgr_feedback.o: ../../include/vstream.h +qmgr_feedback.o: ../../include/vstring.h +qmgr_feedback.o: qmgr.h +qmgr_feedback.o: qmgr_feedback.c +qmgr_message.o: ../../include/argv.h +qmgr_message.o: ../../include/attr.h +qmgr_message.o: ../../include/bounce.h +qmgr_message.o: ../../include/canon_addr.h +qmgr_message.o: ../../include/check_arg.h +qmgr_message.o: ../../include/deliver_completed.h +qmgr_message.o: ../../include/deliver_request.h +qmgr_message.o: ../../include/dict.h +qmgr_message.o: ../../include/dsn.h +qmgr_message.o: ../../include/dsn_buf.h +qmgr_message.o: ../../include/dsn_mask.h +qmgr_message.o: ../../include/htable.h +qmgr_message.o: ../../include/iostuff.h +qmgr_message.o: ../../include/mail_params.h +qmgr_message.o: ../../include/mail_proto.h +qmgr_message.o: ../../include/mail_queue.h +qmgr_message.o: ../../include/msg.h +qmgr_message.o: ../../include/msg_stats.h +qmgr_message.o: ../../include/myflock.h +qmgr_message.o: ../../include/mymalloc.h +qmgr_message.o: ../../include/nvtable.h +qmgr_message.o: ../../include/opened.h +qmgr_message.o: ../../include/qmgr_user.h +qmgr_message.o: ../../include/rec_attr_map.h +qmgr_message.o: ../../include/rec_type.h +qmgr_message.o: ../../include/recipient_list.h +qmgr_message.o: ../../include/record.h +qmgr_message.o: ../../include/resolve_clnt.h +qmgr_message.o: ../../include/rewrite_clnt.h +qmgr_message.o: ../../include/scan_dir.h +qmgr_message.o: ../../include/sent.h +qmgr_message.o: ../../include/split_addr.h +qmgr_message.o: ../../include/split_at.h +qmgr_message.o: ../../include/stringops.h +qmgr_message.o: ../../include/sys_defs.h +qmgr_message.o: ../../include/valid_hostname.h +qmgr_message.o: ../../include/vbuf.h +qmgr_message.o: ../../include/verp_sender.h +qmgr_message.o: ../../include/vstream.h +qmgr_message.o: ../../include/vstring.h +qmgr_message.o: qmgr.h +qmgr_message.o: qmgr_message.c +qmgr_move.o: ../../include/check_arg.h +qmgr_move.o: ../../include/dsn.h +qmgr_move.o: ../../include/mail_queue.h +qmgr_move.o: ../../include/mail_scan_dir.h +qmgr_move.o: ../../include/msg.h +qmgr_move.o: ../../include/recipient_list.h +qmgr_move.o: ../../include/scan_dir.h +qmgr_move.o: ../../include/sys_defs.h +qmgr_move.o: ../../include/vbuf.h +qmgr_move.o: ../../include/vstream.h +qmgr_move.o: ../../include/vstring.h +qmgr_move.o: qmgr.h +qmgr_move.o: qmgr_move.c +qmgr_queue.o: ../../include/attr.h +qmgr_queue.o: ../../include/check_arg.h +qmgr_queue.o: ../../include/dsn.h +qmgr_queue.o: ../../include/events.h +qmgr_queue.o: ../../include/htable.h +qmgr_queue.o: ../../include/iostuff.h +qmgr_queue.o: ../../include/mail_params.h +qmgr_queue.o: ../../include/mail_proto.h +qmgr_queue.o: ../../include/msg.h +qmgr_queue.o: ../../include/mymalloc.h +qmgr_queue.o: ../../include/nvtable.h +qmgr_queue.o: ../../include/recipient_list.h +qmgr_queue.o: ../../include/scan_dir.h +qmgr_queue.o: ../../include/sys_defs.h +qmgr_queue.o: ../../include/vbuf.h +qmgr_queue.o: ../../include/vstream.h +qmgr_queue.o: ../../include/vstring.h +qmgr_queue.o: qmgr.h +qmgr_queue.o: qmgr_queue.c +qmgr_scan.o: ../../include/check_arg.h +qmgr_scan.o: ../../include/dsn.h +qmgr_scan.o: ../../include/mail_scan_dir.h +qmgr_scan.o: ../../include/msg.h +qmgr_scan.o: ../../include/mymalloc.h +qmgr_scan.o: ../../include/recipient_list.h +qmgr_scan.o: ../../include/scan_dir.h +qmgr_scan.o: ../../include/sys_defs.h +qmgr_scan.o: ../../include/vbuf.h +qmgr_scan.o: ../../include/vstream.h +qmgr_scan.o: qmgr.h +qmgr_scan.o: qmgr_scan.c +qmgr_transport.o: ../../include/attr.h +qmgr_transport.o: ../../include/check_arg.h +qmgr_transport.o: ../../include/dsn.h +qmgr_transport.o: ../../include/events.h +qmgr_transport.o: ../../include/htable.h +qmgr_transport.o: ../../include/iostuff.h +qmgr_transport.o: ../../include/mail_conf.h +qmgr_transport.o: ../../include/mail_params.h +qmgr_transport.o: ../../include/mail_proto.h +qmgr_transport.o: ../../include/msg.h +qmgr_transport.o: ../../include/mymalloc.h +qmgr_transport.o: ../../include/nvtable.h +qmgr_transport.o: ../../include/recipient_list.h +qmgr_transport.o: ../../include/scan_dir.h +qmgr_transport.o: ../../include/sys_defs.h +qmgr_transport.o: ../../include/vbuf.h +qmgr_transport.o: ../../include/vstream.h +qmgr_transport.o: ../../include/vstring.h +qmgr_transport.o: qmgr.h +qmgr_transport.o: qmgr_transport.c diff --git a/src/oqmgr/qmgr.c b/src/oqmgr/qmgr.c new file mode 100644 index 0000000..2b6b2ad --- /dev/null +++ b/src/oqmgr/qmgr.c @@ -0,0 +1,731 @@ +/*++ +/* NAME +/* qmgr 8 +/* SUMMARY +/* old Postfix queue manager +/* SYNOPSIS +/* \fBqmgr\fR [generic Postfix daemon options] +/* DESCRIPTION +/* The \fBqmgr\fR(8) daemon awaits the arrival of incoming mail +/* and arranges for its delivery via Postfix delivery processes. +/* The actual mail routing strategy is delegated to the +/* \fBtrivial-rewrite\fR(8) daemon. +/* This program expects to be run from the \fBmaster\fR(8) process +/* manager. +/* +/* Mail addressed to the local \fBdouble-bounce\fR address is +/* logged and discarded. This stops potential loops caused by +/* undeliverable bounce notifications. +/* MAIL QUEUES +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon maintains the following queues: +/* .IP \fBincoming\fR +/* Inbound mail from the network, or mail picked up by the +/* local \fBpickup\fR(8) agent from the \fBmaildrop\fR directory. +/* .IP \fBactive\fR +/* Messages that the queue manager has opened for delivery. Only +/* a limited number of messages is allowed to enter the \fBactive\fR +/* queue (leaky bucket strategy, for a fixed delivery rate). +/* .IP \fBdeferred\fR +/* Mail that could not be delivered upon the first attempt. The queue +/* manager implements exponential backoff by doubling the time between +/* delivery attempts. +/* .IP \fBcorrupt\fR +/* Unreadable or damaged queue files are moved here for inspection. +/* .IP \fBhold\fR +/* Messages that are kept "on hold" are kept here until someone +/* sets them free. +/* DELIVERY STATUS REPORTS +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon keeps an eye on per-message delivery status +/* reports in the following directories. Each status report file has +/* the same name as the corresponding message file: +/* .IP \fBbounce\fR +/* Per-recipient status information about why mail is bounced. +/* These files are maintained by the \fBbounce\fR(8) daemon. +/* .IP \fBdefer\fR +/* Per-recipient status information about why mail is delayed. +/* These files are maintained by the \fBdefer\fR(8) daemon. +/* .IP \fBtrace\fR +/* Per-recipient status information as requested with the +/* Postfix "\fBsendmail -v\fR" or "\fBsendmail -bv\fR" command. +/* These files are maintained by the \fBtrace\fR(8) daemon. +/* .PP +/* The \fBqmgr\fR(8) daemon is responsible for asking the +/* \fBbounce\fR(8), \fBdefer\fR(8) or \fBtrace\fR(8) daemons to +/* send delivery reports. +/* STRATEGIES +/* .ad +/* .fi +/* The queue manager implements a variety of strategies for +/* either opening queue files (input) or for message delivery (output). +/* .IP "\fBleaky bucket\fR" +/* This strategy limits the number of messages in the \fBactive\fR queue +/* and prevents the queue manager from running out of memory under +/* heavy load. +/* .IP \fBfairness\fR +/* When the \fBactive\fR queue has room, the queue manager takes one +/* message from the \fBincoming\fR queue and one from the \fBdeferred\fR +/* queue. This prevents a large mail backlog from blocking the delivery +/* of new mail. +/* .IP "\fBslow start\fR" +/* This strategy eliminates "thundering herd" problems by slowly +/* adjusting the number of parallel deliveries to the same destination. +/* .IP "\fBround robin\fR" +/* The queue manager sorts delivery requests by destination. +/* Round-robin selection prevents one destination from dominating +/* deliveries to other destinations. +/* .IP "\fBexponential backoff\fR" +/* Mail that cannot be delivered upon the first attempt is deferred. +/* The time interval between delivery attempts is doubled after each +/* attempt. +/* .IP "\fBdestination status cache\fR" +/* The queue manager avoids unnecessary delivery attempts by +/* maintaining a short-term, in-memory list of unreachable destinations. +/* TRIGGERS +/* .ad +/* .fi +/* On an idle system, the queue manager waits for the arrival of +/* trigger events, or it waits for a timer to go off. A trigger +/* is a one-byte message. +/* Depending on the message received, the queue manager performs +/* one of the following actions (the message is followed by the +/* symbolic constant used internally by the software): +/* .IP "\fBD (QMGR_REQ_SCAN_DEFERRED)\fR" +/* Start a deferred queue scan. If a deferred queue scan is already +/* in progress, that scan will be restarted as soon as it finishes. +/* .IP "\fBI (QMGR_REQ_SCAN_INCOMING)\fR" +/* Start an incoming queue scan. If an incoming queue scan is already +/* in progress, that scan will be restarted as soon as it finishes. +/* .IP "\fBA (QMGR_REQ_SCAN_ALL)\fR" +/* Ignore deferred queue file time stamps. The request affects +/* the next deferred queue scan. +/* .IP "\fBF (QMGR_REQ_FLUSH_DEAD)\fR" +/* Purge all information about dead transports and destinations. +/* .IP "\fBW (TRIGGER_REQ_WAKEUP)\fR" +/* Wakeup call, This is used by the master server to instantiate +/* servers that should not go away forever. The action is to start +/* an incoming queue scan. +/* .PP +/* The \fBqmgr\fR(8) daemon reads an entire buffer worth of triggers. +/* Multiple identical trigger requests are collapsed into one, and +/* trigger requests are sorted so that \fBA\fR and \fBF\fR precede +/* \fBD\fR and \fBI\fR. Thus, in order to force a deferred queue run, +/* one would request \fBA F D\fR; in order to notify the queue manager +/* of the arrival of new mail one would request \fBI\fR. +/* STANDARDS +/* RFC 3463 (Enhanced status codes) +/* RFC 3464 (Delivery status notifications) +/* SECURITY +/* .ad +/* .fi +/* The \fBqmgr\fR(8) daemon is not security sensitive. It reads +/* single-character messages from untrusted local users, and thus may +/* be susceptible to denial of service attacks. The \fBqmgr\fR(8) daemon +/* does not talk to the outside world, and it can be run at fixed low +/* privilege in a chrooted environment. +/* DIAGNOSTICS +/* Problems and transactions are logged to the \fBsyslogd\fR(8) +/* or \fBpostlogd\fR(8) daemon. +/* Corrupted message files are saved to the \fBcorrupt\fR queue +/* for further inspection. +/* +/* Depending on the setting of the \fBnotify_classes\fR parameter, +/* the postmaster is notified of bounces and of other trouble. +/* BUGS +/* A single queue manager process has to compete for disk access with +/* multiple front-end processes such as \fBcleanup\fR(8). A sudden burst of +/* inbound mail can negatively impact outbound delivery rates. +/* CONFIGURATION PARAMETERS +/* .ad +/* .fi +/* Changes to \fBmain.cf\fR are not picked up automatically, +/* as \fBqmgr\fR(8) +/* is a persistent process. Use the command "\fBpostfix reload\fR" after +/* a configuration change. +/* +/* The text below provides only a parameter summary. See +/* \fBpostconf\fR(5) for more details including examples. +/* +/* In the text below, \fItransport\fR is the first field in a +/* \fBmaster.cf\fR entry. +/* COMPATIBILITY CONTROLS +/* .ad +/* .fi +/* Available before Postfix version 2.5: +/* .IP "\fBallow_min_user (no)\fR" +/* Allow a sender or recipient address to have `-' as the first +/* character. +/* .PP +/* Available with Postfix version 2.7 and later: +/* .IP "\fBdefault_filter_nexthop (empty)\fR" +/* When a content_filter or FILTER request specifies no explicit +/* next-hop destination, use $default_filter_nexthop instead; when +/* that value is empty, use the domain in the recipient address. +/* ACTIVE QUEUE CONTROLS +/* .ad +/* .fi +/* .IP "\fBqmgr_clog_warn_time (300s)\fR" +/* The minimal delay between warnings that a specific destination is +/* clogging up the Postfix active queue. +/* .IP "\fBqmgr_message_active_limit (20000)\fR" +/* The maximal number of messages in the active queue. +/* .IP "\fBqmgr_message_recipient_limit (20000)\fR" +/* The maximal number of recipients held in memory by the Postfix +/* queue manager, and the maximal size of the short-term, +/* in-memory "dead" destination status cache. +/* DELIVERY CONCURRENCY CONTROLS +/* .ad +/* .fi +/* .IP "\fBqmgr_fudge_factor (100)\fR" +/* Obsolete feature: the percentage of delivery resources that a busy +/* mail system will use up for delivery of a large mailing list +/* message. +/* .IP "\fBinitial_destination_concurrency (5)\fR" +/* The initial per-destination concurrency level for parallel delivery +/* to the same destination. +/* .IP "\fBdefault_destination_concurrency_limit (20)\fR" +/* The default maximal number of parallel deliveries to the same +/* destination. +/* .IP "\fBtransport_destination_concurrency_limit ($default_destination_concurrency_limit)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_limit parameter value, where +/* \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .PP +/* Available in Postfix version 2.5 and later: +/* .IP "\fBtransport_initial_destination_concurrency ($initial_destination_concurrency)\fR" +/* A transport-specific override for the initial_destination_concurrency +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .IP "\fBdefault_destination_concurrency_failed_cohort_limit (1)\fR" +/* How many pseudo-cohorts must suffer connection or handshake +/* failure before a specific destination is considered unavailable +/* (and further delivery is suspended). +/* .IP "\fBtransport_destination_concurrency_failed_cohort_limit ($default_destination_concurrency_failed_cohort_limit)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_failed_cohort_limit parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdefault_destination_concurrency_negative_feedback (1)\fR" +/* The per-destination amount of delivery concurrency negative +/* feedback, after a delivery completes with a connection or handshake +/* failure. +/* .IP "\fBtransport_destination_concurrency_negative_feedback ($default_destination_concurrency_negative_feedback)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_negative_feedback parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdefault_destination_concurrency_positive_feedback (1)\fR" +/* The per-destination amount of delivery concurrency positive +/* feedback, after a delivery completes without connection or handshake +/* failure. +/* .IP "\fBtransport_destination_concurrency_positive_feedback ($default_destination_concurrency_positive_feedback)\fR" +/* A transport-specific override for the +/* default_destination_concurrency_positive_feedback parameter value, +/* where \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* .IP "\fBdestination_concurrency_feedback_debug (no)\fR" +/* Make the queue manager's feedback algorithm verbose for performance +/* analysis purposes. +/* RECIPIENT SCHEDULING CONTROLS +/* .ad +/* .fi +/* .IP "\fBdefault_destination_recipient_limit (50)\fR" +/* The default maximal number of recipients per message delivery. +/* .IP "\fBtransport_destination_recipient_limit ($default_destination_recipient_limit)\fR" +/* A transport-specific override for the +/* default_destination_recipient_limit parameter value, where +/* \fItransport\fR is the master.cf name of the message delivery +/* transport. +/* OTHER RESOURCE AND RATE CONTROLS +/* .ad +/* .fi +/* .IP "\fBminimal_backoff_time (300s)\fR" +/* The minimal time between attempts to deliver a deferred message; +/* prior to Postfix 2.4 the default value was 1000s. +/* .IP "\fBmaximal_backoff_time (4000s)\fR" +/* The maximal time between attempts to deliver a deferred message. +/* .IP "\fBmaximal_queue_lifetime (5d)\fR" +/* Consider a message as undeliverable, when delivery fails with a +/* temporary error, and the time in the queue has reached the +/* maximal_queue_lifetime limit. +/* .IP "\fBqueue_run_delay (300s)\fR" +/* The time between deferred queue scans by the queue manager; +/* prior to Postfix 2.4 the default value was 1000s. +/* .IP "\fBtransport_retry_time (60s)\fR" +/* The time between attempts by the Postfix queue manager to contact +/* a malfunctioning message delivery transport. +/* .PP +/* Available in Postfix version 2.1 and later: +/* .IP "\fBbounce_queue_lifetime (5d)\fR" +/* Consider a bounce message as undeliverable, when delivery fails +/* with a temporary error, and the time in the queue has reached the +/* bounce_queue_lifetime limit. +/* .PP +/* Available in Postfix version 2.5 and later: +/* .IP "\fBdefault_destination_rate_delay (0s)\fR" +/* The default amount of delay that is inserted between individual +/* message deliveries to the same destination and over the same message +/* delivery transport. +/* .IP "\fBtransport_destination_rate_delay ($default_destination_rate_delay)\fR" +/* A transport-specific override for the default_destination_rate_delay +/* parameter value, where \fItransport\fR is the master.cf name of +/* the message delivery transport. +/* .PP +/* Available in Postfix version 3.1 and later: +/* .IP "\fBdefault_transport_rate_delay (0s)\fR" +/* The default amount of delay that is inserted between individual +/* message deliveries over the same message delivery transport, +/* regardless of destination. +/* .IP "\fBtransport_transport_rate_delay ($default_transport_rate_delay)\fR" +/* A transport-specific override for the default_transport_rate_delay +/* parameter value, where the initial \fItransport\fR in the parameter +/* name is the master.cf name of the message delivery transport. +/* SAFETY CONTROLS +/* .ad +/* .fi +/* .IP "\fBqmgr_daemon_timeout (1000s)\fR" +/* How much time a Postfix queue manager process may take to handle +/* a request before it is terminated by a built-in watchdog timer. +/* .IP "\fBqmgr_ipc_timeout (60s)\fR" +/* The time limit for the queue manager to send or receive information +/* over an internal communication channel. +/* .PP +/* Available in Postfix version 3.1 and later: +/* .IP "\fBaddress_verify_pending_request_limit (see 'postconf -d' output)\fR" +/* A safety limit that prevents address verification requests from +/* overwhelming the Postfix queue. +/* MISCELLANEOUS CONTROLS +/* .ad +/* .fi +/* .IP "\fBconfig_directory (see 'postconf -d' output)\fR" +/* The default location of the Postfix main.cf and master.cf +/* configuration files. +/* .IP "\fBdefer_transports (empty)\fR" +/* The names of message delivery transports that should not deliver mail +/* unless someone issues "\fBsendmail -q\fR" or equivalent. +/* .IP "\fBdelay_logging_resolution_limit (2)\fR" +/* The maximal number of digits after the decimal point when logging +/* sub-second delay values. +/* .IP "\fBhelpful_warnings (yes)\fR" +/* Log warnings about problematic configuration settings, and provide +/* helpful suggestions. +/* .IP "\fBprocess_id (read-only)\fR" +/* The process ID of a Postfix command or daemon process. +/* .IP "\fBprocess_name (read-only)\fR" +/* The process name of a Postfix command or daemon process. +/* .IP "\fBqueue_directory (see 'postconf -d' output)\fR" +/* The location of the Postfix top-level queue directory. +/* .IP "\fBsyslog_facility (mail)\fR" +/* The syslog facility of Postfix logging. +/* .IP "\fBsyslog_name (see 'postconf -d' output)\fR" +/* A prefix that is prepended to the process name in syslog +/* records, so that, for example, "smtpd" becomes "prefix/smtpd". +/* .PP +/* Available in Postfix version 3.0 and later: +/* .IP "\fBconfirm_delay_cleared (no)\fR" +/* After sending a "your message is delayed" notification, inform +/* the sender when the delay clears up. +/* .PP +/* Available in Postfix 3.3 and later: +/* .IP "\fBservice_name (read-only)\fR" +/* The master.cf service name of a Postfix daemon process. +/* FILES +/* /var/spool/postfix/incoming, incoming queue +/* /var/spool/postfix/active, active queue +/* /var/spool/postfix/deferred, deferred queue +/* /var/spool/postfix/bounce, non-delivery status +/* /var/spool/postfix/defer, non-delivery status +/* /var/spool/postfix/trace, delivery status +/* SEE ALSO +/* trivial-rewrite(8), address routing +/* bounce(8), delivery status reports +/* postconf(5), configuration parameters +/* master(5), generic daemon options +/* master(8), process manager +/* postlogd(8), Postfix logging +/* syslogd(8), system logging +/* README FILES +/* .ad +/* .fi +/* Use "\fBpostconf readme_directory\fR" or +/* "\fBpostconf html_directory\fR" to locate this information. +/* .na +/* .nf +/* QSHAPE_README, Postfix queue analysis +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <stdlib.h> +#include <unistd.h> +#include <ctype.h> + +/* Utility library. */ + +#include <msg.h> +#include <events.h> +#include <vstream.h> +#include <dict.h> + +/* Global library. */ + +#include <mail_queue.h> +#include <recipient_list.h> +#include <mail_conf.h> +#include <mail_params.h> +#include <mail_version.h> +#include <mail_proto.h> /* QMGR_SCAN constants */ +#include <mail_flow.h> +#include <flush_clnt.h> + +/* Master process interface */ + +#include <master_proto.h> +#include <mail_server.h> + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Tunables. + */ +int var_queue_run_delay; +int var_min_backoff_time; +int var_max_backoff_time; +int var_max_queue_time; +int var_dsn_queue_time; +int var_qmgr_active_limit; +int var_qmgr_rcpt_limit; +int var_init_dest_concurrency; +int var_transport_retry_time; +int var_dest_con_limit; +int var_dest_rcpt_limit; +char *var_defer_xports; +int var_qmgr_fudge; +int var_local_rcpt_lim; /* XXX */ +int var_local_con_lim; /* XXX */ +bool var_verp_bounce_off; +int var_qmgr_clog_warn_time; +char *var_conc_pos_feedback; +char *var_conc_neg_feedback; +int var_conc_cohort_limit; +int var_conc_feedback_debug; +int var_xport_rate_delay; +int var_dest_rate_delay; +char *var_def_filter_nexthop; +int var_qmgr_daemon_timeout; +int var_qmgr_ipc_timeout; +int var_dsn_delay_cleared; +int var_vrfy_pend_limit; + +static QMGR_SCAN *qmgr_scans[2]; + +#define QMGR_SCAN_IDX_INCOMING 0 +#define QMGR_SCAN_IDX_DEFERRED 1 +#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0])) + +/* qmgr_deferred_run_event - queue manager heartbeat */ + +static void qmgr_deferred_run_event(int unused_event, void *dummy) +{ + + /* + * This routine runs when it is time for another deferred queue scan. + * Make sure this routine gets called again in the future. + */ + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START); + event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay); +} + +/* qmgr_trigger_event - respond to external trigger(s) */ + +static void qmgr_trigger_event(char *buf, ssize_t len, + char *unused_service, char **argv) +{ + int incoming_flag = 0; + int deferred_flag = 0; + int i; + + /* + * Sanity check. This service takes no command-line arguments. + */ + if (argv[0]) + msg_fatal("unexpected command-line argument: %s", argv[0]); + + /* + * Collapse identical requests that have arrived since we looked last + * time. There is no client feedback so there is no need to process each + * request in order. And as long as we don't have conflicting requests we + * are free to sort them into the most suitable order. + */ +#define QMGR_FLUSH_BEFORE (QMGR_FLUSH_ONCE | QMGR_FLUSH_DFXP) + + for (i = 0; i < len; i++) { + if (msg_verbose) + msg_info("request: %d (%c)", + buf[i], ISALNUM(buf[i]) ? buf[i] : '?'); + switch (buf[i]) { + case TRIGGER_REQ_WAKEUP: + case QMGR_REQ_SCAN_INCOMING: + incoming_flag |= QMGR_SCAN_START; + break; + case QMGR_REQ_SCAN_DEFERRED: + deferred_flag |= QMGR_SCAN_START; + break; + case QMGR_REQ_FLUSH_DEAD: + deferred_flag |= QMGR_FLUSH_BEFORE; + incoming_flag |= QMGR_FLUSH_BEFORE; + break; + case QMGR_REQ_SCAN_ALL: + deferred_flag |= QMGR_SCAN_ALL; + incoming_flag |= QMGR_SCAN_ALL; + break; + default: + if (msg_verbose) + msg_info("request ignored"); + break; + } + } + + /* + * Process each request type at most once. Modifiers take effect upon the + * next queue run. If no queue run is in progress, and a queue scan is + * requested, the request takes effect immediately. + */ + if (incoming_flag != 0) + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag); + if (deferred_flag != 0) + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag); +} + +/* qmgr_loop - queue manager main loop */ + +static int qmgr_loop(char *unused_name, char **unused_argv) +{ + char *path; + ssize_t token_count; + int feed = 0; + int scan_idx; /* Priority order scan index */ + static int first_scan_idx = QMGR_SCAN_IDX_INCOMING; + int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; + int delay; + + /* + * This routine runs as part of the event handling loop, after the event + * manager has delivered a timer or I/O event (including the completion + * of a connection to a delivery process), or after it has waited for a + * specified amount of time. The result value of qmgr_loop() specifies + * how long the event manager should wait for the next event. + */ +#define DONT_WAIT 0 +#define WAIT_FOR_EVENT (-1) + + /* + * Attempt to drain the active queue by allocating a suitable delivery + * process and by delivering mail via it. Delivery process allocation and + * mail delivery are asynchronous. + */ + qmgr_active_drain(); + + /* + * Let some new blood into the active queue when the queue size is + * smaller than some configurable limit, and when the number of in-core + * recipients does not exceed some configurable limit. + * + * We import one message per interrupt, to optimally tune the input count + * for the number of delivery agent protocol wait states, as explained in + * qmgr_transport.c. + */ + delay = WAIT_FOR_EVENT; + for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit + && qmgr_recipient_count < var_qmgr_rcpt_limit + && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) { + last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT; + if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) { + delay = DONT_WAIT; + if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0) + break; + } + } + + /* + * Round-robin the queue scans. When the active queue becomes full, + * prefer new mail over deferred mail. + */ + if (qmgr_message_count < var_qmgr_active_limit + && qmgr_recipient_count < var_qmgr_rcpt_limit) { + first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; + } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) { + first_scan_idx = QMGR_SCAN_IDX_INCOMING; + } + + /* + * Global flow control. If enabled, slow down receiving processes that + * get ahead of the queue manager, but don't block them completely. + */ + if (var_in_flow_delay > 0) { + token_count = mail_flow_count(); + if (token_count < var_proc_limit) { + if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING) + mail_flow_put(1); + else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0) + mail_flow_put(var_proc_limit - token_count); + } else if (token_count > var_proc_limit) { + mail_flow_get(token_count - var_proc_limit); + } + } + return (delay); +} + +/* pre_accept - see if tables have changed */ + +static void pre_accept(char *unused_name, char **unused_argv) +{ + const char *table; + + if ((table = dict_changed_name()) != 0) { + msg_info("table %s has changed -- restarting", table); + exit(0); + } +} + +/* qmgr_pre_init - pre-jail initialization */ + +static void qmgr_pre_init(char *unused_name, char **unused_argv) +{ + flush_init(); +} + +/* qmgr_post_init - post-jail initialization */ + +static void qmgr_post_init(char *unused_name, char **unused_argv) +{ + + /* + * Sanity check. + */ + if (var_qmgr_rcpt_limit < var_qmgr_active_limit) { + msg_warn("%s is smaller than %s - adjusting %s", + VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT); + var_qmgr_rcpt_limit = var_qmgr_active_limit; + } + if (var_dsn_queue_time > var_max_queue_time) { + msg_warn("%s is larger than %s - adjusting %s", + VAR_DSN_QUEUE_TIME, VAR_MAX_QUEUE_TIME, VAR_DSN_QUEUE_TIME); + var_dsn_queue_time = var_max_queue_time; + } + + /* + * This routine runs after the skeleton code has entered the chroot jail. + * Prevent automatic process suicide after a limited number of client + * requests or after a limited amount of idle time. Move any left-over + * entries from the active queue to the incoming queue, and give them a + * time stamp into the future, in order to allow ongoing deliveries to + * finish first. Start scanning the incoming and deferred queues. + * Left-over active queue entries are moved to the incoming queue because + * the incoming queue has priority; moving left-overs to the deferred + * queue could cause anomalous delays when "postfix reload/start" are + * issued often. Override the IPC timeout (default 3600s) so that the + * queue manager can reset a broken IPC channel before the watchdog timer + * goes off. + */ + var_ipc_timeout = var_qmgr_ipc_timeout; + var_use_limit = 0; + var_idle_limit = 0; + qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time()); + qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING); + qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START); + qmgr_deferred_run_event(0, (void *) 0); +} + +MAIL_VERSION_STAMP_DECLARE; + +/* main - the main program */ + +int main(int argc, char **argv) +{ + static const CONFIG_STR_TABLE str_table[] = { + VAR_DEFER_XPORTS, DEF_DEFER_XPORTS, &var_defer_xports, 0, 0, + VAR_CONC_POS_FDBACK, DEF_CONC_POS_FDBACK, &var_conc_pos_feedback, 1, 0, + VAR_CONC_NEG_FDBACK, DEF_CONC_NEG_FDBACK, &var_conc_neg_feedback, 1, 0, + VAR_DEF_FILTER_NEXTHOP, DEF_DEF_FILTER_NEXTHOP, &var_def_filter_nexthop, 0, 0, + 0, + }; + static const CONFIG_TIME_TABLE time_table[] = { + VAR_QUEUE_RUN_DELAY, DEF_QUEUE_RUN_DELAY, &var_queue_run_delay, 1, 0, + VAR_MIN_BACKOFF_TIME, DEF_MIN_BACKOFF_TIME, &var_min_backoff_time, 1, 0, + VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0, + VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000, + VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000, + VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0, + VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0, + VAR_XPORT_RATE_DELAY, DEF_XPORT_RATE_DELAY, &var_xport_rate_delay, 0, 0, + VAR_DEST_RATE_DELAY, DEF_DEST_RATE_DELAY, &var_dest_rate_delay, 0, 0, + VAR_QMGR_DAEMON_TIMEOUT, DEF_QMGR_DAEMON_TIMEOUT, &var_qmgr_daemon_timeout, 1, 0, + VAR_QMGR_IPC_TIMEOUT, DEF_QMGR_IPC_TIMEOUT, &var_qmgr_ipc_timeout, 1, 0, + 0, + }; + static const CONFIG_INT_TABLE int_table[] = { + VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0, + VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0, + VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0, + VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0, + VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0, + VAR_QMGR_FUDGE, DEF_QMGR_FUDGE, &var_qmgr_fudge, 10, 100, + VAR_LOCAL_RCPT_LIMIT, DEF_LOCAL_RCPT_LIMIT, &var_local_rcpt_lim, 0, 0, + VAR_LOCAL_CON_LIMIT, DEF_LOCAL_CON_LIMIT, &var_local_con_lim, 0, 0, + VAR_CONC_COHORT_LIM, DEF_CONC_COHORT_LIM, &var_conc_cohort_limit, 0, 0, + VAR_VRFY_PEND_LIMIT, DEF_VRFY_PEND_LIMIT, &var_vrfy_pend_limit, 1, 0, + 0, + }; + static const CONFIG_BOOL_TABLE bool_table[] = { + VAR_VERP_BOUNCE_OFF, DEF_VERP_BOUNCE_OFF, &var_verp_bounce_off, + VAR_CONC_FDBACK_DEBUG, DEF_CONC_FDBACK_DEBUG, &var_conc_feedback_debug, + VAR_DSN_DELAY_CLEARED, DEF_DSN_DELAY_CLEARED, &var_dsn_delay_cleared, + 0, + }; + + /* + * Fingerprint executables and core dumps. + */ + MAIL_VERSION_STAMP_ALLOCATE; + + /* + * Use the trigger service skeleton, because no-one else should be + * monitoring our service port while this process runs, and because we do + * not talk back to the client. + */ + trigger_server_main(argc, argv, qmgr_trigger_event, + CA_MAIL_SERVER_INT_TABLE(int_table), + CA_MAIL_SERVER_STR_TABLE(str_table), + CA_MAIL_SERVER_BOOL_TABLE(bool_table), + CA_MAIL_SERVER_TIME_TABLE(time_table), + CA_MAIL_SERVER_PRE_INIT(qmgr_pre_init), + CA_MAIL_SERVER_POST_INIT(qmgr_post_init), + CA_MAIL_SERVER_LOOP(qmgr_loop), + CA_MAIL_SERVER_PRE_ACCEPT(pre_accept), + CA_MAIL_SERVER_SOLITARY, + CA_MAIL_SERVER_WATCHDOG(&var_qmgr_daemon_timeout), + 0); +} diff --git a/src/oqmgr/qmgr.h b/src/oqmgr/qmgr.h new file mode 100644 index 0000000..15c941b --- /dev/null +++ b/src/oqmgr/qmgr.h @@ -0,0 +1,431 @@ +/*++ +/* NAME +/* qmgr 3h +/* SUMMARY +/* queue manager data structures +/* SYNOPSIS +/* #include "qmgr.h" +/* DESCRIPTION +/* .nf + + /* + * System library. + */ +#include <sys/time.h> +#include <time.h> + + /* + * Utility library. + */ +#include <vstream.h> +#include <scan_dir.h> + + /* + * Global library. + */ +#include <recipient_list.h> +#include <dsn.h> + + /* + * The queue manager is built around lots of mutually-referring structures. + * These typedefs save some typing. + */ +typedef struct QMGR_TRANSPORT QMGR_TRANSPORT; +typedef struct QMGR_QUEUE QMGR_QUEUE; +typedef struct QMGR_ENTRY QMGR_ENTRY; +typedef struct QMGR_MESSAGE QMGR_MESSAGE; +typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST; +typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST; +typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST; +typedef struct QMGR_SCAN QMGR_SCAN; +typedef struct QMGR_FEEDBACK QMGR_FEEDBACK; + + /* + * Hairy macros to update doubly-linked lists. + */ +#define QMGR_LIST_ROTATE(head, object) { \ + head.next->peers.prev = head.prev; \ + head.prev->peers.next = head.next; \ + head.next = object->peers.next; \ + if (object->peers.next) \ + head.next->peers.prev = 0; \ + head.prev = object; \ + object->peers.next = 0; \ +} + +#define QMGR_LIST_UNLINK(head, type, object) { \ + type next = object->peers.next; \ + type prev = object->peers.prev; \ + if (prev) prev->peers.next = next; \ + else head.next = next; \ + if (next) next->peers.prev = prev; \ + else head.prev = prev; \ + object->peers.next = object->peers.prev = 0; \ +} + +#define QMGR_LIST_APPEND(head, object) { \ + object->peers.next = head.next; \ + object->peers.prev = 0; \ + if (head.next) { \ + head.next->peers.prev = object; \ + } else { \ + head.prev = object; \ + } \ + head.next = object; \ +} + +#define QMGR_LIST_PREPEND(head, object) { \ + object->peers.prev = head.prev; \ + object->peers.next = 0; \ + if (head.prev) { \ + head.prev->peers.next = object; \ + } else { \ + head.next = object; \ + } \ + head.prev = object; \ +} + +#define QMGR_LIST_INIT(head) { \ + head.prev = 0; \ + head.next = 0; \ +} + + /* + * Transports are looked up by name (when we have resolved a message), or + * round-robin wise (when we want to distribute resources fairly). + */ +struct QMGR_TRANSPORT_LIST { + QMGR_TRANSPORT *next; + QMGR_TRANSPORT *prev; +}; + +extern struct HTABLE *qmgr_transport_byname; /* transport by name */ +extern QMGR_TRANSPORT_LIST qmgr_transport_list; /* transports, round robin */ + + /* + * Delivery agents provide feedback, as hints that Postfix should expend + * more or fewer resources on a specific destination domain. The main.cf + * file specifies how feedback affects delivery concurrency: add/subtract a + * constant, a ratio of constants, or a constant divided by the delivery + * concurrency; and it specifies how much feedback must accumulate between + * concurrency updates. + */ +struct QMGR_FEEDBACK { + int hysteresis; /* to pass, need to be this tall */ + double base; /* pre-computed from main.cf */ + int index; /* none, window, sqrt(window) */ +}; + +#define QMGR_FEEDBACK_IDX_NONE 0 /* no window dependence */ +#define QMGR_FEEDBACK_IDX_WIN 1 /* 1/window dependence */ +#if 0 +#define QMGR_FEEDBACK_IDX_SQRT_WIN 2 /* 1/sqrt(window) dependence */ +#endif + +#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN +#include <math.h> +#endif + +extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *); + +#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN +#define QMGR_FEEDBACK_VAL(fb, win) \ + ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win)) +#else +#define QMGR_FEEDBACK_VAL(fb, win) \ + ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \ + (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \ + (fb).base / sqrt(win)) +#endif + + /* + * Each transport (local, smtp-out, bounce) can have one queue per next hop + * name. Queues are looked up by next hop name (when we have resolved a + * message destination), or round-robin wise (when we want to deliver + * messages fairly). + */ +struct QMGR_QUEUE_LIST { + QMGR_QUEUE *next; + QMGR_QUEUE *prev; +}; + +struct QMGR_TRANSPORT { + int flags; /* blocked, etc. */ + int pending; /* incomplete DA connections */ + char *name; /* transport name */ + int dest_concurrency_limit; /* concurrency per domain */ + int init_dest_concurrency; /* init. per-domain concurrency */ + int recipient_limit; /* recipients per transaction */ + struct HTABLE *queue_byname; /* queues indexed by domain */ + QMGR_QUEUE_LIST queue_list; /* queues, round robin order */ + QMGR_TRANSPORT_LIST peers; /* linkage */ + DSN *dsn; /* why unavailable */ + QMGR_FEEDBACK pos_feedback; /* positive feedback control */ + QMGR_FEEDBACK neg_feedback; /* negative feedback control */ + int fail_cohort_limit; /* flow shutdown control */ + int xport_rate_delay; /* suspend per delivery */ + int rate_delay; /* suspend per delivery */ +}; + +#define QMGR_TRANSPORT_STAT_DEAD (1<<1) +#define QMGR_TRANSPORT_STAT_RATE_LOCK (1<<2) + +typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *); +extern QMGR_TRANSPORT *qmgr_transport_select(void); +extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY); +extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *); +extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *); +extern QMGR_TRANSPORT *qmgr_transport_create(const char *); +extern QMGR_TRANSPORT *qmgr_transport_find(const char *); + +#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) + + /* + * Each next hop (e.g., a domain name) has its own queue of pending message + * transactions. The "todo" queue contains messages that are to be delivered + * to this next hop. When a message is elected for transmission, it is moved + * from the "todo" queue to the "busy" queue. Messages are taken from the + * "todo" queue in sequence. An initial destination delivery concurrency > 1 + * ensures that one problematic message will not block all other traffic to + * that next hop. + */ +struct QMGR_ENTRY_LIST { + QMGR_ENTRY *next; + QMGR_ENTRY *prev; +}; + +struct QMGR_QUEUE { + int dflags; /* delivery request options */ + time_t last_done; /* last delivery completion */ + char *name; /* domain name or address */ + char *nexthop; /* domain name */ + int todo_refcount; /* queue entries (todo list) */ + int busy_refcount; /* queue entries (busy list) */ + int window; /* slow open algorithm */ + double success; /* accumulated positive feedback */ + double failure; /* accumulated negative feedback */ + double fail_cohorts; /* pseudo-cohort failure count */ + QMGR_TRANSPORT *transport; /* transport linkage */ + QMGR_ENTRY_LIST todo; /* todo queue entries */ + QMGR_ENTRY_LIST busy; /* messages on the wire */ + QMGR_QUEUE_LIST peers; /* neighbor queues */ + DSN *dsn; /* why unavailable */ + time_t clog_time_to_warn; /* time of next warning */ +}; + +#define QMGR_QUEUE_TODO 1 /* waiting for service */ +#define QMGR_QUEUE_BUSY 2 /* recipients on the wire */ + +extern int qmgr_queue_count; + +extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *); +extern QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *); +extern void qmgr_queue_done(QMGR_QUEUE *); +extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *); +extern void qmgr_queue_unthrottle(QMGR_QUEUE *); +extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *); +extern void qmgr_queue_suspend(QMGR_QUEUE *, int); + + /* + * Exclusive queue states. Originally there were only two: "throttled" and + * "not throttled". It was natural to encode these in the queue window size. + * After 10 years it's not practical to rip out all the working code and + * change representations, so we just clean up the names a little. + * + * Note: only the "ready" state can reach every state (including itself); + * non-ready states can reach only the "ready" state. Other transitions are + * forbidden, because they would result in dangling event handlers. + */ +#define QMGR_QUEUE_STAT_THROTTLED 0 /* back-off timer */ +#define QMGR_QUEUE_STAT_SUSPENDED -1 /* voluntary delay timer */ +#define QMGR_QUEUE_STAT_SAVED -2 /* delayed cleanup timer */ +#define QMGR_QUEUE_STAT_BAD -3 /* can't happen */ + +#define QMGR_QUEUE_READY(q) ((q)->window > 0) +#define QMGR_QUEUE_THROTTLED(q) ((q)->window == QMGR_QUEUE_STAT_THROTTLED) +#define QMGR_QUEUE_SUSPENDED(q) ((q)->window == QMGR_QUEUE_STAT_SUSPENDED) +#define QMGR_QUEUE_SAVED(q) ((q)->window == QMGR_QUEUE_STAT_SAVED) +#define QMGR_QUEUE_BAD(q) ((q)->window <= QMGR_QUEUE_STAT_BAD) + +#define QMGR_QUEUE_STATUS(q) ( \ + QMGR_QUEUE_READY(q) ? "ready" : \ + QMGR_QUEUE_THROTTLED(q) ? "throttled" : \ + QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \ + QMGR_QUEUE_SAVED(q) ? "saved" : \ + "invalid queue status" \ + ) + + /* + * Structure of one next-hop queue entry. In order to save some copying + * effort we allow multiple recipients per transaction. + */ +struct QMGR_ENTRY { + VSTREAM *stream; /* delivery process */ + QMGR_MESSAGE *message; /* message info */ + RECIPIENT_LIST rcpt_list; /* as many as it takes */ + QMGR_QUEUE *queue; /* parent linkage */ + QMGR_ENTRY_LIST peers; /* neighbor entries */ +}; + +extern QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *); +extern void qmgr_entry_unselect(QMGR_QUEUE *, QMGR_ENTRY *); +extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *); +extern void qmgr_entry_done(QMGR_ENTRY *, int); +extern QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *, QMGR_MESSAGE *); + + /* + * All common in-core information about a message is kept here. When all + * recipients have been tried the message file is linked to the "deferred" + * queue (some hosts not reachable), to the "bounce" queue (some recipients + * were rejected), and is then removed from the "active" queue. + */ +struct QMGR_MESSAGE { + int flags; /* delivery problems */ + int qflags; /* queuing flags */ + int tflags; /* tracing flags */ + long tflags_offset; /* offset for killing */ + int rflags; /* queue file read flags */ + VSTREAM *fp; /* open queue file or null */ + int refcount; /* queue entries */ + int single_rcpt; /* send one rcpt at a time */ + struct timeval arrival_time; /* start of receive transaction */ + time_t create_time; /* queue file create time */ + struct timeval active_time; /* time of entry into active queue */ + long warn_offset; /* warning bounce flag offset */ + time_t warn_time; /* time next warning to be sent */ + long data_offset; /* data seek offset */ + char *queue_name; /* queue name */ + char *queue_id; /* queue file */ + char *encoding; /* content encoding */ + char *sender; /* complete address */ + char *dsn_envid; /* DSN envelope ID */ + int dsn_ret; /* DSN headers/full */ + int smtputf8; /* requires unicode */ + char *verp_delims; /* VERP delimiters */ + char *filter_xport; /* filtering transport */ + char *inspect_xport; /* inspecting transport */ + char *redirect_addr; /* info@spammer.tld */ + long data_size; /* data segment size */ + long cont_length; /* message content length */ + long rcpt_offset; /* more recipients here */ + char *client_name; /* client hostname */ + char *client_addr; /* client address */ + char *client_port; /* client port */ + char *client_proto; /* client protocol */ + char *client_helo; /* helo parameter */ + char *sasl_method; /* SASL method */ + char *sasl_username; /* SASL user name */ + char *sasl_sender; /* SASL sender */ + char *log_ident; /* up-stream queue ID */ + char *rewrite_context; /* address qualification */ + RECIPIENT_LIST rcpt_list; /* complete addresses */ +}; + + /* + * Flags 0-15 are reserved for qmgr_user.h. + */ +#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16) + +#define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1) + +extern int qmgr_message_count; +extern int qmgr_recipient_count; +extern int qmgr_vrfy_pend_count; + +extern void qmgr_message_free(QMGR_MESSAGE *); +extern void qmgr_message_update_warn(QMGR_MESSAGE *); +extern void qmgr_message_kill_record(QMGR_MESSAGE *, long); +extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t); +extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *); + +#define QMGR_MSG_STATS(stats, message) \ + MSG_STATS_INIT2(stats, \ + incoming_arrival, message->arrival_time, \ + active_arrival, message->active_time) + + /* + * qmgr_defer.c + */ +extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *); +extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *); +extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); + + /* + * qmgr_bounce.c + */ +extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); + + /* + * qmgr_deliver.c + */ +extern int qmgr_deliver_concurrency; +extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *); + + /* + * qmgr_active.c + */ +extern int qmgr_active_feed(QMGR_SCAN *, const char *); +extern void qmgr_active_drain(void); +extern void qmgr_active_done(QMGR_MESSAGE *); + + /* + * qmgr_move.c + */ +extern void qmgr_move(const char *, const char *, time_t); + + /* + * qmgr_enable.c + */ +extern void qmgr_enable_all(void); +extern void qmgr_enable_transport(QMGR_TRANSPORT *); +extern void qmgr_enable_queue(QMGR_QUEUE *); + + /* + * Queue scan context. + */ +struct QMGR_SCAN { + char *queue; /* queue name */ + int flags; /* private, this run */ + int nflags; /* private, next run */ + struct SCAN_DIR *handle; /* scan */ +}; + + /* + * Flags that control queue scans or destination selection. These are + * similar to the QMGR_REQ_XXX request codes. + */ +#define QMGR_SCAN_START (1<<0) /* start now/restart when done */ +#define QMGR_SCAN_ALL (1<<1) /* all queue file time stamps */ +#define QMGR_FLUSH_ONCE (1<<2) /* unthrottle once */ +#define QMGR_FLUSH_DFXP (1<<3) /* override defer_transports */ +#define QMGR_FLUSH_EACH (1<<4) /* unthrottle per message */ + + /* + * qmgr_scan.c + */ +extern QMGR_SCAN *qmgr_scan_create(const char *); +extern void qmgr_scan_request(QMGR_SCAN *, int); +extern char *qmgr_scan_next(QMGR_SCAN *); + + /* + * qmgr_error.c + */ +extern QMGR_TRANSPORT *qmgr_error_transport(const char *); +extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *); +extern char *qmgr_error_nexthop(DSN *); + +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ diff --git a/src/oqmgr/qmgr_active.c b/src/oqmgr/qmgr_active.c new file mode 100644 index 0000000..49f5ae7 --- /dev/null +++ b/src/oqmgr/qmgr_active.c @@ -0,0 +1,587 @@ +/*++ +/* NAME +/* qmgr_active 3 +/* SUMMARY +/* active queue management +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_active_feed(scan_info, queue_id) +/* QMGR_SCAN *scan_info; +/* const char *queue_id; +/* +/* void qmgr_active_drain() +/* +/* int qmgr_active_done(message) +/* QMGR_MESSAGE *message; +/* DESCRIPTION +/* These functions maintain the active message queue: the set +/* of messages that the queue manager is actually working on. +/* The active queue is limited in size. Messages are drained +/* from the active queue by allocating a delivery process and +/* by delivering mail via that process. Messages leak into the +/* active queue only when the active queue is small enough. +/* Damaged message files are saved to the "corrupt" directory. +/* +/* qmgr_active_feed() inserts the named message file into +/* the active queue. Message files with the wrong name or +/* with other wrong properties are skipped but not removed. +/* The following queue flags are recognized, other flags being +/* ignored: +/* .IP QMGR_SCAN_ALL +/* Examine all queue files. Normally, deferred queue files with +/* future time stamps are ignored, and incoming queue files with +/* future time stamps are frowned upon. +/* .PP +/* qmgr_active_drain() allocates one delivery process. +/* Process allocation is asynchronous. Once the delivery +/* process is available, an attempt is made to deliver +/* a message via it. Message delivery is asynchronous, too. +/* +/* qmgr_active_done() deals with a message after delivery +/* has been tried for all in-core recipients. If the message +/* was bounced, a bounce message is sent to the sender, or +/* to the Errors-To: address if one was specified. +/* If there are more on-file recipients, a new batch of +/* in-core recipients is read from the queue file. Otherwise, +/* if a delivery agent marked the queue file as corrupt, +/* the queue file is moved to the "corrupt" queue (surprise); +/* if at least one delivery failed, the message is moved +/* to the deferred queue. The time stamps of a deferred queue +/* file are set to the nearest wakeup time of its recipient +/* sites (if delivery failed due to a problem with a next-hop +/* host), are set into the future by the amount of time the +/* message was queued (per-message exponential backoff), or are set +/* into the future by a minimal backoff time, whichever is more. +/* The minimal_backoff_time parameter specifies the minimal +/* amount of time between delivery attempts; maximal_backoff_time +/* specifies an upper limit. +/* DIAGNOSTICS +/* Fatal: queue file access failures, out of memory. +/* Panic: interface violations, internal consistency errors. +/* Warnings: corrupt message file. A corrupt message is saved +/* to the "corrupt" queue for further inspection. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <sys/stat.h> +#include <dirent.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <utime.h> +#include <errno.h> + +#ifndef S_IRWXU /* What? no POSIX system? */ +#define S_IRWXU 0700 +#endif + +/* Utility library. */ + +#include <msg.h> +#include <events.h> +#include <mymalloc.h> +#include <vstream.h> +#include <warn_stat.h> + +/* Global library. */ + +#include <mail_params.h> +#include <mail_open_ok.h> +#include <mail_queue.h> +#include <recipient_list.h> +#include <bounce.h> +#include <defer.h> +#include <trace.h> +#include <abounce.h> +#include <rec_type.h> +#include <qmgr_user.h> + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * A bunch of call-back routines. + */ +static void qmgr_active_done_2_bounce_flush(int, void *); +static void qmgr_active_done_2_generic(QMGR_MESSAGE *); +static void qmgr_active_done_25_trace_flush(int, void *); +static void qmgr_active_done_25_generic(QMGR_MESSAGE *); +static void qmgr_active_done_3_defer_flush(int, void *); +static void qmgr_active_done_3_defer_warn(int, void *); +static void qmgr_active_done_3_generic(QMGR_MESSAGE *); + +/* qmgr_active_corrupt - move corrupted file out of the way */ + +static void qmgr_active_corrupt(const char *queue_id) +{ + const char *myname = "qmgr_active_corrupt"; + + if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) { + if (errno != ENOENT) + msg_fatal("%s: save corrupt file queue %s id %s: %m", + myname, MAIL_QUEUE_ACTIVE, queue_id); + } else { + msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", + queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT); + } +} + +/* qmgr_active_defer - defer queue file */ + +static void qmgr_active_defer(const char *queue_name, const char *queue_id, + const char *dest_queue, int delay) +{ + const char *myname = "qmgr_active_defer"; + const char *path; + struct utimbuf tbuf; + + if (msg_verbose) + msg_info("wakeup %s after %ld secs", queue_id, (long) delay); + + tbuf.actime = tbuf.modtime = event_time() + delay; + path = mail_queue_path((VSTRING *) 0, queue_name, queue_id); + if (utime(path, &tbuf) < 0 && errno != ENOENT) + msg_fatal("%s: update %s time stamps: %m", myname, path); + if (mail_queue_rename(queue_id, queue_name, dest_queue)) { + if (errno != ENOENT) + msg_fatal("%s: rename %s from %s to %s: %m", myname, + queue_id, queue_name, dest_queue); + msg_warn("%s: rename %s from %s to %s: %m", myname, + queue_id, queue_name, dest_queue); + } else if (msg_verbose) { + msg_info("%s: defer %s", myname, queue_id); + } +} + +/* qmgr_active_feed - feed one message into active queue */ + +int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) +{ + const char *myname = "qmgr_active_feed"; + QMGR_MESSAGE *message; + struct stat st; + const char *path; + + if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0) + msg_panic("%s: bad queue %s", myname, scan_info->queue); + if (msg_verbose) + msg_info("%s: queue %s", myname, scan_info->queue); + + /* + * Make sure this is something we are willing to open. + */ + if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO) + return (0); + + if (msg_verbose) + msg_info("%s: %s", myname, path); + + /* + * Skip files that have time stamps into the future. They need to cool + * down. Incoming and deferred files can have future time stamps. + */ + if ((scan_info->flags & QMGR_SCAN_ALL) == 0 + && st.st_mtime > time((time_t *) 0) + 1) { + if (msg_verbose) + msg_info("%s: skip %s (%ld seconds)", myname, queue_id, + (long) (st.st_mtime - event_time())); + return (0); + } + + /* + * Move the message to the active queue. File access errors are fatal. + */ + if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) { + if (errno != ENOENT) + msg_fatal("%s: %s: rename from %s to %s: %m", myname, + queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); + msg_warn("%s: %s: rename from %s to %s: %m", myname, + queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); + return (0); + } + + /* + * Extract envelope information: sender and recipients. At this point, + * mail addresses have been processed by the cleanup service so they + * should be in canonical form. Generate requests to deliver this + * message. + * + * Throwing away queue files seems bad, especially when they made it this + * far into the mail system. Therefore we save bad files to a separate + * directory for further inspection. + * + * After queue manager restart it is possible that a queue file is still + * being delivered. In that case (the file is locked), defer delivery by + * a minimal amount of time. + */ +#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) + + if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, + (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? + scan_info->flags | QMGR_FLUSH_AFTER : + scan_info->flags, + (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? + st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE : + 0)) == 0) { + qmgr_active_corrupt(queue_id); + return (0); + } else if (message == QMGR_MESSAGE_LOCKED) { + qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60); + return (0); + } else { + + /* + * Special case if all recipients were already delivered. Send any + * bounces and clean up. + */ + if (message->refcount == 0) + qmgr_active_done(message); + return (1); + } +} + +/* qmgr_active_done - dispose of message after recipients have been tried */ + +void qmgr_active_done(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done"; + struct stat st; + + if (msg_verbose) + msg_info("%s: %s", myname, message->queue_id); + + /* + * During a previous iteration, an attempt to bounce this message may + * have failed, so there may still be a bounce log lying around. XXX By + * groping around in the bounce queue, we're trespassing on the bounce + * service's territory. But doing so is more robust than depending on the + * bounce daemon to do the lookup for us, and for us to do the deleting + * after we have received a successful status from the bounce service. + * The bounce queue directory blocks are most likely in memory anyway. If + * these lookups become a performance problem we will have to build an + * in-core cache into the bounce daemon. + * + * Don't bounce when the bounce log is empty. The bounce process obviously + * failed, and the delivery agent will have requested that the message be + * deferred. + * + * Bounces are sent asynchronously to avoid stalling while the cleanup + * daemon waits for the qmgr to accept the "new mail" trigger. + * + * See also code in cleanup_bounce.c. + */ + if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) { + if (st.st_size == 0) { + if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id)) + msg_fatal("remove %s %s: %m", + MAIL_QUEUE_BOUNCE, message->queue_id); + } else { + if (msg_verbose) + msg_info("%s: bounce %s", myname, message->queue_id); + if (message->verp_delims == 0 || var_verp_bounce_off) + abounce_flush(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_2_bounce_flush, + (void *) message); + else + abounce_flush_verp(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + message->verp_delims, + qmgr_active_done_2_bounce_flush, + (void *) message); + return; + } + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_2_generic(message); +} + +/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */ + +static void qmgr_active_done_2_bounce_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process abounce_flush() status and continue processing. + */ + message->flags |= status; + qmgr_active_done_2_generic(message); +} + +/* qmgr_active_done_2_generic - continue processing */ + +static void qmgr_active_done_2_generic(QMGR_MESSAGE *message) +{ + const char *path; + struct stat st; + + /* + * A delivery agent marks a queue file as corrupt by changing its + * attributes, and by pretending that delivery was deferred. + */ + if (message->flags + && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) { + qmgr_active_corrupt(message->queue_id); + qmgr_message_free(message); + return; + } + + /* + * If we did not read all recipients from this file, go read some more, + * but remember whether some recipients have to be tried again. + * + * Throwing away queue files seems bad, especially when they made it this + * far into the mail system. Therefore we save bad files to a separate + * directory for further inspection by a human being. + */ + if (message->rcpt_offset > 0) { + if (qmgr_message_realloc(message) == 0) { + qmgr_active_corrupt(message->queue_id); + qmgr_message_free(message); + } else { + if (message->refcount == 0) + qmgr_active_done(message); /* recurse for consistency */ + } + return; + } + + /* + * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS + * and others not. Depending on what subset of recipients are delivered, + * a trace file may or may not be created. Even when the last partial + * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may + * still exist from a previous partial delivery attempt. So as long as + * any recipient has NOTIFY=SUCCESS we have to always look for the trace + * file and be prepared for the file not to exist. + * + * See also comments in bounce/bounce_notify_util.c. + */ + if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD + | DEL_REQ_FLAG_REC_DLY_SENT)) + || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) { + atrace_flush(message->tflags, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_25_trace_flush, + (void *) message); + return; + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_25_generic(message); +} + +/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */ + +static void qmgr_active_done_25_trace_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process atrace_flush() status and continue processing. + */ + if (status == 0 && message->tflags_offset) + qmgr_message_kill_record(message, message->tflags_offset); + message->flags |= status; + qmgr_active_done_25_generic(message); +} + +/* qmgr_active_done_25_generic - continue processing */ + +static void qmgr_active_done_25_generic(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done_25_generic"; + + /* + * If we get to this point we have tried all recipients for this message. + * If the message is too old, try to bounce it. + * + * Bounces are sent asynchronously to avoid stalling while the cleanup + * daemon waits for the qmgr to accept the "new mail" trigger. + */ + if (message->flags) { + if (event_time() >= message->create_time + + (*message->sender ? var_max_queue_time : var_dsn_queue_time)) { + msg_info("%s: from=<%s>, status=expired, returned to sender", + message->queue_id, message->sender); + if (message->verp_delims == 0 || var_verp_bounce_off) + adefer_flush(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_3_defer_flush, + (void *) message); + else + adefer_flush_verp(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + message->verp_delims, + qmgr_active_done_3_defer_flush, + (void *) message); + return; + } else if (message->warn_time > 0 + && event_time() >= message->warn_time - 1) { + if (msg_verbose) + msg_info("%s: sending defer warning for %s", myname, message->queue_id); + adefer_warn(BOUNCE_FLAG_KEEP, + message->queue_name, + message->queue_id, + message->encoding, + message->smtputf8, + message->sender, + message->dsn_envid, + message->dsn_ret, + qmgr_active_done_3_defer_warn, + (void *) message); + return; + } + } + + /* + * Asynchronous processing does not reach this point. + */ + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */ + +static void qmgr_active_done_3_defer_warn(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process adefer_warn() completion status and continue processing. + */ + if (status == 0) + qmgr_message_update_warn(message); + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */ + +static void qmgr_active_done_3_defer_flush(int status, void *context) +{ + QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; + + /* + * Process adefer_flush() status and continue processing. + */ + message->flags = status; + qmgr_active_done_3_generic(message); +} + +/* qmgr_active_done_3_generic - continue processing */ + +static void qmgr_active_done_3_generic(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_active_done_3_generic"; + int delay; + + /* + * Some recipients need to be tried again. Move the queue file time + * stamps into the future by the amount of time that the message is + * delayed, and move the message to the deferred queue. Impose minimal + * and maximal backoff times. + * + * Since we look at actual time in queue, not time since last delivery + * attempt, backoff times will be distributed. However, we can still see + * spikes in delivery activity because the interval between deferred + * queue scans is finite. + */ + if (message->flags) { + if (message->create_time > 0) { + delay = event_time() - message->create_time; + if (delay > var_max_backoff_time) + delay = var_max_backoff_time; + if (delay < var_min_backoff_time) + delay = var_min_backoff_time; + } else { + delay = var_min_backoff_time; + } + qmgr_active_defer(message->queue_name, message->queue_id, + MAIL_QUEUE_DEFERRED, delay); + } + + /* + * All recipients done. Remove the queue file. + */ + else { + if (mail_queue_remove(message->queue_name, message->queue_id)) { + if (errno != ENOENT) + msg_fatal("%s: remove %s from %s: %m", myname, + message->queue_id, message->queue_name); + msg_warn("%s: remove %s from %s: %m", myname, + message->queue_id, message->queue_name); + } else { + /* Same format as logged by postsuper. */ + msg_info("%s: removed", message->queue_id); + } + } + + /* + * Finally, delete the in-core message structure. + */ + qmgr_message_free(message); +} + +/* qmgr_active_drain - drain active queue by allocating a delivery process */ + +void qmgr_active_drain(void) +{ + QMGR_TRANSPORT *transport; + + /* + * Allocate one delivery process for every transport with pending mail. + * The process allocation completes asynchronously. + */ + while ((transport = qmgr_transport_select()) != 0) { + if (msg_verbose) + msg_info("qmgr_active_drain: allocate %s", transport->name); + qmgr_transport_alloc(transport, qmgr_deliver); + } +} diff --git a/src/oqmgr/qmgr_bounce.c b/src/oqmgr/qmgr_bounce.c new file mode 100644 index 0000000..00ba885 --- /dev/null +++ b/src/oqmgr/qmgr_bounce.c @@ -0,0 +1,71 @@ +/*++ +/* NAME +/* qmgr_bounce +/* SUMMARY +/* deal with mail that will not be delivered +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_QUEUE *qmgr_bounce_recipient(message, recipient, dsn) +/* QMGR_MESSAGE *message; +/* RECIPIENT *recipient; +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_bounce_recipient() produces a bounce log record. +/* Once the bounce record is written successfully, the recipient +/* is marked as done. When the bounce record cannot be written, +/* the message structure is updated to reflect that the mail is +/* deferred. +/* +/* Arguments: +/* .IP message +/* Open queue file with the message being bounced. +/* .IP recipient +/* The recipient that will not be delivered. +/* .IP dsn +/* Delivery status information. See dsn(3). +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> + +/* Utility library. */ + +/* Global library. */ + +#include <bounce.h> +#include <deliver_completed.h> + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_bounce_recipient - bounce one message recipient */ + +void qmgr_bounce_recipient(QMGR_MESSAGE *message, RECIPIENT *recipient, + DSN *dsn) +{ + MSG_STATS stats; + int status; + + status = bounce_append(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", dsn); + + if (status == 0) + deliver_completed(message->fp, recipient->offset); + else + message->flags |= status; +} diff --git a/src/oqmgr/qmgr_defer.c b/src/oqmgr/qmgr_defer.c new file mode 100644 index 0000000..dc0319e --- /dev/null +++ b/src/oqmgr/qmgr_defer.c @@ -0,0 +1,158 @@ +/*++ +/* NAME +/* qmgr_defer +/* SUMMARY +/* deal with mail that must be delivered later +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_defer_recipient(message, recipient, dsn) +/* QMGR_MESSAGE *message; +/* RECIPIENT *recipient; +/* DSN *dsn; +/* +/* void qmgr_defer_todo(queue, dsn) +/* QMGR_QUEUE *queue; +/* DSN *dsn; +/* +/* void qmgr_defer_transport(transport, dsn) +/* QMGR_TRANSPORT *transport; +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_defer_recipient() defers delivery of the named message to +/* the named recipient. It updates the message structure and writes +/* a log entry. +/* +/* qmgr_defer_todo() iterates over all "todo" deliveries queued for +/* the named site, and calls qmgr_defer_recipient() for each recipient +/* found. Side effects caused by qmgr_entry_done(), qmgr_queue_done(), +/* and by qmgr_active_done(): in-core queue entries will disappear, +/* in-core queues may disappear, in-core and on-disk messages may +/* disappear, bounces may be sent, new in-core queues, queue entries +/* and recipients may appear. +/* +/* qmgr_defer_transport() calls qmgr_defer_todo() for each queue +/* that depends on the named transport. See there for side effects. +/* +/* Arguments: +/* .IP recipient +/* A recipient address; used for logging purposes, and for updating +/* the message-specific \fIdefer\fR log. +/* .IP queue +/* Specifies a queue with delivery requests for a specific next-hop +/* host (or local user). +/* .IP transport +/* Specifies a message delivery transport. +/* .IP dsn +/* See dsn(3). +/* BUGS +/* The side effects of calling this routine are quite dramatic. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> + +/* Utility library. */ + +#include <msg.h> +#include <vstream.h> + +/* Global library. */ + +#include <mail_proto.h> +#include <defer.h> + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_defer_transport - defer todo entries for named transport */ + +void qmgr_defer_transport(QMGR_TRANSPORT *transport, DSN *dsn) +{ + QMGR_QUEUE *queue; + QMGR_QUEUE *next; + + if (msg_verbose) + msg_info("defer transport %s: %s %s", + transport->name, dsn->status, dsn->reason); + + /* + * Proceed carefully. Queues may disappear as a side effect. + */ + for (queue = transport->queue_list.next; queue; queue = next) { + next = queue->peers.next; + qmgr_defer_todo(queue, dsn); + } +} + +/* qmgr_defer_todo - defer all todo queue entries for specific site */ + +void qmgr_defer_todo(QMGR_QUEUE *queue, DSN *dsn) +{ + QMGR_ENTRY *entry; + QMGR_ENTRY *next; + QMGR_MESSAGE *message; + RECIPIENT *recipient; + int nrcpt; + QMGR_QUEUE *retry_queue; + + /* + * Sanity checks. + */ + if (msg_verbose) + msg_info("defer site %s: %s %s", + queue->name, dsn->status, dsn->reason); + + /* + * See if we can redirect the deliveries to the retry(8) delivery agent, + * so that they can be handled asynchronously. If the retry(8) service is + * unavailable, use the synchronous defer(8) server. With a large todo + * queue, this blocks the queue manager for a significant time. + */ + retry_queue = qmgr_error_queue(MAIL_SERVICE_RETRY, dsn); + + /* + * Proceed carefully. Queue entries may disappear as a side effect. + */ + for (entry = queue->todo.next; entry != 0; entry = next) { + next = entry->peers.next; + if (retry_queue != 0) { + qmgr_entry_move_todo(retry_queue, entry); + continue; + } + message = entry->message; + for (nrcpt = 0; nrcpt < entry->rcpt_list.len; nrcpt++) { + recipient = entry->rcpt_list.info + nrcpt; + qmgr_defer_recipient(message, recipient, dsn); + } + qmgr_entry_done(entry, QMGR_QUEUE_TODO); + } +} + +/* qmgr_defer_recipient - defer delivery of specific recipient */ + +void qmgr_defer_recipient(QMGR_MESSAGE *message, RECIPIENT *recipient, + DSN *dsn) +{ + MSG_STATS stats; + + /* + * Update the message structure and log the message disposition. + */ + message->flags |= defer_append(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", dsn); +} diff --git a/src/oqmgr/qmgr_deliver.c b/src/oqmgr/qmgr_deliver.c new file mode 100644 index 0000000..d8ec7bb --- /dev/null +++ b/src/oqmgr/qmgr_deliver.c @@ -0,0 +1,455 @@ +/*++ +/* NAME +/* qmgr_deliver 3 +/* SUMMARY +/* deliver one per-site queue entry to that site +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_deliver_concurrency; +/* +/* int qmgr_deliver(transport, fp) +/* QMGR_TRANSPORT *transport; +/* VSTREAM *fp; +/* DESCRIPTION +/* This module implements the client side of the `queue manager +/* to delivery agent' protocol. The queue manager uses +/* asynchronous I/O so that it can drive multiple delivery +/* agents in parallel. Depending on the outcome of a delivery +/* attempt, the status of messages, queues and transports is +/* updated. +/* +/* qmgr_deliver_concurrency is a global counter that says how +/* many delivery processes are in use. This can be used, for +/* example, to control the size of the `active' message queue. +/* +/* qmgr_deliver() executes when a delivery process announces its +/* availability for the named transport. It arranges for delivery +/* of a suitable queue entry. The \fIfp\fR argument specifies a +/* stream that is connected to the delivery process, or a null +/* pointer if the transport accepts no connection. Upon completion +/* of delivery (successful or not), the stream is closed, so that the +/* delivery process is released. +/* DIAGNOSTICS +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <time.h> +#include <string.h> + +/* Utility library. */ + +#include <msg.h> +#include <vstring.h> +#include <vstream.h> +#include <vstring_vstream.h> +#include <events.h> +#include <iostuff.h> +#include <stringops.h> +#include <mymalloc.h> + +/* Global library. */ + +#include <mail_queue.h> +#include <mail_proto.h> +#include <recipient_list.h> +#include <mail_params.h> +#include <deliver_request.h> +#include <verp_sender.h> +#include <dsn_util.h> +#include <dsn_buf.h> +#include <dsb_scan.h> +#include <rcpt_print.h> +#include <smtputf8.h> + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Important note on the _transport_rate_delay implementation: after + * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all + * code paths must directly or indirectly invoke qmgr_transport_unthrottle() + * or qmgr_transport_throttle(). Otherwise, transports with non-zero + * _transport_rate_delay will become stuck. + */ + +int qmgr_deliver_concurrency; + + /* + * Message delivery status codes. + */ +#define DELIVER_STAT_OK 0 /* all recipients delivered */ +#define DELIVER_STAT_DEFER 1 /* try some recipients later */ +#define DELIVER_STAT_CRASH 2 /* mailer internal problem */ + +/* qmgr_deliver_initial_reply - retrieve initial delivery process response */ + +static int qmgr_deliver_initial_reply(VSTREAM *stream) +{ + int stat; + + if (peekfd(vstream_fileno(stream)) < 0) { + msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else if (attr_scan(stream, ATTR_FLAG_STRICT, + RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), + ATTR_TYPE_END) != 1) { + msg_warn("%s: malformed response", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else { + return (stat ? DELIVER_STAT_DEFER : 0); + } +} + +/* qmgr_deliver_final_reply - retrieve final delivery process response */ + +static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb) +{ + int stat; + + if (peekfd(vstream_fileno(stream)) < 0) { + msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else if (attr_scan(stream, ATTR_FLAG_STRICT, + RECV_ATTR_FUNC(dsb_scan, (void *) dsb), + RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat), + ATTR_TYPE_END) != 2) { + msg_warn("%s: malformed response", VSTREAM_PATH(stream)); + return (DELIVER_STAT_CRASH); + } else { + return (stat ? DELIVER_STAT_DEFER : 0); + } +} + +/* qmgr_deliver_send_request - send delivery request to delivery process */ + +static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream) +{ + RECIPIENT_LIST list = entry->rcpt_list; + RECIPIENT *recipient; + QMGR_MESSAGE *message = entry->message; + VSTRING *sender_buf = 0; + MSG_STATS stats; + char *sender; + int flags; + int smtputf8 = message->smtputf8; + const char *addr; + + /* + * Todo: integrate with code up-stream that builds the delivery request. + */ + for (recipient = list.info; recipient < list.info + list.len; recipient++) + if (var_smtputf8_enable && (addr = recipient->address)[0] + && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) { + smtputf8 |= SMTPUTF8_FLAG_RECIPIENT; + if (message->verp_delims) + smtputf8 |= SMTPUTF8_FLAG_SENDER; + } + + /* + * If variable envelope return path is requested, change prefix+@origin + * into prefix+user=domain@origin. Note that with VERP there is only one + * recipient per delivery. + */ + if (message->verp_delims == 0) { + sender = message->sender; + } else { + sender_buf = vstring_alloc(100); + verp_sender(sender_buf, message->verp_delims, + message->sender, list.info); + sender = vstring_str(sender_buf); + } + + flags = message->tflags + | entry->queue->dflags + | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT); + (void) QMGR_MSG_STATS(&stats, message); + attr_print(stream, ATTR_FLAG_NONE, + SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags), + SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name), + SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id), + SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset), + SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length), + SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop), + SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding), + SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8), + SEND_ATTR_STR(MAIL_ATTR_SENDER, sender), + SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid), + SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret), + SEND_ATTR_FUNC(msg_stats_print, (void *) &stats), + /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name), + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr), + SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port), + SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto), + SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo), + /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ + SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method), + SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username), + SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender), + /* XXX Ditto if we want to pass TLS certificate info. */ + SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident), + SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context), + SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len), + ATTR_TYPE_END); + if (sender_buf != 0) + vstring_free(sender_buf); + for (recipient = list.info; recipient < list.info + list.len; recipient++) + attr_print(stream, ATTR_FLAG_NONE, + SEND_ATTR_FUNC(rcpt_print, (void *) recipient), + ATTR_TYPE_END); + if (vstream_fflush(stream) != 0) { + msg_warn("write to process (%s): %m", entry->queue->transport->name); + return (-1); + } else { + if (msg_verbose) + msg_info("qmgr_deliver: site `%s'", entry->queue->name); + return (0); + } +} + +/* qmgr_deliver_abort - transport response watchdog */ + +static void qmgr_deliver_abort(int unused_event, void *context) +{ + QMGR_ENTRY *entry = (QMGR_ENTRY *) context; + QMGR_QUEUE *queue = entry->queue; + QMGR_TRANSPORT *transport = queue->transport; + QMGR_MESSAGE *message = entry->message; + + msg_fatal("%s: timeout receiving delivery status from transport: %s", + message->queue_id, transport->name); +} + +/* qmgr_deliver_update - process delivery status report */ + +static void qmgr_deliver_update(int unused_event, void *context) +{ + QMGR_ENTRY *entry = (QMGR_ENTRY *) context; + QMGR_QUEUE *queue = entry->queue; + QMGR_TRANSPORT *transport = queue->transport; + QMGR_MESSAGE *message = entry->message; + static DSN_BUF *dsb; + int status; + + /* + * Release the delivery agent from a "hot" queue entry. + */ +#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \ + event_disable_readwrite(vstream_fileno(entry->stream)); \ + (void) vstream_fclose(entry->stream); \ + entry->stream = 0; \ + qmgr_deliver_concurrency--; \ + } while (0) + + if (dsb == 0) + dsb = dsb_create(); + + /* + * The message transport has responded. Stop the watchdog timer. + */ + event_cancel_timer(qmgr_deliver_abort, context); + + /* + * Retrieve the delivery agent status report. The numerical status code + * indicates if delivery should be tried again. The reason text is sent + * only when a site should be avoided for a while, so that the queue + * manager can log why it does not even try to schedule delivery to the + * affected recipients. + */ + status = qmgr_deliver_final_reply(entry->stream, dsb); + + /* + * The mail delivery process failed for some reason (although delivery + * may have been successful). Back off with this transport type for a + * while. Dispose of queue entries for this transport that await + * selection (the todo lists). Stay away from queue entries that have + * been selected (the busy lists), or we would have dangling pointers. + * The queue itself won't go away before we dispose of the current queue + * entry. + */ + if (status == DELIVER_STAT_CRASH) { + message->flags |= DELIVER_STAT_DEFER; +#if 0 + whatsup = concatenate("unknown ", transport->name, + " mail transport error", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsb->dsn, "4.3.0", + "unknown mail transport error")); +#endif + msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description", + transport->name); + + /* + * Assume the worst and write a defer logfile record for each + * recipient. This omission was already present in the first queue + * manager implementation of 199703, and was fixed 200511. + * + * To avoid the synchronous qmgr_defer_recipient() operation for each + * recipient of this queue entry, release the delivery process and + * move the entry back to the todo queue. Let qmgr_defer_transport() + * log the recipient asynchronously if possible, and get out of here. + * Note: if asynchronous logging is not possible, + * qmgr_defer_transport() eventually invokes qmgr_entry_done() and + * the entry becomes a dangling pointer. + */ + QMGR_DELIVER_RELEASE_AGENT(entry); + qmgr_entry_unselect(queue, entry); + qmgr_defer_transport(transport, &dsb->dsn); + return; + } + + /* + * This message must be tried again. + * + * If we have a problem talking to this site, back off with this site for a + * while; dispose of queue entries for this site that await selection + * (the todo list); stay away from queue entries that have been selected + * (the busy list), or we would have dangling pointers. The queue itself + * won't go away before we dispose of the current queue entry. + * + * XXX Caution: DSN_COPY() will panic on empty status or reason. + */ +#define SUSPENDED "delivery temporarily suspended: " + + if (status == DELIVER_STAT_DEFER) { + message->flags |= DELIVER_STAT_DEFER; + if (VSTRING_LEN(dsb->status)) { + /* Sanitize the DSN status/reason from the delivery agent. */ + if (!dsn_valid(vstring_str(dsb->status))) + vstring_strcpy(dsb->status, "4.0.0"); + if (VSTRING_LEN(dsb->reason) == 0) + vstring_strcpy(dsb->reason, "unknown error"); + vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1); + if (QMGR_QUEUE_READY(queue)) { + qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb)); + if (QMGR_QUEUE_THROTTLED(queue)) + qmgr_defer_todo(queue, &dsb->dsn); + } + } + } + + /* + * No problems detected. Mark the transport and queue as alive. The queue + * itself won't go away before we dispose of the current queue entry. + */ + if (status != DELIVER_STAT_CRASH) { + qmgr_transport_unthrottle(transport); + if (VSTRING_LEN(dsb->reason) == 0) + qmgr_queue_unthrottle(queue); + } + + /* + * Release the delivery process, and give some other queue entry a chance + * to be delivered. When all recipients for a message have been tried, + * decide what to do next with this message: defer, bounce, delete. + */ + QMGR_DELIVER_RELEASE_AGENT(entry); + qmgr_entry_done(entry, QMGR_QUEUE_BUSY); +} + +/* qmgr_deliver - deliver one per-site queue entry */ + +void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) +{ + QMGR_QUEUE *queue; + QMGR_ENTRY *entry; + DSN dsn; + + /* + * Find out if this delivery process is really available. Once elected, + * the delivery process is supposed to express its happiness. If there is + * a problem, wipe the pending deliveries for this transport. This + * routine runs in response to an external event, so it does not run + * while some other queue manipulation is happening. + */ + if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) { +#if 0 + whatsup = concatenate(transport->name, + " mail transport unavailable", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", + "mail transport unavailable")); +#endif + qmgr_defer_transport(transport, &dsn); + if (stream) + (void) vstream_fclose(stream); + return; + } + + /* + * Find a suitable queue entry. Things may have changed since this + * transport was allocated. If no suitable entry is found, + * unceremoniously disconnect from the delivery process. The delivery + * agent request reading routine is prepared for the queue manager to + * change its mind for no apparent reason. + */ + if ((queue = qmgr_queue_select(transport)) == 0 + || (entry = qmgr_entry_select(queue)) == 0) { + (void) vstream_fclose(stream); + return; + } + + /* + * Send the queue file info and recipient info to the delivery process. + * If there is a problem, wipe the pending deliveries for this transport. + * This routine runs in response to an external event, so it does not run + * while some other queue manipulation is happening. + */ + if (qmgr_deliver_send_request(entry, stream) < 0) { + qmgr_entry_unselect(queue, entry); +#if 0 + whatsup = concatenate(transport->name, + " mail transport unavailable", (char *) 0); + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", whatsup)); + myfree(whatsup); +#else + qmgr_transport_throttle(transport, + DSN_SIMPLE(&dsn, "4.3.0", + "mail transport unavailable")); +#endif + qmgr_defer_transport(transport, &dsn); + /* warning: entry and queue may be dangling pointers here */ + (void) vstream_fclose(stream); + return; + } + + /* + * If we get this far, go wait for the delivery status report. + */ + qmgr_deliver_concurrency++; + entry->stream = stream; + event_enable_read(vstream_fileno(stream), + qmgr_deliver_update, (void *) entry); + + /* + * Guard against broken systems. + */ + event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout); +} diff --git a/src/oqmgr/qmgr_enable.c b/src/oqmgr/qmgr_enable.c new file mode 100644 index 0000000..a35e46e --- /dev/null +++ b/src/oqmgr/qmgr_enable.c @@ -0,0 +1,107 @@ +/*++ +/* NAME +/* qmgr_enable +/* SUMMARY +/* enable dead transports or sites +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_enable_queue(queue) +/* QMGR_QUEUE *queue; +/* +/* QMGR_QUEUE *qmgr_enable_transport(transport) +/* QMGR_TRANSPORT *transport; +/* +/* void qmgr_enable_all(void) +/* DESCRIPTION +/* This module purges dead in-core state information, effectively +/* re-enabling delivery. +/* +/* qmgr_enable_queue() enables deliveries to the named dead site. +/* Empty queues are destroyed. The existed solely to indicate that +/* a site is dead. +/* +/* qmgr_enable_transport() enables deliveries via the specified +/* transport, and calls qmgr_enable_queue() for each destination +/* on that transport. Empty queues are destroyed. +/* +/* qmgr_enable_all() enables all transports and queues. +/* See above for the side effects caused by doing this. +/* BUGS +/* The side effects of calling this module can be quite dramatic. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> + +/* Utility library. */ + +#include <msg.h> +#include <vstream.h> + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_enable_all - enable transports and queues */ + +void qmgr_enable_all(void) +{ + QMGR_TRANSPORT *xport; + + if (msg_verbose) + msg_info("qmgr_enable_all"); + + /* + * The number of transports does not change as a side effect, so this can + * be a straightforward loop. + */ + for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) + qmgr_enable_transport(xport); +} + +/* qmgr_enable_transport - defer todo entries for named transport */ + +void qmgr_enable_transport(QMGR_TRANSPORT *transport) +{ + QMGR_QUEUE *queue; + QMGR_QUEUE *next; + + /* + * Proceed carefully. Queues may disappear as a side effect. + */ + if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) { + if (msg_verbose) + msg_info("enable transport %s", transport->name); + qmgr_transport_unthrottle(transport); + } + for (queue = transport->queue_list.next; queue; queue = next) { + next = queue->peers.next; + qmgr_enable_queue(queue); + } +} + +/* qmgr_enable_queue - enable and possibly delete queue */ + +void qmgr_enable_queue(QMGR_QUEUE *queue) +{ + if (QMGR_QUEUE_THROTTLED(queue)) { + if (msg_verbose) + msg_info("enable site %s/%s", queue->transport->name, queue->name); + qmgr_queue_unthrottle(queue); + } + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); +} diff --git a/src/oqmgr/qmgr_entry.c b/src/oqmgr/qmgr_entry.c new file mode 100644 index 0000000..5a81487 --- /dev/null +++ b/src/oqmgr/qmgr_entry.c @@ -0,0 +1,391 @@ +/*++ +/* NAME +/* qmgr_entry 3 +/* SUMMARY +/* per-site queue entries +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_ENTRY *qmgr_entry_create(queue, message) +/* QMGR_QUEUE *queue; +/* QMGR_MESSAGE *message; +/* +/* void qmgr_entry_done(entry, which) +/* QMGR_ENTRY *entry; +/* int which; +/* +/* QMGR_ENTRY *qmgr_entry_select(queue) +/* QMGR_QUEUE *queue; +/* +/* void qmgr_entry_unselect(queue, entry) +/* QMGR_QUEUE *queue; +/* QMGR_ENTRY *entry; +/* +/* void qmgr_entry_move_todo(dst, entry) +/* QMGR_QUEUE *dst; +/* QMGR_ENTRY *entry; +/* DESCRIPTION +/* These routines add/delete/manipulate per-site message +/* delivery requests. +/* +/* qmgr_entry_create() creates an entry for the named queue and +/* message, and appends the entry to the queue's todo list. +/* Filling in and cleaning up the recipients is the responsibility +/* of the caller. +/* +/* qmgr_entry_done() discards a per-site queue entry. The +/* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry +/* of the site's `busy' list (i.e. queue entries that have been +/* selected for actual delivery), or QMGR_QUEUE_TODO for an entry +/* of the site's `todo' list (i.e. queue entries awaiting selection +/* for actual delivery). +/* +/* qmgr_entry_done() triggers cleanup of the per-site queue when +/* the site has no pending deliveries, and the site is either +/* alive, or the site is dead and the number of in-core queues +/* exceeds a configurable limit (see qmgr_queue_done()). +/* +/* qmgr_entry_done() triggers special action when the last in-core +/* queue entry for a message is done with: either read more +/* recipients from the queue file, delete the queue file, or move +/* the queue file to the deferred queue; send bounce reports to the +/* message originator (see qmgr_active_done()). +/* +/* qmgr_entry_select() selects the next entry from the named +/* per-site queue's `todo' list for actual delivery. The entry is +/* moved to the queue's `busy' list: the list of messages being +/* delivered. +/* +/* qmgr_entry_unselect() takes the named entry off the named +/* per-site queue's `busy' list and moves it to the queue's +/* `todo' list. +/* +/* qmgr_entry_move_todo() moves the specified "todo" queue entry +/* to the specified "todo" queue. +/* DIAGNOSTICS +/* Panic: interface violations, internal inconsistencies. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <stdlib.h> +#include <time.h> + +/* Utility library. */ + +#include <msg.h> +#include <mymalloc.h> +#include <events.h> +#include <vstream.h> + +/* Global library. */ + +#include <mail_params.h> +#include <deliver_request.h> /* opportunistic session caching */ + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_entry_select - select queue entry for delivery */ + +QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *queue) +{ + const char *myname = "qmgr_entry_select"; + QMGR_ENTRY *entry; + + if ((entry = queue->todo.prev) != 0) { + QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry); + queue->todo_refcount--; + QMGR_LIST_APPEND(queue->busy, entry); + queue->busy_refcount++; + + /* + * With opportunistic session caching, the delivery agent must not + * only 1) save a session upon completion, but also 2) reuse a cached + * session upon the next delivery request. In order to not miss out + * on 2), we have to make caching sticky or else we get silly + * behavior when the in-memory queue drains. Specifically, new + * connections must not be made as long as cached connections exist. + * + * Safety: don't enable opportunistic session caching unless the queue + * manager is able to schedule concurrent or back-to-back deliveries + * (we need to recognize back-to-back deliveries for transports with + * concurrency 1). + * + * If caching has previously been enabled, but is not now, fetch any + * existing entries from the cache, but don't add new ones. + */ +#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \ + (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY()) + +#define BACK_TO_BACK_DELIVERY() \ + (queue->last_done + 1 >= event_time()) + + /* + * Turn on session caching after we get up to speed. Don't enable + * session caching just because we have concurrent deliveries. This + * prevents unnecessary session caching when we have a burst of mail + * <= the initial concurrency limit. + */ + if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) { + if (BACK_TO_BACK_DELIVERY()) { + if (msg_verbose) + msg_info("%s: allowing on-demand session caching for %s", + myname, queue->name); + queue->dflags |= DEL_REQ_FLAG_CONN_MASK; + } + } + + /* + * Turn off session caching when concurrency drops and we're running + * out of steam. This is what prevents from turning off session + * caching too early, and from making new connections while old ones + * are still cached. + */ + else { + if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) { + if (msg_verbose) + msg_info("%s: disallowing on-demand session caching for %s", + myname, queue->name); + queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE; + } + } + } + return (entry); +} + +/* qmgr_entry_unselect - unselect queue entry for delivery */ + +void qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry) +{ + QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry); + queue->busy_refcount--; + QMGR_LIST_APPEND(queue->todo, entry); + queue->todo_refcount++; +} + +/* qmgr_entry_move_todo - move entry between todo queues */ + +void qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry) +{ + const char *myname = "qmgr_entry_move_todo"; + QMGR_MESSAGE *message = entry->message; + QMGR_QUEUE *src = entry->queue; + QMGR_ENTRY *new_entry; + + if (entry->stream != 0) + msg_panic("%s: queue %s entry is busy", myname, src->name); + if (QMGR_QUEUE_THROTTLED(dst)) + msg_panic("%s: destination queue %s is throttled", myname, dst->name); + if (QMGR_TRANSPORT_THROTTLED(dst->transport)) + msg_panic("%s: destination transport %s is throttled", + myname, dst->transport->name); + + /* + * Create new entry, swap the recipients between the old and new entries, + * then dispose of the old entry. This gives us any end-game actions that + * are implemented by qmgr_entry_done(), so we don't have to duplicate + * those actions here. + * + * XXX This does not enforce the per-entry recipient limit, but that is not + * a problem as long as qmgr_entry_move_todo() is called only to bounce + * or defer mail. + */ + new_entry = qmgr_entry_create(dst, message); + recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list); + qmgr_entry_done(entry, QMGR_QUEUE_TODO); +} + +/* qmgr_entry_done - dispose of queue entry */ + +void qmgr_entry_done(QMGR_ENTRY *entry, int which) +{ + const char *myname = "qmgr_entry_done"; + QMGR_QUEUE *queue = entry->queue; + QMGR_MESSAGE *message = entry->message; + QMGR_TRANSPORT *transport = queue->transport; + + /* + * Take this entry off the in-core queue. + */ + if (entry->stream != 0) + msg_panic("%s: file is open", myname); + if (which == QMGR_QUEUE_BUSY) { + QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry); + queue->busy_refcount--; + } else if (which == QMGR_QUEUE_TODO) { + QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry); + queue->todo_refcount--; + } else { + msg_panic("%s: bad queue spec: %d", myname, which); + } + + /* + * Free the recipient list and decrease the in-core recipient count + * accordingly. + */ + qmgr_recipient_count -= entry->rcpt_list.len; + recipient_list_free(&entry->rcpt_list); + + myfree((void *) entry); + + /* + * Maintain back-to-back delivery status. + */ + if (which == QMGR_QUEUE_BUSY) + queue->last_done = event_time(); + + /* + * Suspend a rate-limited queue, so that mail trickles out. + */ + if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) { + if (queue->window > 1) + msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service", + myname, transport->name, queue->name, queue->window); + if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */ + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue)) + qmgr_queue_suspend(queue, transport->rate_delay); + } + + /* + * When the in-core queue for this site is empty and when this site is + * not dead, discard the in-core queue. When this site is dead, but the + * number of in-core queues exceeds some threshold, get rid of this + * in-core queue anyway, in order to avoid running out of memory. + * + * See also: qmgr_entry_move_todo(). + */ + if (queue->todo.next == 0 && queue->busy.next == 0) { + if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit) + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue)) + qmgr_queue_done(queue); + } + + /* + * Update the in-core message reference count. When the in-core message + * structure has no more references, dispose of the message. + * + * When the in-core recipient count falls below a threshold, and this + * message has more recipients, read more recipients now. If we read more + * recipients as soon as the recipient count falls below the in-core + * recipient limit, we do not give other messages a chance until this + * message is delivered. That's good for mailing list deliveries, bad for + * one-to-one mail. If we wait until the in-core recipient count drops + * well below the in-core recipient limit, we give other mail a chance, + * but we also allow list deliveries to become interleaved. In the worst + * case, people near the start of a mailing list get a burst of postings + * today, while people near the end of the list get that same burst of + * postings a whole day later. + */ +#define FUDGE(x) ((x) * (var_qmgr_fudge / 100.0)) + message->refcount--; + if (message->rcpt_offset > 0 + && qmgr_recipient_count < FUDGE(var_qmgr_rcpt_limit) - 100) + qmgr_message_realloc(message); + if (message->refcount == 0) + qmgr_active_done(message); +} + +/* qmgr_entry_create - create queue todo entry */ + +QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message) +{ + QMGR_ENTRY *entry; + + /* + * Sanity check. + */ + if (QMGR_QUEUE_THROTTLED(queue)) + msg_panic("qmgr_entry_create: dead queue: %s", queue->name); + + /* + * Create the delivery request. + */ + entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY)); + entry->stream = 0; + entry->message = message; + recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE); + message->refcount++; + entry->queue = queue; + QMGR_LIST_APPEND(queue->todo, entry); + queue->todo_refcount++; + + /* + * Warn if a destination is falling behind while the active queue + * contains a non-trivial amount of single-recipient email. When a + * destination takes up more and more space in the active queue, then + * other mail will not get through and delivery performance will suffer. + * + * XXX At this point in the code, the busy reference count is still less + * than the concurrency limit (otherwise this code would not be invoked + * in the first place) so we have to make make some awkward adjustments + * below. + * + * XXX The queue length test below looks at the active queue share of an + * individual destination. This catches the case where mail for one + * destination is falling behind because it has to round-robin compete + * with many other destinations. However, Postfix will also perform + * poorly when most of the active queue is tied up by a small number of + * concurrency limited destinations. The queue length test below detects + * such conditions only indirectly. + * + * XXX This code does not detect the case that the active queue is being + * starved because incoming mail is pounding the disk. + */ + if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) { + int queue_length = queue->todo_refcount + queue->busy_refcount; + time_t now; + QMGR_TRANSPORT *transport; + double active_share; + + if (queue_length > var_qmgr_active_limit / 5 + && (now = event_time()) >= queue->clog_time_to_warn) { + active_share = queue_length / (double) qmgr_message_count; + msg_warn("mail for %s is using up %d of %d active queue entries", + queue->nexthop, queue_length, qmgr_message_count); + if (active_share < 0.9) + msg_warn("this may slow down other mail deliveries"); + transport = queue->transport; + if (transport->dest_concurrency_limit > 0 + && transport->dest_concurrency_limit <= queue->busy_refcount + 1) + msg_warn("you may need to increase the main.cf %s%s from %d", + transport->name, _DEST_CON_LIMIT, + transport->dest_concurrency_limit); + else if (queue->window > var_qmgr_active_limit * active_share) + msg_warn("you may need to increase the main.cf %s from %d", + VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit); + else if (queue->peers.next != queue->peers.prev) + msg_warn("you may need a separate master.cf transport for %s", + queue->nexthop); + else { + msg_warn("you may need to reduce %s connect and helo timeouts", + transport->name); + msg_warn("so that Postfix quickly skips unavailable hosts"); + msg_warn("you may need to increase the main.cf %s and %s", + VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME); + msg_warn("so that Postfix wastes less time on undeliverable mail"); + msg_warn("you may need to increase the master.cf %s process limit", + transport->name); + } + msg_warn("please avoid flushing the whole queue when you have"); + msg_warn("lots of deferred mail, that is bad for performance"); + msg_warn("to turn off these warnings specify: %s = 0", + VAR_QMGR_CLOG_WARN_TIME); + queue->clog_time_to_warn = now + var_qmgr_clog_warn_time; + } + } + return (entry); +} diff --git a/src/oqmgr/qmgr_error.c b/src/oqmgr/qmgr_error.c new file mode 100644 index 0000000..6541c35 --- /dev/null +++ b/src/oqmgr/qmgr_error.c @@ -0,0 +1,121 @@ +/*++ +/* NAME +/* qmgr_error 3 +/* SUMMARY +/* look up/create error/retry queue +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_TRANSPORT *qmgr_error_transport(service) +/* const char *service; +/* +/* QMGR_QUEUE *qmgr_error_queue(service, dsn) +/* const char *service; +/* DSN *dsn; +/* +/* char *qmgr_error_nexthop(dsn) +/* DSN *dsn; +/* DESCRIPTION +/* qmgr_error_transport() looks up the error transport for the +/* specified service. The result is null if the transport is +/* not available. +/* +/* qmgr_error_queue() looks up an error queue for the specified +/* service and problem. The result is null if the queue is not +/* available. +/* +/* qmgr_error_nexthop() computes the next-hop information for +/* the specified problem. The result must be passed to myfree(). +/* +/* Arguments: +/* .IP dsn +/* See dsn(3). +/* .IP service +/* One of MAIL_SERVICE_ERROR or MAIL_SERVICE_RETRY. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> + +/* Utility library. */ + +#include <mymalloc.h> +#include <stringops.h> + +/* Global library. */ + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_error_transport - look up error transport for specified service */ + +QMGR_TRANSPORT *qmgr_error_transport(const char *service) +{ + QMGR_TRANSPORT *transport; + + /* + * Find or create retry transport. + */ + if ((transport = qmgr_transport_find(service)) == 0) + transport = qmgr_transport_create(service); + if (QMGR_TRANSPORT_THROTTLED(transport)) + return (0); + + /* + * Done. + */ + return (transport); +} + +/* qmgr_error_queue - look up error queue for specified service and problem */ + +QMGR_QUEUE *qmgr_error_queue(const char *service, DSN *dsn) +{ + QMGR_TRANSPORT *transport; + QMGR_QUEUE *queue; + char *nexthop; + + /* + * Find or create transport. + */ + if ((transport = qmgr_error_transport(service)) == 0) + return (0); + + /* + * Find or create queue. + */ + nexthop = qmgr_error_nexthop(dsn); + if ((queue = qmgr_queue_find(transport, nexthop)) == 0) + queue = qmgr_queue_create(transport, nexthop, nexthop); + myfree(nexthop); + if (QMGR_QUEUE_THROTTLED(queue)) + return (0); + + /* + * Done. + */ + return (queue); +} + +/* qmgr_error_nexthop - compute next-hop information from problem description */ + +char *qmgr_error_nexthop(DSN *dsn) +{ + char *nexthop; + + nexthop = concatenate(dsn->status, " ", dsn->reason, (char *) 0); + return (nexthop); +} diff --git a/src/oqmgr/qmgr_feedback.c b/src/oqmgr/qmgr_feedback.c new file mode 100644 index 0000000..f341b95 --- /dev/null +++ b/src/oqmgr/qmgr_feedback.c @@ -0,0 +1,177 @@ +/*++ +/* NAME +/* qmgr_feedback 3 +/* SUMMARY +/* delivery agent feedback management +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_feedback_init(fbck_ctl, name_prefix, name_tail, +/* def_name, def_value) +/* QMGR_FEEDBACK *fbck_ctl; +/* const char *name_prefix; +/* const char *name_tail; +/* const char *def_name; +/* const char *def_value; +/* +/* double QMGR_FEEDBACK_VAL(fbck_ctl, concurrency) +/* QMGR_FEEDBACK *fbck_ctl; +/* const int concurrency; +/* DESCRIPTION +/* Upon completion of a delivery request, a delivery agent +/* provides a hint that the scheduler should dedicate fewer or +/* more resources to a specific destination. +/* +/* qmgr_feedback_init() looks up transport-dependent positive +/* or negative concurrency feedback control information from +/* main.cf, and converts it to internal form. +/* +/* QMGR_FEEDBACK_VAL() computes a concurrency adjustment based +/* on a preprocessed feedback control information and the +/* current concurrency window. This is an "unsafe" macro that +/* evaluates some arguments multiple times. +/* +/* Arguments: +/* .IP fbck_ctl +/* Pointer to QMGR_FEEDBACK structure where the result will +/* be stored. +/* .IP name_prefix +/* Mail delivery transport name, used as the initial portion +/* of a transport-dependent concurrency feedback parameter +/* name. +/* .IP name_tail +/* The second, and fixed, portion of a transport-dependent +/* concurrency feedback parameter. +/* .IP def_name +/* The name of a default feedback parameter. +/* .IP def_val +/* The value of the default feedback parameter. +/* .IP concurrency +/* Delivery concurrency for concurrency-dependent feedback calculation. +/* DIAGNOSTICS +/* Warning: configuration error or unreasonable input. The program +/* uses name_tail feedback instead. +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <stdlib.h> +#include <limits.h> /* INT_MAX */ +#include <stdio.h> /* sscanf() */ +#include <string.h> + +/* Utility library. */ + +#include <msg.h> +#include <name_code.h> +#include <stringops.h> +#include <mymalloc.h> + +/* Global library. */ + +#include <mail_params.h> +#include <mail_conf.h> + +/* Application-specific. */ + +#include "qmgr.h" + + /* + * Lookup tables for main.cf feedback method names. + */ +const NAME_CODE qmgr_feedback_map[] = { + CONC_FDBACK_NAME_WIN, QMGR_FEEDBACK_IDX_WIN, +#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN + CONC_FDBACK_NAME_SQRT_WIN, QMGR_FEEDBACK_IDX_SQRT_WIN, +#endif + 0, QMGR_FEEDBACK_IDX_NONE, +}; + +/* qmgr_feedback_init - initialize feedback control */ + +void qmgr_feedback_init(QMGR_FEEDBACK *fb, + const char *name_prefix, + const char *name_tail, + const char *def_name, + const char *def_val) +{ + double enum_val; + char denom_str[30 + 1]; + double denom_val; + char slash; + char junk; + char *fbck_name; + char *fbck_val; + + /* + * Look up the transport-dependent feedback value. + */ + fbck_name = concatenate(name_prefix, name_tail, (char *) 0); + fbck_val = get_mail_conf_str(fbck_name, def_val, 1, 0); + + /* + * We allow users to express feedback as 1/8, as a more user-friendly + * alternative to 0.125 (or worse, having users specify the number of + * events in a feedback hysteresis cycle). + * + * We use some sscanf() fu to parse the value into numerator and optional + * "/" followed by denominator. We're doing this only a few times during + * the process life time, so we strive for convenience instead of speed. + */ +#define INCLUSIVE_BOUNDS(val, low, high) ((val) >= (low) && (val) <= (high)) + + fb->hysteresis = 1; /* legacy */ + fb->base = -1; /* assume error */ + + switch (sscanf(fbck_val, "%lf %1[/] %30s%c", + &enum_val, &slash, denom_str, &junk)) { + case 1: + fb->index = QMGR_FEEDBACK_IDX_NONE; + fb->base = enum_val; + break; + case 3: + if ((fb->index = name_code(qmgr_feedback_map, NAME_CODE_FLAG_NONE, + denom_str)) != QMGR_FEEDBACK_IDX_NONE) { + fb->base = enum_val; + } else if (INCLUSIVE_BOUNDS(enum_val, 0, INT_MAX) + && sscanf(denom_str, "%lf%c", &denom_val, &junk) == 1 + && INCLUSIVE_BOUNDS(denom_val, 1.0 / INT_MAX, INT_MAX)) { + fb->base = enum_val / denom_val; + } + break; + } + + /* + * Sanity check. If input is bad, we just warn and use a reasonable + * default. + */ + if (!INCLUSIVE_BOUNDS(fb->base, 0, 1)) { + msg_warn("%s: ignoring malformed or unreasonable feedback: %s", + strcmp(fbck_val, def_val) ? fbck_name : def_name, fbck_val); + fb->index = QMGR_FEEDBACK_IDX_NONE; + fb->base = 1; + } + + /* + * Performance debugging/analysis. + */ + if (var_conc_feedback_debug) + msg_info("%s: %s feedback type %d value at %d: %g", + name_prefix, strcmp(fbck_val, def_val) ? + fbck_name : def_name, fb->index, var_init_dest_concurrency, + QMGR_FEEDBACK_VAL(*fb, var_init_dest_concurrency)); + + myfree(fbck_name); + myfree(fbck_val); +} diff --git a/src/oqmgr/qmgr_message.c b/src/oqmgr/qmgr_message.c new file mode 100644 index 0000000..126d34e --- /dev/null +++ b/src/oqmgr/qmgr_message.c @@ -0,0 +1,1445 @@ +/*++ +/* NAME +/* qmgr_message 3 +/* SUMMARY +/* in-core message structures +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_message_count; +/* int qmgr_recipient_count; +/* int qmgr_vrfy_pend_count; +/* +/* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) +/* const char *class; +/* const char *name; +/* int qflags; +/* mode_t mode; +/* +/* QMGR_MESSAGE *qmgr_message_realloc(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_free(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_update_warn(message) +/* QMGR_MESSAGE *message; +/* +/* void qmgr_message_kill_record(message, offset) +/* QMGR_MESSAGE *message; +/* long offset; +/* DESCRIPTION +/* This module performs en-gross operations on queue messages. +/* +/* qmgr_message_count is a global counter for the total number +/* of in-core message structures (i.e. the total size of the +/* `active' message queue). +/* +/* qmgr_recipient_count is a global counter for the total number +/* of in-core recipient structures (i.e. the sum of all recipients +/* in all in-core message structures). +/* +/* qmgr_vrfy_pend_count is a global counter for the total +/* number of in-core message structures that are associated +/* with an address verification request. Requests that exceed +/* the address_verify_pending_limit are deferred immediately. +/* This is a backup mechanism for a more refined enforcement +/* mechanism in the verify(8) daemon. +/* +/* qmgr_message_alloc() creates an in-core message structure +/* with sender and recipient information taken from the named queue +/* file. A null result means the queue file could not be read or +/* that the queue file contained incorrect information. A result +/* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number +/* of recipients read from a queue file is limited by the global +/* var_qmgr_rcpt_limit configuration parameter. When the limit +/* is reached, the \fIrcpt_offset\fR structure member is set to +/* the position where the read was terminated. Recipients are +/* run through the resolver, and are assigned to destination +/* queues. Recipients that cannot be assigned are deferred or +/* bounced. Mail that has bounced twice is silently absorbed. +/* A non-zero mode means change the queue file permissions. +/* +/* qmgr_message_realloc() resumes reading recipients from the queue +/* file, and updates the recipient list and \fIrcpt_offset\fR message +/* structure members. A null result means that the file could not be +/* read or that the file contained incorrect information. +/* +/* qmgr_message_free() destroys an in-core message structure and makes +/* the resources available for reuse. It is an error to destroy +/* a message structure that is still referenced by queue entry structures. +/* +/* qmgr_message_update_warn() takes a closed message, opens it, updates +/* the warning field, and closes it again. +/* +/* qmgr_message_kill_record() takes a closed message, opens it, updates +/* the record type at the given offset to "killed", and closes the file. +/* A killed envelope record is ignored. Killed records are not allowed +/* inside the message content. +/* DIAGNOSTICS +/* Warnings: malformed message file. Fatal errors: out of memory. +/* SEE ALSO +/* envelope(3) message envelope parser +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <sys/stat.h> +#include <stdlib.h> +#include <stdio.h> /* sscanf() */ +#include <fcntl.h> +#include <errno.h> +#include <unistd.h> +#include <string.h> +#include <ctype.h> + +/* Utility library. */ + +#include <msg.h> +#include <mymalloc.h> +#include <vstring.h> +#include <vstream.h> +#include <split_at.h> +#include <valid_hostname.h> +#include <argv.h> +#include <stringops.h> +#include <myflock.h> + +/* Global library. */ + +#include <dict.h> +#include <mail_queue.h> +#include <mail_params.h> +#include <canon_addr.h> +#include <record.h> +#include <rec_type.h> +#include <sent.h> +#include <deliver_completed.h> +#include <opened.h> +#include <verp_sender.h> +#include <mail_proto.h> +#include <qmgr_user.h> +#include <split_addr.h> +#include <dsn_mask.h> +#include <rec_attr_map.h> + +/* Client stubs. */ + +#include <rewrite_clnt.h> +#include <resolve_clnt.h> + +/* Application-specific. */ + +#include "qmgr.h" + +int qmgr_message_count; +int qmgr_recipient_count; +int qmgr_vrfy_pend_count; + +/* qmgr_message_create - create in-core message structure */ + +static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, + const char *queue_id, int qflags) +{ + QMGR_MESSAGE *message; + + message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); + qmgr_message_count++; + message->flags = 0; + message->qflags = qflags; + message->tflags = 0; + message->tflags_offset = 0; + message->rflags = QMGR_READ_FLAG_DEFAULT; + message->fp = 0; + message->refcount = 0; + message->single_rcpt = 0; + message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; + message->create_time = 0; + GETTIMEOFDAY(&message->active_time); + message->data_offset = 0; + message->queue_id = mystrdup(queue_id); + message->queue_name = mystrdup(queue_name); + message->encoding = 0; + message->sender = 0; + message->dsn_envid = 0; + message->dsn_ret = 0; + message->smtputf8 = 0; + message->filter_xport = 0; + message->inspect_xport = 0; + message->redirect_addr = 0; + message->data_size = 0; + message->cont_length = 0; + message->warn_offset = 0; + message->warn_time = 0; + message->rcpt_offset = 0; + message->verp_delims = 0; + message->client_name = 0; + message->client_addr = 0; + message->client_port = 0; + message->client_proto = 0; + message->client_helo = 0; + message->sasl_method = 0; + message->sasl_username = 0; + message->sasl_sender = 0; + message->log_ident = 0; + message->rewrite_context = 0; + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); + return (message); +} + +/* qmgr_message_close - close queue file */ + +static void qmgr_message_close(QMGR_MESSAGE *message) +{ + vstream_fclose(message->fp); + message->fp = 0; +} + +/* qmgr_message_open - open queue file */ + +static int qmgr_message_open(QMGR_MESSAGE *message) +{ + + /* + * Sanity check. + */ + if (message->fp) + msg_panic("%s: queue file is open", message->queue_id); + + /* + * Open this queue file. Skip files that we cannot open. Back off when + * the system appears to be running out of resources. + */ + if ((message->fp = mail_queue_open(message->queue_name, + message->queue_id, + O_RDWR, 0)) == 0) { + if (errno != ENOENT) + msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); + msg_warn("open %s %s: %m", message->queue_name, message->queue_id); + return (-1); + } + return (0); +} + +/* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ + +static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) +{ + VSTRING *buf; + long orig_offset, extra_offset; + int rec_type; + char *start; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + if ((orig_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Rewind to the very beginning to make sure we see all records. + */ + if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Scan through the old style queue file. Count the total number of + * recipients and find the data/extra sections offsets. Note that the new + * queue files require that data_size equals extra_offset - data_offset, + * so we set data_size to this as well and ignore the size record itself + * completely. + */ + for (;;) { + rec_type = rec_get(message->fp, buf, 0); + if (rec_type <= 0) + /* Report missing end record later. */ + break; + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("old-style scan record %c %s", rec_type, start); + if (rec_type == REC_TYPE_END) + break; + if (rec_type == REC_TYPE_MESG) { + if (message->data_offset == 0) { + if ((message->data_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if ((extra_offset = atol(start)) <= message->data_offset) + msg_fatal("bad extra offset %s file %s", + start, VSTREAM_PATH(message->fp)); + if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->data_size = extra_offset - message->data_offset; + } + continue; + } + } + + /* + * Clean up. + */ + if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (message->data_offset == 0 || rec_type != REC_TYPE_END) + msg_fatal("%s: envelope records out of order", message->queue_id); +} + +/* qmgr_message_read - read envelope records */ + +static int qmgr_message_read(QMGR_MESSAGE *message) +{ + VSTRING *buf; + int rec_type; + long curr_offset; + long save_offset = message->rcpt_offset; /* save a flag */ + char *start; + int nrcpt = 0; + const char *error_text; + char *name; + char *value; + char *orig_rcpt = 0; + int count; + int dsn_notify = 0; + char *dsn_orcpt = 0; + int n; + int have_log_client_attr = 0; + + /* + * Initialize. No early returns or we have a memory leak. + */ + buf = vstring_alloc(100); + + /* + * If we re-open this file, skip over on-file recipient records that we + * already looked at, and refill the in-core recipient address list. + */ + if (message->rcpt_offset) { + if (message->rcpt_list.len) + msg_panic("%s: recipient list not empty on recipient reload", + message->queue_id); + if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + message->rcpt_offset = 0; + } + + /* + * Read envelope records. XXX Rely on the front-end programs to enforce + * record size limits. Read up to var_qmgr_rcpt_limit recipients from the + * queue file, to protect against memory exhaustion. Recipient records + * may appear before or after the message content, so we keep reading + * from the queue file until we have enough recipients (rcpt_offset != 0) + * and until we know all the non-recipient information. + * + * When reading recipients from queue file, stop reading when we reach a + * per-message in-core recipient limit rather than a global in-core + * recipient limit. Use the global recipient limit only in order to stop + * opening queue files. The purpose is to achieve equal delay for + * messages with recipient counts up to var_qmgr_rcpt_limit recipients. + * + * If we would read recipients up to a global recipient limit, the average + * number of in-core recipients per message would asymptotically approach + * (global recipient limit)/(active queue size limit), which gives equal + * delay per recipient rather than equal delay per message. + * + * On the first open, we must examine all non-recipient records. + * + * Optimization: when we know that recipient records are not mixed with + * non-recipient records, as is typical with mailing list mail, then we + * can avoid having to examine all the queue file records before we can + * start deliveries. This avoids some file system thrashing with huge + * mailing lists. + */ + for (;;) { + if ((curr_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); + if (curr_offset == message->data_offset && curr_offset > 0) { + if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + curr_offset += message->data_size; + } + rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); + start = vstring_str(buf); + if (msg_verbose > 1) + msg_info("record %c %s", rec_type, start); + if (rec_type == REC_TYPE_PTR) { + if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) + break; + /* Need to update curr_offset after pointer jump. */ + continue; + } + if (rec_type <= 0) { + msg_warn("%s: message rejected: missing end record", + message->queue_id); + break; + } + if (rec_type == REC_TYPE_END) { + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + + /* + * Map named attributes to pseudo record types, so that we don't have + * to pollute the queue file with records that are incompatible with + * past Postfix versions. Preferably, people should be able to back + * out from an upgrade without losing mail. + */ + if (rec_type == REC_TYPE_ATTR) { + if ((error_text = split_nameval(start, &name, &value)) != 0) { + msg_warn("%s: ignoring bad attribute: %s: %.200s", + message->queue_id, error_text, start); + rec_type = REC_TYPE_ERROR; + break; + } + if ((n = rec_attr_map(name)) != 0) { + start = value; + rec_type = n; + } + } + + /* + * Process recipient records. + */ + if (rec_type == REC_TYPE_RCPT) { + /* See also below for code setting orig_rcpt etc. */ +#define FUDGE(x) ((x) * (var_qmgr_fudge / 100.0)) + if (message->rcpt_offset == 0) { + recipient_list_add(&message->rcpt_list, curr_offset, + dsn_orcpt ? dsn_orcpt : "", + dsn_notify ? dsn_notify : 0, + orig_rcpt ? orig_rcpt : "", start); + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + if (message->rcpt_list.len >= FUDGE(var_qmgr_rcpt_limit)) { + if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) + msg_fatal("vstream_ftell %s: %m", + VSTREAM_PATH(message->fp)); + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + break; + if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) + /* Examine all remaining non-recipient records. */ + continue; + /* Optimizations for "pure recipient" record sections. */ + if (curr_offset > message->data_offset) { + /* We already examined all non-recipient records. */ + message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; + break; + } + /* Examine non-recipient records in extracted segment. */ + if (vstream_fseek(message->fp, message->data_offset + + message->data_size, SEEK_SET) < 0) + msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); + continue; + } + } + continue; + } + if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { + if (message->rcpt_offset == 0) { + if (dsn_orcpt) { + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (orig_rcpt) { + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (dsn_notify) + dsn_notify = 0; + } + continue; + } + if (rec_type == REC_TYPE_DSN_ORCPT) { + /* See also above for code clearing dsn_orcpt. */ + if (dsn_orcpt != 0) { + msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + dsn_orcpt = 0; + } + if (message->rcpt_offset == 0) + dsn_orcpt = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_DSN_NOTIFY) { + /* See also above for code clearing dsn_notify. */ + if (dsn_notify != 0) { + msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", + message->queue_id, dsn_notify); + dsn_notify = 0; + } + if (message->rcpt_offset == 0) { + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) + msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", + message->queue_id, start); + else + dsn_notify = n; + continue; + } + } + if (rec_type == REC_TYPE_ORCP) { + /* See also above for code clearing orig_rcpt. */ + if (orig_rcpt != 0) { + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + orig_rcpt = 0; + } + if (message->rcpt_offset == 0) + orig_rcpt = mystrdup(start); + continue; + } + + /* + * Process non-recipient records. + */ + if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) + /* We already examined all non-recipient records. */ + continue; + if (rec_type == REC_TYPE_SIZE) { + if (message->data_offset == 0) { + if ((count = sscanf(start, "%ld %ld %d %d %ld %d", + &message->data_size, &message->data_offset, + &nrcpt, &message->rflags, + &message->cont_length, + &message->smtputf8)) >= 3) { + /* Postfix >= 1.0 (a.k.a. 20010228). */ + if (message->data_offset <= 0 || message->data_size <= 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + if (message->rflags & ~QMGR_READ_FLAG_USER) { + msg_warn("%s: invalid flags in size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + } else if (count == 1) { + /* Postfix < 1.0 (a.k.a. 20010228). */ + qmgr_message_oldstyle_scan(message); + } else { + /* Can't happen. */ + msg_warn("%s: message rejected: weird size record", + message->queue_id); + rec_type = REC_TYPE_ERROR; + break; + } + } + /* Postfix < 2.4 compatibility. */ + if (message->cont_length == 0) { + message->cont_length = message->data_size; + } else if (message->cont_length < 0) { + msg_warn("%s: invalid size record: %.100s", + message->queue_id, start); + rec_type = REC_TYPE_ERROR; + break; + } + continue; + } + if (rec_type == REC_TYPE_TIME) { + if (message->arrival_time.tv_sec == 0) + REC_TYPE_TIME_SCAN(start, message->arrival_time); + continue; + } + if (rec_type == REC_TYPE_CTIME) { + if (message->create_time == 0) + message->create_time = atol(start); + continue; + } + if (rec_type == REC_TYPE_FILT) { + if (message->filter_xport != 0) + myfree(message->filter_xport); + message->filter_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_INSP) { + if (message->inspect_xport != 0) + myfree(message->inspect_xport); + message->inspect_xport = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_RDR) { + if (message->redirect_addr != 0) + myfree(message->redirect_addr); + message->redirect_addr = mystrdup(start); + continue; + } + if (rec_type == REC_TYPE_FROM) { + if (message->sender == 0) { + message->sender = mystrdup(start); + opened(message->queue_id, message->sender, + message->cont_length, nrcpt, + "queue %s", message->queue_name); + } + continue; + } + if (rec_type == REC_TYPE_DSN_ENVID) { + /* Allow Milter override. */ + if (message->dsn_envid != 0) + myfree(message->dsn_envid); + message->dsn_envid = mystrdup(start); + } + if (rec_type == REC_TYPE_DSN_RET) { + /* Allow Milter override. */ + if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) + msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", + message->queue_id, start); + else + message->dsn_ret = n; + } + if (rec_type == REC_TYPE_ATTR) { + /* Allow extra segment to override envelope segment info. */ + if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { + if (message->encoding != 0) + myfree(message->encoding); + message->encoding = mystrdup(value); + } + + /* + * Backwards compatibility. Before Postfix 2.3, the logging + * attributes were called client_name, etc. Now they are called + * log_client_name. etc., and client_name is used for the actual + * client information. To support old queue files, we accept both + * names for the purpose of logging; the new name overrides the + * old one. + * + * XXX Do not use the "legacy" client_name etc. attribute values for + * initializing the logging attributes, when this file already + * contains the "modern" log_client_name etc. logging attributes. + * Otherwise, logging attributes that are not present in the + * queue file would be set with information from the real client. + */ + else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_name == 0) + message->client_name = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { + if (have_log_client_attr == 0 && message->client_addr == 0) + message->client_addr = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { + if (have_log_client_attr == 0 && message->client_port == 0) + message->client_port = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_proto == 0) + message->client_proto = mystrdup(value); + } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { + if (have_log_client_attr == 0 && message->client_helo == 0) + message->client_helo = mystrdup(value); + } + /* Original client attributes. */ + else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { + if (message->client_name != 0) + myfree(message->client_name); + message->client_name = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { + if (message->client_addr != 0) + myfree(message->client_addr); + message->client_addr = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { + if (message->client_port != 0) + myfree(message->client_port); + message->client_port = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { + if (message->client_proto != 0) + myfree(message->client_proto); + message->client_proto = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { + if (message->client_helo != 0) + myfree(message->client_helo); + message->client_helo = mystrdup(value); + have_log_client_attr = 1; + } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { + if (message->sasl_method == 0) + message->sasl_method = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_METHOD, value); + } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { + if (message->sasl_username == 0) + message->sasl_username = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_USERNAME, value); + } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_SASL_SENDER, value); + } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { + if (message->log_ident == 0) + message->log_ident = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_LOG_IDENT, value); + } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(value); + else + msg_warn("%s: ignoring multiple %s attribute: %s", + message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); + } + + /* + * Optional tracing flags (verify, sendmail -v, sendmail -bv). + * This record is killed after a trace logfile report is sent and + * after the logfile is deleted. + */ + else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { + if (message->tflags == 0) { + message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); + if (message->tflags == DEL_REQ_FLAG_RECORD) + message->tflags_offset = curr_offset; + else + message->tflags_offset = 0; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count++; + } + } + continue; + } + if (rec_type == REC_TYPE_WARN) { + if (message->warn_offset == 0) { + message->warn_offset = curr_offset; + REC_TYPE_WARN_SCAN(start, message->warn_time); + } + continue; + } + if (rec_type == REC_TYPE_VERP) { + if (message->verp_delims == 0) { + if (message->sender == 0 || message->sender[0] == 0) { + msg_warn("%s: ignoring VERP request for null sender", + message->queue_id); + } else if (verp_delims_verify(start) != 0) { + msg_warn("%s: ignoring bad VERP request: \"%.100s\"", + message->queue_id, start); + } else { + if (msg_verbose) + msg_info("%s: enabling VERP for sender \"%.100s\"", + message->queue_id, message->sender); + message->single_rcpt = 1; + message->verp_delims = mystrdup(start); + } + } + continue; + } + } + + /* + * Grr. + */ + if (dsn_orcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", + message->queue_id, dsn_orcpt); + myfree(dsn_orcpt); + } + if (orig_rcpt != 0) { + if (rec_type > 0) + msg_warn("%s: ignoring out-of-order original recipient <%.200s>", + message->queue_id, orig_rcpt); + myfree(orig_rcpt); + } + + /* + * After sending a "delayed" warning, request sender notification when + * message delivery is completed. While "mail delayed" notifications are + * bad enough because they multiply the amount of email traffic, "delay + * cleared" notifications are even worse because they come in a sudden + * burst when the queue drains after a network outage. + */ + if (var_dsn_delay_cleared && message->warn_time < 0) + message->tflags |= DEL_REQ_FLAG_REC_DLY_SENT; + + /* + * Avoid clumsiness elsewhere in the program. When sending data across an + * IPC channel, sending an empty string is more convenient than sending a + * null pointer. + */ + if (message->dsn_envid == 0) + message->dsn_envid = mystrdup(""); + if (message->encoding == 0) + message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); + if (message->client_name == 0) + message->client_name = mystrdup(""); + if (message->client_addr == 0) + message->client_addr = mystrdup(""); + if (message->client_port == 0) + message->client_port = mystrdup(""); + if (message->client_proto == 0) + message->client_proto = mystrdup(""); + if (message->client_helo == 0) + message->client_helo = mystrdup(""); + if (message->sasl_method == 0) + message->sasl_method = mystrdup(""); + if (message->sasl_username == 0) + message->sasl_username = mystrdup(""); + if (message->sasl_sender == 0) + message->sasl_sender = mystrdup(""); + if (message->log_ident == 0) + message->log_ident = mystrdup(""); + if (message->rewrite_context == 0) + message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); + /* Postfix < 2.3 compatibility. */ + if (message->create_time == 0) + message->create_time = message->arrival_time.tv_sec; + + /* + * Clean up. + */ + vstring_free(buf); + + /* + * Sanity checks. Verify that all required information was found, + * including the queue file end marker. + */ + if (rec_type <= 0) { + /* Already logged warning. */ + } else if (message->arrival_time.tv_sec == 0) { + msg_warn("%s: message rejected: missing arrival time record", + message->queue_id); + } else if (message->sender == 0) { + msg_warn("%s: message rejected: missing sender record", + message->queue_id); + } else if (message->data_offset == 0) { + msg_warn("%s: message rejected: missing size record", + message->queue_id); + } else { + return (0); + } + message->rcpt_offset = save_offset; /* restore flag */ + return (-1); +} + +/* qmgr_message_update_warn - update the time of next delay warning */ + +void qmgr_message_update_warn(QMGR_MESSAGE *message) +{ + + /* + * After the "mail delayed" warning, optionally send a "delay cleared" + * notification. + */ + if (qmgr_message_open(message) + || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 + || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, + REC_TYPE_WARN_ARG(-1)) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_kill_record - mark one message record as killed */ + +void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) +{ + if (offset <= 0) + msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); + if (qmgr_message_open(message) + || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 + || vstream_fflush(message->fp)) + msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); + qmgr_message_close(message); +} + +/* qmgr_message_sort_compare - compare recipient information */ + +static int qmgr_message_sort_compare(const void *p1, const void *p2) +{ + RECIPIENT *rcpt1 = (RECIPIENT *) p1; + RECIPIENT *rcpt2 = (RECIPIENT *) p2; + QMGR_QUEUE *queue1; + QMGR_QUEUE *queue2; + char *at1; + char *at2; + int result; + + /* + * Compare most significant to least significant recipient attributes. + * The comparison function must be transitive, so NULL values need to be + * assigned an ordinal (we set NULL last). + */ + + queue1 = rcpt1->u.queue; + queue2 = rcpt2->u.queue; + if (queue1 != 0 && queue2 == 0) + return (-1); + if (queue1 == 0 && queue2 != 0) + return (1); + if (queue1 != 0 && queue2 != 0) { + + /* + * Compare message transport. + */ + if ((result = strcmp(queue1->transport->name, + queue2->transport->name)) != 0) + return (result); + + /* + * Compare queue name (nexthop or recipient@nexthop). + */ + if ((result = strcmp(queue1->name, queue2->name)) != 0) + return (result); + } + + /* + * Compare recipient domain. + */ + at1 = strrchr(rcpt1->address, '@'); + at2 = strrchr(rcpt2->address, '@'); + if (at1 == 0 && at2 != 0) + return (1); + if (at1 != 0 && at2 == 0) + return (-1); + if (at1 != 0 && at2 != 0 + && (result = strcasecmp_utf8(at1, at2)) != 0) + return (result); + + /* + * Compare recipient address. + */ + return (strcasecmp_utf8(rcpt1->address, rcpt2->address)); +} + +/* qmgr_message_sort - sort message recipient addresses by domain */ + +static void qmgr_message_sort(QMGR_MESSAGE *message) +{ + qsort((void *) message->rcpt_list.info, message->rcpt_list.len, + sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); + if (msg_verbose) { + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *rcpt; + + msg_info("start sorted recipient list"); + for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) + msg_info("qmgr_message_sort: %s", rcpt->address); + msg_info("end sorted recipient list"); + } +} + +/* qmgr_resolve_one - resolve or skip one recipient */ + +static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, + const char *addr, RESOLVE_REPLY *reply) +{ +#define QMGR_REDIRECT(rp, tp, np) do { \ + (rp)->flags = 0; \ + vstring_strcpy((rp)->transport, (tp)); \ + vstring_strcpy((rp)->nexthop, (np)); \ + } while (0) + + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) + resolve_clnt_query_from(message->sender, addr, reply); + else + resolve_clnt_verify_from(message->sender, addr, reply); + if (reply->flags & RESOLVE_FLAG_FAIL) { + QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, + "4.3.0 address resolver failure"); + return (0); + } else if (reply->flags & RESOLVE_FLAG_ERROR) { + QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, + "5.1.3 bad address syntax"); + return (0); + } else { + return (0); + } +} + +/* qmgr_message_resolve - resolve recipients */ + +static void qmgr_message_resolve(QMGR_MESSAGE *message) +{ + static ARGV *defer_xport_argv; + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_TRANSPORT *transport = 0; + QMGR_QUEUE *queue = 0; + RESOLVE_REPLY reply; + VSTRING *queue_name; + char *at; + char **cpp; + char *nexthop; + ssize_t len; + int status; + DSN dsn; + MSG_STATS stats; + DSN *saved_dsn; + +#define STREQ(x,y) (strcmp(x,y) == 0) +#define STR vstring_str +#define LEN VSTRING_LEN + + resolve_clnt_init(&reply); + queue_name = vstring_alloc(1); + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + + /* + * Redirect overrides all else. But only once (per entire message). + * For consistency with the remainder of Postfix, rewrite the address + * to canonical form before resolving it. + */ + if (message->redirect_addr) { + if (recipient > list.info) { + recipient->u.queue = 0; + continue; + } + message->rcpt_offset = 0; + rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, + reply.recipient); + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Content filtering overrides the address resolver. + * + * XXX Bypass content_filter inspection for user-generated probes + * (sendmail -bv). MTA-generated probes never have the "please filter + * me" bits turned on, but we handle them here anyway for the sake of + * future proofing. + */ +#define FILTER_WITHOUT_NEXTHOP(filter, next) \ + (((next) = split_at((filter), ':')) == 0 || *(next) == 0) + +#define RCPT_WITHOUT_DOMAIN(rcpt, next) \ + ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) + + else if (message->filter_xport + && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { + reply.flags = 0; + vstring_strcpy(reply.transport, message->filter_xport); + if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) + && *(nexthop = var_def_filter_nexthop) == 0 + && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) + nexthop = var_myhostname; + vstring_strcpy(reply.nexthop, nexthop); + vstring_strcpy(reply.recipient, recipient->address); + } + + /* + * Resolve the destination to (transport, nexthop, address). The + * result address may differ from the one specified by the sender. + */ + else { + if (qmgr_resolve_one(message, recipient, + recipient->address, &reply) < 0) + continue; + if (!STREQ(recipient->address, STR(reply.recipient))) + RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); + } + + /* + * Bounce null recipients. This should never happen, but is most + * likely the result of a fault in a different program, so aborting + * the queue manager process does not help. + */ + if (recipient->address[0] == 0) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, + "5.1.3 null recipient address"); + } + + /* + * Discard mail to the local double bounce address here, so this + * system can run without a local delivery agent. They'd still have + * to configure something for mail directed to the local postmaster, + * though, but that is an RFC requirement anyway. + * + * XXX This lookup should be done in the resolver, and the mail should + * be directed to a general-purpose null delivery agent. + */ + if (reply.flags & RESOLVE_CLASS_LOCAL) { + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + if (strncasecmp_utf8(STR(reply.recipient), + var_double_bounce_sender, len) == 0 + && !var_double_bounce_sender[len]) { + status = sent(message->tflags, message->queue_id, + QMGR_MSG_STATS(&stats, message), recipient, + "none", DSN_SIMPLE(&dsn, "2.0.0", + "undeliverable postmaster notification discarded")); + if (status == 0) { + deliver_completed(message->fp, recipient->offset); +#if 0 + /* It's the default verification probe sender address. */ + msg_warn("%s: undeliverable postmaster notification discarded", + message->queue_id); +#endif + } else + message->flags |= status; + continue; + } + } + + /* + * Optionally defer deliveries over specific transports, unless the + * restriction is lifted temporarily. + */ + if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { + if (defer_xport_argv == 0) + defer_xport_argv = argv_split(var_defer_xports, CHARS_COMMA_SP); + for (cpp = defer_xport_argv->argv; *cpp; cpp++) + if (strcmp(*cpp, STR(reply.transport)) == 0) + break; + if (*cpp) { + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 deferred transport"); + } + } + + /* + * Safety: defer excess address verification requests. + */ + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0 + && qmgr_vrfy_pend_count > var_vrfy_pend_limit) + QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, + "4.3.2 Too many address verification requests"); + + /* + * Look up or instantiate the proper transport. + */ + if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { + if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) + transport = qmgr_transport_create(STR(reply.transport)); + queue = 0; + } + + /* + * This message is being flushed. If need-be unthrottle the + * transport. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_TRANSPORT_THROTTLED(transport)) + qmgr_transport_unthrottle(transport); + + /* + * This transport is dead. Defer delivery to this recipient. + */ + if (QMGR_TRANSPORT_THROTTLED(transport)) { + saved_dsn = transport->dsn; + if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { + nexthop = qmgr_error_nexthop(saved_dsn); + vstring_strcpy(reply.nexthop, nexthop); + myfree(nexthop); + queue = 0; + } else { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * The nexthop destination provides the default name for the + * per-destination queue. When the delivery agent accepts only one + * recipient per delivery, give each recipient its own queue, so that + * deliveries to different recipients of the same message can happen + * in parallel, and so that we can enforce per-recipient concurrency + * limits and prevent one recipient from tying up all the delivery + * agent resources. We use recipient@nexthop as queue name rather + * than the actual recipient domain name, so that one recipient in + * multiple equivalent domains cannot evade the per-recipient + * concurrency limit. Split the address on the recipient delimiter if + * one is defined, so that extended addresses don't get extra + * delivery slots. + * + * Fold the result to lower case so that we don't have multiple queues + * for the same name. + * + * Important! All recipients in a queue must have the same nexthop + * value. It is OK to have multiple queues with the same nexthop + * value, but only when those queues are named after recipients. + * + * The single-recipient code below was written for local(8) like + * delivery agents, and assumes that all domains that deliver to the + * same (transport + nexthop) are aliases for $nexthop. Delivery + * concurrency is changed from per-domain into per-recipient, by + * changing the queue name from nexthop into localpart@nexthop. + * + * XXX This assumption is incorrect when different destinations share + * the same (transport + nexthop). In reality, such transports are + * rarely configured to use single-recipient deliveries. The fix is + * to decouple the per-destination recipient limit from the + * per-destination concurrency. + */ + vstring_strcpy(queue_name, STR(reply.nexthop)); + if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 + && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 + && transport->recipient_limit == 1) { + /* Copy the recipient localpart. */ + at = strrchr(STR(reply.recipient), '@'); + len = (at ? (at - STR(reply.recipient)) + : strlen(STR(reply.recipient))); + vstring_strncpy(queue_name, STR(reply.recipient), len); + /* Remove the address extension from the recipient localpart. */ + if (*var_rcpt_delim && split_addr(STR(queue_name), var_rcpt_delim)) + vstring_truncate(queue_name, strlen(STR(queue_name))); + /* Assume the recipient domain is equivalent to nexthop. */ + vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); + } + lowercase(STR(queue_name)); + + /* + * This transport is alive. Find or instantiate a queue for this + * recipient. + */ + if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { + if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) + queue = qmgr_queue_create(transport, STR(queue_name), + STR(reply.nexthop)); + } + + /* + * This message is being flushed. If need-be unthrottle the queue. + */ + if ((message->qflags & QMGR_FLUSH_EACH) != 0 + && QMGR_QUEUE_THROTTLED(queue)) + qmgr_queue_unthrottle(queue); + + /* + * This queue is dead. Defer delivery to this recipient. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + saved_dsn = queue->dsn; + if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { + qmgr_defer_recipient(message, recipient, saved_dsn); + continue; + } + } + + /* + * This queue is alive. Bind this recipient to this queue instance. + */ + recipient->u.queue = queue; + } + resolve_clnt_free(&reply); + vstring_free(queue_name); +} + +/* qmgr_message_assign - assign recipients to specific delivery requests */ + +static void qmgr_message_assign(QMGR_MESSAGE *message) +{ + RECIPIENT_LIST list = message->rcpt_list; + RECIPIENT *recipient; + QMGR_ENTRY *entry = 0; + QMGR_QUEUE *queue; + + /* + * Try to bundle as many recipients in a delivery request as we can. When + * the recipient resolves to the same site and transport as the previous + * recipient, do not create a new queue entry, just move that recipient + * to the recipient list of the existing queue entry. All this provided + * that we do not exceed the transport-specific limit on the number of + * recipients per transaction. Skip recipients with a dead transport or + * destination. + */ +#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) + + for (recipient = list.info; recipient < list.info + list.len; recipient++) { + if ((queue = recipient->u.queue) != 0) { + if (message->single_rcpt || entry == 0 || entry->queue != queue + || !LIMIT_OK(entry->queue->transport->recipient_limit, + entry->rcpt_list.len)) { + entry = qmgr_entry_create(queue, message); + } + recipient_list_add(&entry->rcpt_list, recipient->offset, + recipient->dsn_orcpt, recipient->dsn_notify, + recipient->orig_addr, recipient->address); + qmgr_recipient_count++; + } + } + recipient_list_free(&message->rcpt_list); + recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); +} + +/* qmgr_message_free - release memory for in-core message structure */ + +void qmgr_message_free(QMGR_MESSAGE *message) +{ + if (message->refcount != 0) + msg_panic("qmgr_message_free: reference len: %d", message->refcount); + if (message->fp) + msg_panic("qmgr_message_free: queue file is open"); + myfree(message->queue_id); + myfree(message->queue_name); + if (message->dsn_envid) + myfree(message->dsn_envid); + if (message->encoding) + myfree(message->encoding); + if (message->sender) + myfree(message->sender); + if (message->verp_delims) + myfree(message->verp_delims); + if (message->filter_xport) + myfree(message->filter_xport); + if (message->inspect_xport) + myfree(message->inspect_xport); + if (message->redirect_addr) + myfree(message->redirect_addr); + if (message->client_name) + myfree(message->client_name); + if (message->client_addr) + myfree(message->client_addr); + if (message->client_port) + myfree(message->client_port); + if (message->client_proto) + myfree(message->client_proto); + if (message->client_helo) + myfree(message->client_helo); + if (message->sasl_method) + myfree(message->sasl_method); + if (message->sasl_username) + myfree(message->sasl_username); + if (message->sasl_sender) + myfree(message->sasl_sender); + if (message->log_ident) + myfree(message->log_ident); + if (message->rewrite_context) + myfree(message->rewrite_context); + recipient_list_free(&message->rcpt_list); + qmgr_message_count--; + if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) != 0) + qmgr_vrfy_pend_count--; + myfree((void *) message); +} + +/* qmgr_message_alloc - create in-core message structure */ + +QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, + int qflags, mode_t mode) +{ + const char *myname = "qmgr_message_alloc"; + QMGR_MESSAGE *message; + + if (msg_verbose) + msg_info("%s: %s %s", myname, queue_name, queue_id); + + /* + * Create an in-core message structure. + */ + message = qmgr_message_create(queue_name, queue_id, qflags); + + /* + * Extract message envelope information: time of arrival, sender address, + * recipient addresses. Skip files with malformed envelope information. + */ +#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) + + if (qmgr_message_open(message) < 0) { + qmgr_message_free(message); + return (0); + } + if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { + msg_info("%s: skipped, still being delivered", queue_id); + qmgr_message_close(message); + qmgr_message_free(message); + return (QMGR_MESSAGE_LOCKED); + } + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + qmgr_message_free(message); + return (0); + } else { + + /* + * We have validated the queue file content, so it is safe to modify + * the file properties now. + */ + if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) + msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); + + /* + * Reset the defer log. This code should not be here, but we must + * reset the defer log *after* acquiring the exclusive lock on the + * queue file and *before* resolving new recipients. Since all those + * operations are encapsulated so nicely by this routine, the defer + * log reset has to be done here as well. + * + * Note: it is safe to remove the defer logfile from a previous queue + * run of this queue file, because the defer log contains information + * about recipients that still exist in this queue file. + */ + if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) + msg_fatal("%s: %s: remove %s %s: %m", myname, + queue_id, MAIL_QUEUE_DEFER, queue_id); + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + return (message); + } +} + +/* qmgr_message_realloc - refresh in-core message structure */ + +QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) +{ + const char *myname = "qmgr_message_realloc"; + + /* + * Sanity checks. + */ + if (message->rcpt_offset <= 0) + msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); + if (msg_verbose) + msg_info("%s: %s %s offset %ld", myname, message->queue_name, + message->queue_id, message->rcpt_offset); + + /* + * Extract recipient addresses. Skip files with malformed envelope + * information. + */ + if (qmgr_message_open(message) < 0) + return (0); + if (qmgr_message_read(message) < 0) { + qmgr_message_close(message); + return (0); + } else { + qmgr_message_sort(message); + qmgr_message_resolve(message); + qmgr_message_sort(message); + qmgr_message_assign(message); + qmgr_message_close(message); + return (message); + } +} diff --git a/src/oqmgr/qmgr_move.c b/src/oqmgr/qmgr_move.c new file mode 100644 index 0000000..e68f803 --- /dev/null +++ b/src/oqmgr/qmgr_move.c @@ -0,0 +1,104 @@ +/*++ +/* NAME +/* qmgr_move 3 +/* SUMMARY +/* move queue entries to another queue +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* void qmgr_move(from, to, time_stamp) +/* const char *from; +/* const char *to; +/* time_t time_stamp; +/* DESCRIPTION +/* The \fBqmgr_move\fR routine scans the \fIfrom\fR queue for entries +/* with valid queue names and moves them to the \fIto\fR queue. +/* If \fItime_stamp\fR is non-zero, the queue file time stamps are +/* set to the specified value. +/* Entries with invalid names are left alone. No attempt is made to +/* look for other badness such as multiple links or weird file types. +/* These issues are dealt with when a queue file is actually opened. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <sys/stat.h> +#include <string.h> +#include <utime.h> +#include <errno.h> + +/* Utility library. */ + +#include <msg.h> +#include <scan_dir.h> +#include <recipient_list.h> + +/* Global library. */ + +#include <mail_queue.h> +#include <mail_scan_dir.h> + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_move - move queue entries to another queue, leave bad files alone */ + +void qmgr_move(const char *src_queue, const char *dst_queue, + time_t time_stamp) +{ + const char *myname = "qmgr_move"; + SCAN_DIR *queue_dir; + char *queue_id; + struct utimbuf tbuf; + const char *path; + + if (strcmp(src_queue, dst_queue) == 0) + msg_panic("%s: source queue %s is destination", myname, src_queue); + if (msg_verbose) + msg_info("start move queue %s -> %s", src_queue, dst_queue); + + queue_dir = scan_dir_open(src_queue); + while ((queue_id = mail_scan_dir_next(queue_dir)) != 0) { + if (mail_queue_id_ok(queue_id)) { + if (time_stamp > 0) { + tbuf.actime = tbuf.modtime = time_stamp; + path = mail_queue_path((VSTRING *) 0, src_queue, queue_id); + if (utime(path, &tbuf) < 0) { + if (errno != ENOENT) + msg_fatal("%s: update %s time stamps: %m", myname, path); + msg_warn("%s: update %s time stamps: %m", myname, path); + continue; + } + } + if (mail_queue_rename(queue_id, src_queue, dst_queue)) { + if (errno != ENOENT) + msg_fatal("%s: rename %s from %s to %s: %m", + myname, queue_id, src_queue, dst_queue); + msg_warn("%s: rename %s from %s to %s: %m", + myname, queue_id, src_queue, dst_queue); + continue; + } + if (msg_verbose) + msg_info("%s: moved %s from %s to %s", + myname, queue_id, src_queue, dst_queue); + } else { + msg_warn("%s: ignored: queue %s id %s", + myname, src_queue, queue_id); + } + } + scan_dir_close(queue_dir); + + if (msg_verbose) + msg_info("end move queue %s -> %s", src_queue, dst_queue); +} diff --git a/src/oqmgr/qmgr_queue.c b/src/oqmgr/qmgr_queue.c new file mode 100644 index 0000000..a127c6b --- /dev/null +++ b/src/oqmgr/qmgr_queue.c @@ -0,0 +1,442 @@ +/*++ +/* NAME +/* qmgr_queue 3 +/* SUMMARY +/* per-destination queues +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* int qmgr_queue_count; +/* +/* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop) +/* QMGR_TRANSPORT *transport; +/* const char *name; +/* const char *nexthop; +/* +/* void qmgr_queue_done(queue) +/* QMGR_QUEUE *queue; +/* +/* QMGR_QUEUE *qmgr_queue_find(transport, name) +/* QMGR_TRANSPORT *transport; +/* const char *name; +/* +/* QMGR_QUEUE *qmgr_queue_select(transport) +/* QMGR_TRANSPORT *transport; +/* +/* void qmgr_queue_throttle(queue, dsn) +/* QMGR_QUEUE *queue; +/* DSN *dsn; +/* +/* void qmgr_queue_unthrottle(queue) +/* QMGR_QUEUE *queue; +/* +/* void qmgr_queue_suspend(queue, delay) +/* QMGR_QUEUE *queue; +/* int delay; +/* DESCRIPTION +/* These routines add/delete/manipulate per-destination queues. +/* Each queue corresponds to a specific transport and destination. +/* Each queue has a `todo' list of delivery requests for that +/* destination, and a `busy' list of delivery requests in progress. +/* +/* qmgr_queue_count is a global counter for the total number +/* of in-core queue structures. +/* +/* qmgr_queue_create() creates an empty named queue for the named +/* transport and destination. The queue is given an initial +/* concurrency limit as specified with the +/* \fIinitial_destination_concurrency\fR configuration parameter, +/* provided that it does not exceed the transport-specific +/* concurrency limit. +/* +/* qmgr_queue_done() disposes of a per-destination queue after all +/* its entries have been taken care of. It is an error to dispose +/* of a dead queue. +/* +/* qmgr_queue_find() looks up the named queue for the named +/* transport. A null result means that the queue was not found. +/* +/* qmgr_queue_select() uses a round-robin strategy to select +/* from the named transport one per-destination queue with a +/* non-empty `todo' list. +/* +/* qmgr_queue_throttle() handles a delivery error, and decrements the +/* concurrency limit for the destination, with a lower bound of 1. +/* When the cohort failure bound is reached, qmgr_queue_throttle() +/* sets the concurrency limit to zero and starts a timer +/* to re-enable delivery to the destination after a configurable delay. +/* +/* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects. +/* The concurrency limit for the destination is incremented, +/* provided that it does not exceed the destination concurrency +/* limit specified for the transport. This routine implements +/* "slow open" mode, and eliminates the "thundering herd" problem. +/* +/* qmgr_queue_suspend() suspends delivery for this destination +/* briefly. +/* DIAGNOSTICS +/* Panic: consistency check failure. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <time.h> + +/* Utility library. */ + +#include <msg.h> +#include <mymalloc.h> +#include <events.h> +#include <htable.h> + +/* Global library. */ + +#include <mail_params.h> +#include <recipient_list.h> +#include <mail_proto.h> /* QMGR_LOG_WINDOW */ + +/* Application-specific. */ + +#include "qmgr.h" + +int qmgr_queue_count; + +#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ + (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \ + || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0) + +#define QMGR_LOG_FEEDBACK(feedback) \ + if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ + msg_info("%s: feedback %g", myname, feedback); + +#define QMGR_LOG_WINDOW(queue) \ + if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ + msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \ + myname, queue->name, queue->transport->dest_concurrency_limit, \ + queue->window, queue->success, queue->failure, queue->fail_cohorts); + +/* qmgr_queue_resume - resume delivery to destination */ + +static void qmgr_queue_resume(int event, void *context) +{ + QMGR_QUEUE *queue = (QMGR_QUEUE *) context; + const char *myname = "qmgr_queue_resume"; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_SUSPENDED(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + + /* + * We can't simply force delivery on this queue: the transport's pending + * count may already be maxed out, and there may be other constraints + * that definitely should be none of our business. The best we can do is + * to play by the same rules as everyone else: let qmgr_active_drain() + * and round-robin selection take care of message selection. + */ + queue->window = 1; + + /* + * Every event handler that leaves a queue in the "ready" state should + * remove the queue when it is empty. + */ + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); +} + +/* qmgr_queue_suspend - briefly suspend a destination */ + +void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) +{ + const char *myname = "qmgr_queue_suspend"; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->busy_refcount > 0) + msg_panic("%s: queue is busy", myname); + + /* + * Set the queue status to "suspended". No-one is supposed to remove a + * queue in suspended state. + */ + queue->window = QMGR_QUEUE_STAT_SUSPENDED; + event_request_timer(qmgr_queue_resume, (void *) queue, delay); +} + +/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */ + +static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context) +{ + QMGR_QUEUE *queue = (QMGR_QUEUE *) context; + + /* + * This routine runs when a wakeup timer goes off; it does not run in the + * context of some queue manipulation. Therefore, it is safe to discard + * this in-core queue when it is empty and when this site is not dead. + */ + qmgr_queue_unthrottle(queue); + if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) + qmgr_queue_done(queue); +} + +/* qmgr_queue_unthrottle - give this destination another chance */ + +void qmgr_queue_unthrottle(QMGR_QUEUE *queue) +{ + const char *myname = "qmgr_queue_unthrottle"; + QMGR_TRANSPORT *transport = queue->transport; + double feedback; + + if (msg_verbose) + msg_info("%s: queue %s", myname, queue->name); + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + + /* + * Don't restart the negative feedback hysteresis cycle with every + * positive feedback. Restart it only when we make a positive concurrency + * adjustment (i.e. at the end of a positive feedback hysteresis cycle). + * Otherwise negative feedback would be too aggressive: negative feedback + * takes effect immediately at the start of its hysteresis cycle. + */ + queue->fail_cohorts = 0; + + /* + * Special case when this site was dead. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue); + if (queue->dsn == 0) + msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); + dsn_free(queue->dsn); + queue->dsn = 0; + /* Back from the almost grave, best concurrency is anyone's guess. */ + if (queue->busy_refcount > 0) + queue->window = queue->busy_refcount; + else + queue->window = transport->init_dest_concurrency; + queue->success = queue->failure = 0; + QMGR_LOG_WINDOW(queue); + return; + } + + /* + * Increase the destination's concurrency limit until we reach the + * transport's concurrency limit. Allow for a margin the size of the + * initial destination concurrency, so that we're not too gentle. + * + * Why is the concurrency increment based on preferred concurrency and not + * on the number of outstanding delivery requests? The latter fluctuates + * wildly when deliveries complete in bursts (artificial benchmark + * measurements), and does not account for cached connections. + * + * Keep the window within reasonable distance from actual concurrency + * otherwise negative feedback will be ineffective. This expression + * assumes that busy_refcount changes gradually. This is invalid when + * deliveries complete in bursts (artificial benchmark measurements). + */ + if (transport->dest_concurrency_limit == 0 + || transport->dest_concurrency_limit > queue->window) + if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { + feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); + QMGR_LOG_FEEDBACK(feedback); + queue->success += feedback; + /* Prepare for overshoot (feedback > hysteresis, rounding error). */ + while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { + queue->window += transport->pos_feedback.hysteresis; + queue->success -= transport->pos_feedback.hysteresis; + queue->failure = 0; + } + /* Prepare for overshoot. */ + if (transport->dest_concurrency_limit > 0 + && queue->window > transport->dest_concurrency_limit) + queue->window = transport->dest_concurrency_limit; + } + QMGR_LOG_WINDOW(queue); +} + +/* qmgr_queue_throttle - handle destination delivery failure */ + +void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) +{ + const char *myname = "qmgr_queue_throttle"; + QMGR_TRANSPORT *transport = queue->transport; + double feedback; + + /* + * Sanity checks. + */ + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->dsn) + msg_panic("%s: queue %s: spurious reason %s", + myname, queue->name, queue->dsn->reason); + if (msg_verbose) + msg_info("%s: queue %s: %s %s", + myname, queue->name, dsn->status, dsn->reason); + + /* + * Don't restart the positive feedback hysteresis cycle with every + * negative feedback. Restart it only when we make a negative concurrency + * adjustment (i.e. at the start of a negative feedback hysteresis + * cycle). Otherwise positive feedback would be too weak (positive + * feedback does not take effect until the end of its hysteresis cycle). + */ + + /* + * This queue is declared dead after a configurable number of + * pseudo-cohort failures. + */ + if (QMGR_QUEUE_READY(queue)) { + queue->fail_cohorts += 1.0 / queue->window; + if (transport->fail_cohort_limit > 0 + && queue->fail_cohorts >= transport->fail_cohort_limit) + queue->window = QMGR_QUEUE_STAT_THROTTLED; + } + + /* + * Decrease the destination's concurrency limit until we reach 1. Base + * adjustments on the concurrency limit itself, instead of using the + * actual concurrency. The latter fluctuates wildly when deliveries + * complete in bursts (artificial benchmark measurements). + * + * Even after reaching 1, we maintain the negative hysteresis cycle so that + * negative feedback can cancel out positive feedback. + */ + if (QMGR_QUEUE_READY(queue)) { + feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); + QMGR_LOG_FEEDBACK(feedback); + queue->failure -= feedback; + /* Prepare for overshoot (feedback > hysteresis, rounding error). */ + while (queue->failure - feedback / 2 < 0) { + queue->window -= transport->neg_feedback.hysteresis; + queue->success = 0; + queue->failure += transport->neg_feedback.hysteresis; + } + /* Prepare for overshoot. */ + if (queue->window < 1) + queue->window = 1; + } + + /* + * Special case for a site that just was declared dead. + */ + if (QMGR_QUEUE_THROTTLED(queue)) { + queue->dsn = DSN_COPY(dsn); + event_request_timer(qmgr_queue_unthrottle_wrapper, + (void *) queue, var_min_backoff_time); + queue->dflags = 0; + } + QMGR_LOG_WINDOW(queue); +} + +/* qmgr_queue_select - select in-core queue for delivery */ + +QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport) +{ + QMGR_QUEUE *queue; + + /* + * If we find a suitable site, rotate the list to enforce round-robin + * selection. See similar selection code in qmgr_transport_select(). + */ + for (queue = transport->queue_list.next; queue; queue = queue->peers.next) { + if (queue->window > queue->busy_refcount && queue->todo.next != 0) { + QMGR_LIST_ROTATE(transport->queue_list, queue); + if (msg_verbose) + msg_info("qmgr_queue_select: %s", queue->name); + return (queue); + } + } + return (0); +} + +/* qmgr_queue_done - delete in-core queue for site */ + +void qmgr_queue_done(QMGR_QUEUE *queue) +{ + const char *myname = "qmgr_queue_done"; + QMGR_TRANSPORT *transport = queue->transport; + + /* + * Sanity checks. It is an error to delete an in-core queue with pending + * messages or timers. + */ + if (queue->busy_refcount != 0 || queue->todo_refcount != 0) + msg_panic("%s: refcount: %d", myname, + queue->busy_refcount + queue->todo_refcount); + if (queue->todo.next || queue->busy.next) + msg_panic("%s: queue not empty: %s", myname, queue->name); + if (!QMGR_QUEUE_READY(queue)) + msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); + if (queue->dsn) + msg_panic("%s: queue %s: spurious reason %s", + myname, queue->name, queue->dsn->reason); + + /* + * Clean up this in-core queue. + */ + QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue); + htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0); + myfree(queue->name); + myfree(queue->nexthop); + qmgr_queue_count--; + myfree((void *) queue); +} + +/* qmgr_queue_create - create in-core queue for site */ + +QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, + const char *nexthop) +{ + QMGR_QUEUE *queue; + + /* + * If possible, choose an initial concurrency of > 1 so that one bad + * message or one bad network won't slow us down unnecessarily. + */ + + queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); + qmgr_queue_count++; + queue->dflags = 0; + queue->last_done = 0; + queue->name = mystrdup(name); + queue->nexthop = mystrdup(nexthop); + queue->todo_refcount = 0; + queue->busy_refcount = 0; + queue->transport = transport; + queue->window = transport->init_dest_concurrency; + queue->success = queue->failure = queue->fail_cohorts = 0; + QMGR_LIST_INIT(queue->todo); + QMGR_LIST_INIT(queue->busy); + queue->dsn = 0; + queue->clog_time_to_warn = 0; + QMGR_LIST_PREPEND(transport->queue_list, queue); + htable_enter(transport->queue_byname, name, (void *) queue); + return (queue); +} + +/* qmgr_queue_find - find in-core named queue */ + +QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name) +{ + return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name)); +} diff --git a/src/oqmgr/qmgr_scan.c b/src/oqmgr/qmgr_scan.c new file mode 100644 index 0000000..0665a23 --- /dev/null +++ b/src/oqmgr/qmgr_scan.c @@ -0,0 +1,185 @@ +/*++ +/* NAME +/* qmgr_scan 3 +/* SUMMARY +/* queue scanning +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_SCAN *qmgr_scan_create(queue_name) +/* const char *queue_name; +/* +/* char *qmgr_scan_next(scan_info) +/* QMGR_SCAN *scan_info; +/* +/* void qmgr_scan_request(scan_info, flags) +/* QMGR_SCAN *scan_info; +/* int flags; +/* DESCRIPTION +/* This module implements queue scans. A queue scan always runs +/* to completion, so that all files get a fair chance. The caller +/* can request that a queue scan be restarted once it completes. +/* +/* qmgr_scan_create() creates a context for scanning the named queue, +/* but does not start a queue scan. +/* +/* qmgr_scan_next() returns the base name of the next queue file. +/* A null pointer means that no file was found. qmgr_scan_next() +/* automagically restarts a queue scan when a scan request had +/* arrived while the scan was in progress. +/* +/* qmgr_scan_request() records a request for the next queue scan. The +/* flags argument is the bit-wise OR of zero or more of the following, +/* unrecognized flags being ignored: +/* .IP QMGR_FLUSH_ONCE +/* Forget state information about dead hosts or transports. +/* This request takes effect immediately. +/* .IP QMGR_FLUSH_DFXP +/* Override the defer_transports setting. This takes effect +/* immediately when a queue scan is in progress, and affects +/* the next queue scan. +/* .IP QMGR_SCAN_ALL +/* Ignore queue file time stamps. This takes effect immediately +/* when a queue scan is in progress, and affects the next queue +/* scan. +/* .IP QMGR_SCAN_START +/* Start a queue scan when none is in progress, or restart the +/* current scan upon completion. +/* DIAGNOSTICS +/* Fatal: out of memory. +/* Panic: interface violations, internal consistency errors. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> + +/* Utility library. */ + +#include <msg.h> +#include <mymalloc.h> +#include <scan_dir.h> + +/* Global library. */ + +#include <mail_scan_dir.h> + +/* Application-specific. */ + +#include "qmgr.h" + +/* qmgr_scan_start - start queue scan */ + +static void qmgr_scan_start(QMGR_SCAN *scan_info) +{ + const char *myname = "qmgr_scan_start"; + + /* + * Sanity check. + */ + if (scan_info->handle) + msg_panic("%s: %s queue scan in progress", + myname, scan_info->queue); + + /* + * Give the poor tester a clue. + */ + if (msg_verbose) + msg_info("%s: %sstart %s queue scan", + myname, + scan_info->nflags & QMGR_SCAN_START ? "re" : "", + scan_info->queue); + + /* + * Start or restart the scan. + */ + scan_info->flags = scan_info->nflags; + scan_info->nflags = 0; + scan_info->handle = scan_dir_open(scan_info->queue); +} + +/* qmgr_scan_request - request for future scan */ + +void qmgr_scan_request(QMGR_SCAN *scan_info, int flags) +{ + + /* + * Apply "forget all dead destinations" requests immediately. Throttle + * dead transports and queues at the earliest opportunity: preferably + * during an already ongoing queue scan, otherwise the throttling will + * have to wait until a "start scan" trigger arrives. + * + * The QMGR_FLUSH_ONCE request always comes with QMGR_FLUSH_DFXP, and + * sometimes it also comes with QMGR_SCAN_ALL. It becomes a completely + * different story when a flush request is encoded in file permissions. + */ + if (flags & QMGR_FLUSH_ONCE) + qmgr_enable_all(); + + /* + * Apply "ignore time stamp" requests also towards the scan that is + * already in progress. + */ + if (scan_info->handle != 0 && (flags & QMGR_SCAN_ALL)) + scan_info->flags |= QMGR_SCAN_ALL; + + /* + * Apply "override defer_transports" requests also towards the scan that + * is already in progress. + */ + if (scan_info->handle != 0 && (flags & QMGR_FLUSH_DFXP)) + scan_info->flags |= QMGR_FLUSH_DFXP; + + /* + * If a scan is in progress, just record the request. + */ + scan_info->nflags |= flags; + if (scan_info->handle == 0 && (flags & QMGR_SCAN_START) != 0) { + scan_info->nflags &= ~QMGR_SCAN_START; + qmgr_scan_start(scan_info); + } +} + +/* qmgr_scan_next - look for next queue file */ + +char *qmgr_scan_next(QMGR_SCAN *scan_info) +{ + char *path = 0; + + /* + * Restart the scan if we reach the end and a queue scan request has + * arrived in the mean time. + */ + if (scan_info->handle && (path = mail_scan_dir_next(scan_info->handle)) == 0) { + scan_info->handle = scan_dir_close(scan_info->handle); + if (msg_verbose && (scan_info->nflags & QMGR_SCAN_START) == 0) + msg_info("done %s queue scan", scan_info->queue); + } + if (!scan_info->handle && (scan_info->nflags & QMGR_SCAN_START)) { + qmgr_scan_start(scan_info); + path = mail_scan_dir_next(scan_info->handle); + } + return (path); +} + +/* qmgr_scan_create - create queue scan context */ + +QMGR_SCAN *qmgr_scan_create(const char *queue) +{ + QMGR_SCAN *scan_info; + + scan_info = (QMGR_SCAN *) mymalloc(sizeof(*scan_info)); + scan_info->queue = mystrdup(queue); + scan_info->flags = scan_info->nflags = 0; + scan_info->handle = 0; + return (scan_info); +} diff --git a/src/oqmgr/qmgr_transport.c b/src/oqmgr/qmgr_transport.c new file mode 100644 index 0000000..ed780db --- /dev/null +++ b/src/oqmgr/qmgr_transport.c @@ -0,0 +1,472 @@ +/*++ +/* NAME +/* qmgr_transport 3 +/* SUMMARY +/* per-transport data structures +/* SYNOPSIS +/* #include "qmgr.h" +/* +/* QMGR_TRANSPORT *qmgr_transport_create(name) +/* const char *name; +/* +/* QMGR_TRANSPORT *qmgr_transport_find(name) +/* const char *name; +/* +/* QMGR_TRANSPORT *qmgr_transport_select() +/* +/* void qmgr_transport_alloc(transport, notify) +/* QMGR_TRANSPORT *transport; +/* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp); +/* +/* void qmgr_transport_throttle(transport, dsn) +/* QMGR_TRANSPORT *transport; +/* DSN *dsn; +/* +/* void qmgr_transport_unthrottle(transport) +/* QMGR_TRANSPORT *transport; +/* DESCRIPTION +/* This module organizes the world by message transport type. +/* Each transport can have zero or more destination queues +/* associated with it. +/* +/* qmgr_transport_create() instantiates a data structure for the +/* named transport type. +/* +/* qmgr_transport_find() looks up an existing message transport +/* data structure. +/* +/* qmgr_transport_select() attempts to find a transport that +/* has messages pending delivery. This routine implements +/* round-robin search among transports. +/* +/* qmgr_transport_alloc() allocates a delivery process for the +/* specified transport type. Allocation is performed asynchronously. +/* When a process becomes available, the application callback routine +/* is invoked with as arguments the transport and a stream that +/* is connected to a delivery process. It is an error to call +/* qmgr_transport_alloc() while delivery process allocation for +/* the same transport is in progress. +/* +/* qmgr_transport_throttle blocks further allocation of delivery +/* processes for the named transport. Attempts to throttle a +/* throttled transport are ignored. +/* +/* qmgr_transport_unthrottle() undoes qmgr_transport_throttle(). +/* Attempts to unthrottle a non-throttled transport are ignored. +/* DIAGNOSTICS +/* Panic: consistency check failure. Fatal: out of memory. +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <unistd.h> + +#include <sys/time.h> /* FD_SETSIZE */ +#include <sys/types.h> /* FD_SETSIZE */ +#include <unistd.h> /* FD_SETSIZE */ + +#ifdef USE_SYS_SELECT_H +#include <sys/select.h> /* FD_SETSIZE */ +#endif + +/* Utility library. */ + +#include <msg.h> +#include <htable.h> +#include <events.h> +#include <mymalloc.h> +#include <vstream.h> +#include <iostuff.h> + +/* Global library. */ + +#include <mail_proto.h> +#include <recipient_list.h> +#include <mail_conf.h> +#include <mail_params.h> + +/* Application-specific. */ + +#include "qmgr.h" + +HTABLE *qmgr_transport_byname; /* transport by name */ +QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */ + + /* + * A local structure to remember a delivery process allocation request. + */ +typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC; + +struct QMGR_TRANSPORT_ALLOC { + QMGR_TRANSPORT *transport; /* transport context */ + VSTREAM *stream; /* delivery service stream */ + QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ +}; + + /* + * Connections to delivery agents are managed asynchronously. Each delivery + * agent connection goes through multiple wait states: + * + * - With Linux/Solaris and old queue manager implementations only, wait for + * the server to invoke accept(). + * + * - Wait for the delivery agent's announcement that it is ready to receive a + * delivery request. + * + * - Wait for the delivery request completion status. + * + * Older queue manager implementations had only one pending delivery agent + * connection per transport. With low-latency destinations, the output rates + * were reduced on Linux/Solaris systems that had the extra wait state. + * + * To maximize delivery agent output rates with low-latency destinations, the + * following changes were made to the queue manager by the end of the 2.4 + * development cycle: + * + * - The Linux/Solaris accept() wait state was eliminated. + * + * - A pipeline was implemented for pending delivery agent connections. The + * number of pending delivery agent connections was increased from one to + * two: the number of before-delivery wait states, plus one extra pipeline + * slot to prevent the pipeline from stalling easily. Increasing the + * pipeline much further actually hurt performance. + * + * - To reduce queue manager disk competition with delivery agents, the queue + * scanning algorithm was modified to import only one message per interrupt. + * The incoming and deferred queue scans now happen on alternate interrupts. + * + * Simplistically reasoned, a non-zero (incoming + active) queue length is + * equivalent to a time shift for mail deliveries; this is undesirable when + * delivery agents are not fully utilized. + * + * On the other hand a non-empty active queue is what allows us to do clever + * things such as queue file prefetch, concurrency windows, and connection + * caching; the idea is that such "thinking time" is affordable only after + * the output channels are maxed out. + */ +#ifndef QMGR_TRANSPORT_MAX_PEND +#define QMGR_TRANSPORT_MAX_PEND 2 +#endif + + /* + * Important note on the _transport_rate_delay implementation: after + * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all + * code paths must directly or indirectly invoke qmgr_transport_unthrottle() + * or qmgr_transport_throttle(). Otherwise, transports with non-zero + * _transport_rate_delay will become stuck. + */ + +/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ + +static void qmgr_transport_unthrottle_wrapper(int unused_event, void *context) +{ + qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); +} + +/* qmgr_transport_unthrottle - open the throttle */ + +void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) +{ + const char *myname = "qmgr_transport_unthrottle"; + + /* + * This routine runs after expiration of the timer set by + * qmgr_transport_throttle(), or whenever a delivery transport has been + * used without malfunction. In either case, we enable delivery again if + * the transport was throttled. We always reset the transport rate lock. + */ + if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { + if (msg_verbose) + msg_info("%s: transport %s", myname, transport->name); + transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; + if (transport->dsn == 0) + msg_panic("%s: transport %s: null reason", + myname, transport->name); + dsn_free(transport->dsn); + transport->dsn = 0; + event_cancel_timer(qmgr_transport_unthrottle_wrapper, + (void *) transport); + } + if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) + transport->flags &= ~QMGR_TRANSPORT_STAT_RATE_LOCK; +} + +/* qmgr_transport_throttle - disable delivery process allocation */ + +void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) +{ + const char *myname = "qmgr_transport_throttle"; + + /* + * We are unable to connect to a deliver process for this type of message + * transport. Instead of hosing the system by retrying in a tight loop, + * back off and disable this transport type for a while. + */ + if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { + if (msg_verbose) + msg_info("%s: transport %s: status: %s reason: %s", + myname, transport->name, dsn->status, dsn->reason); + transport->flags |= QMGR_TRANSPORT_STAT_DEAD; + if (transport->dsn) + msg_panic("%s: transport %s: spurious reason: %s", + myname, transport->name, transport->dsn->reason); + transport->dsn = DSN_COPY(dsn); + event_request_timer(qmgr_transport_unthrottle_wrapper, + (void *) transport, var_transport_retry_time); + } +} + +/* qmgr_transport_abort - transport connect watchdog */ + +static void qmgr_transport_abort(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + msg_fatal("timeout connecting to transport: %s", alloc->transport->name); +} + +/* qmgr_transport_rate_event - delivery process availability notice */ + +static void qmgr_transport_rate_event(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + alloc->notify(alloc->transport, alloc->stream); + myfree((void *) alloc); +} + +/* qmgr_transport_event - delivery process availability notice */ + +static void qmgr_transport_event(int unused_event, void *context) +{ + QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; + + /* + * This routine notifies the application when the request given to + * qmgr_transport_alloc() completes. + */ + if (msg_verbose) + msg_info("transport_event: %s", alloc->transport->name); + + /* + * Connection request completed. Stop the watchdog timer. + */ + event_cancel_timer(qmgr_transport_abort, context); + + /* + * Disable further read events that end up calling this function, and + * free up this pending connection pipeline slot. + */ + if (alloc->stream) { + event_disable_readwrite(vstream_fileno(alloc->stream)); + non_blocking(vstream_fileno(alloc->stream), BLOCKING); + } + alloc->transport->pending -= 1; + + /* + * Notify the requestor. + */ + if (alloc->transport->xport_rate_delay > 0) { + if ((alloc->transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) == 0) + msg_panic("transport_event: missing rate lock for transport %s", + alloc->transport->name); + event_request_timer(qmgr_transport_rate_event, (void *) alloc, + alloc->transport->xport_rate_delay); + } else { + alloc->notify(alloc->transport, alloc->stream); + myfree((void *) alloc); + } +} + +/* qmgr_transport_select - select transport for allocation */ + +QMGR_TRANSPORT *qmgr_transport_select(void) +{ + QMGR_TRANSPORT *xport; + QMGR_QUEUE *queue; + int need; + + /* + * If we find a suitable transport, rotate the list of transports to + * effectuate round-robin selection. See similar selection code in + * qmgr_queue_select(). + * + * This function is called repeatedly until all transports have maxed out + * the number of pending delivery agent connections, until all delivery + * agent concurrency windows are maxed out, or until we run out of "todo" + * queue entries. + */ +#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) + + for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { + if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 + || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0 + || xport->pending >= QMGR_TRANSPORT_MAX_PEND) + continue; + need = xport->pending + 1; + for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { + if (QMGR_QUEUE_READY(queue) == 0) + continue; + if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, + queue->todo_refcount)) <= 0) { + QMGR_LIST_ROTATE(qmgr_transport_list, xport); + if (msg_verbose) + msg_info("qmgr_transport_select: %s", xport->name); + return (xport); + } + } + } + return (0); +} + +/* qmgr_transport_alloc - allocate delivery process */ + +void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) +{ + QMGR_TRANSPORT_ALLOC *alloc; + + /* + * Sanity checks. + */ + if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) + msg_panic("qmgr_transport: dead transport: %s", transport->name); + if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) + msg_panic("qmgr_transport: rate-locked transport: %s", transport->name); + if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) + msg_panic("qmgr_transport: excess allocation: %s", transport->name); + + /* + * When this message delivery transport is rate-limited, do not select it + * again before the end of a message delivery transaction. + */ + if (transport->xport_rate_delay > 0) + transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK; + + /* + * Connect to the well-known port for this delivery service, and wake up + * when a process announces its availability. Allow only a limited number + * of delivery process allocation attempts for this transport. In case of + * problems, back off. Do not hose the system when it is in trouble + * already. + * + * Use non-blocking connect(), so that Linux won't block the queue manager + * until the delivery agent calls accept(). + * + * When the connection to delivery agent cannot be completed, notify the + * event handler so that it can throttle the transport and defer the todo + * queues, just like it does when communication fails *after* connection + * completion. + * + * Before Postfix 2.4, the event handler was not invoked after connect() + * error, and mail was not deferred. Because of this, mail would be stuck + * in the active queue after triggering a "connection refused" condition. + */ + alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); + alloc->transport = transport; + alloc->notify = notify; + transport->pending += 1; + if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, + NON_BLOCKING)) == 0) { + msg_warn("connect to transport %s/%s: %m", + MAIL_CLASS_PRIVATE, transport->name); + event_request_timer(qmgr_transport_event, (void *) alloc, 0); + return; + } +#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(CA_VSTREAM_CTL_DUPFD) +#ifndef THRESHOLD_FD_WORKAROUND +#define THRESHOLD_FD_WORKAROUND 128 +#endif + vstream_control(alloc->stream, + CA_VSTREAM_CTL_DUPFD(THRESHOLD_FD_WORKAROUND), + CA_VSTREAM_CTL_END); +#endif + event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, + (void *) alloc); + + /* + * Guard against broken systems. + */ + event_request_timer(qmgr_transport_abort, (void *) alloc, + var_daemon_timeout); +} + +/* qmgr_transport_create - create transport instance */ + +QMGR_TRANSPORT *qmgr_transport_create(const char *name) +{ + QMGR_TRANSPORT *transport; + + if (htable_find(qmgr_transport_byname, name) != 0) + msg_panic("qmgr_transport_create: transport exists: %s", name); + transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); + transport->flags = 0; + transport->pending = 0; + transport->name = mystrdup(name); + + /* + * Use global configuration settings or transport-specific settings. + */ + transport->dest_concurrency_limit = + get_mail_conf_int2(name, _DEST_CON_LIMIT, + var_dest_con_limit, 0, 0); + transport->recipient_limit = + get_mail_conf_int2(name, _DEST_RCPT_LIMIT, + var_dest_rcpt_limit, 0, 0); + transport->init_dest_concurrency = + get_mail_conf_int2(name, _INIT_DEST_CON, + var_init_dest_concurrency, 1, 0); + transport->xport_rate_delay = get_mail_conf_time2(name, _XPORT_RATE_DELAY, + var_xport_rate_delay, + 's', 0, 0); + transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, + var_dest_rate_delay, + 's', 0, 0); + + if (transport->rate_delay > 0) + transport->dest_concurrency_limit = 1; + if (transport->dest_concurrency_limit != 0 + && transport->dest_concurrency_limit < transport->init_dest_concurrency) + transport->init_dest_concurrency = transport->dest_concurrency_limit; + + transport->queue_byname = htable_create(0); + QMGR_LIST_INIT(transport->queue_list); + transport->dsn = 0; + qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, + VAR_CONC_POS_FDBACK, var_conc_pos_feedback); + qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, + VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); + transport->fail_cohort_limit = + get_mail_conf_int2(name, _CONC_COHORT_LIM, + var_conc_cohort_limit, 0, 0); + if (qmgr_transport_byname == 0) + qmgr_transport_byname = htable_create(10); + htable_enter(qmgr_transport_byname, name, (void *) transport); + QMGR_LIST_APPEND(qmgr_transport_list, transport); + if (msg_verbose) + msg_info("qmgr_transport_create: %s concurrency %d recipients %d", + transport->name, transport->dest_concurrency_limit, + transport->recipient_limit); + return (transport); +} + +/* qmgr_transport_find - find transport instance */ + +QMGR_TRANSPORT *qmgr_transport_find(const char *name) +{ + return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); +} |