diff options
Diffstat (limited to '')
-rw-r--r-- | src/smtpstone/qmqp-sink.c | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/src/smtpstone/qmqp-sink.c b/src/smtpstone/qmqp-sink.c new file mode 100644 index 0000000..2fcbdd1 --- /dev/null +++ b/src/smtpstone/qmqp-sink.c @@ -0,0 +1,325 @@ +/*++ +/* NAME +/* qmqp-sink 1 +/* SUMMARY +/* parallelized QMQP test server +/* SYNOPSIS +/* .fi +/* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] +/* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR +/* +/* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] +/* \fBunix:\fR\fIpathname\fR \fIbacklog\fR +/* DESCRIPTION +/* \fBqmqp-sink\fR listens on the named host (or address) and port. +/* It receives messages from the network and throws them away. +/* The purpose is to measure QMQP client performance, not protocol +/* compliance. +/* Connections can be accepted on IPv4 or IPv6 endpoints, or on +/* UNIX-domain sockets. +/* IPv4 and IPv6 are the default. +/* This program is the complement of the \fBqmqp-source\fR(1) program. +/* +/* Note: this is an unsupported test program. No attempt is made +/* to maintain compatibility between successive versions. +/* +/* Arguments: +/* .IP \fB-4\fR +/* Support IPv4 only. This option has no effect when +/* Postfix is built without IPv6 support. +/* .IP \fB-6\fR +/* Support IPv6 only. This option is not available when +/* Postfix is built without IPv6 support. +/* .IP \fB-c\fR +/* Display a running counter that is updated whenever a delivery +/* is completed. +/* .IP \fB-v\fR +/* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP +/* conversation. +/* .IP "\fB-x \fItime\fR" +/* Terminate after \fItime\fR seconds. This is to facilitate memory +/* leak testing. +/* SEE ALSO +/* qmqp-source(1), QMQP message generator +/* LICENSE +/* .ad +/* .fi +/* The Secure Mailer license must be distributed with this software. +/* AUTHOR(S) +/* Wietse Venema +/* IBM T.J. Watson Research +/* P.O. Box 704 +/* Yorktown Heights, NY 10598, USA +/* +/* Wietse Venema +/* Google, Inc. +/* 111 8th Avenue +/* New York, NY 10011, USA +/*--*/ + +/* System library. */ + +#include <sys_defs.h> +#include <sys/socket.h> +#include <sys/wait.h> +#include <unistd.h> +#include <string.h> +#include <stdlib.h> +#include <fcntl.h> +#include <signal.h> + +/* Utility library. */ + +#include <msg.h> +#include <vstring.h> +#include <vstream.h> +#include <listen.h> +#include <events.h> +#include <mymalloc.h> +#include <iostuff.h> +#include <msg_vstream.h> +#include <netstring.h> +#include <inet_proto.h> + +/* Global library. */ + +#include <qmqp_proto.h> +#include <mail_version.h> + +/* Application-specific. */ + +typedef struct { + VSTREAM *stream; /* client connection */ + int count; /* bytes to go */ +} SINK_STATE; + +static int var_tmout; +static VSTRING *buffer; +static void disconnect(SINK_STATE *); +static int count_deliveries; +static int counter; + +/* send_reply - finish conversation */ + +static void send_reply(SINK_STATE *state) +{ + vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK); + NETSTRING_PUT_BUF(state->stream, buffer); + netstring_fflush(state->stream); + if (count_deliveries) { + counter++; + vstream_printf("%d\r", counter); + vstream_fflush(VSTREAM_OUT); + } + disconnect(state); +} + +/* read_data - read over-all netstring data */ + +static void read_data(int unused_event, void *context) +{ + SINK_STATE *state = (SINK_STATE *) context; + int fd = vstream_fileno(state->stream); + int count; + + /* + * Refill the VSTREAM buffer, if necessary. + */ + if (VSTREAM_GETC(state->stream) == VSTREAM_EOF) + netstring_except(state->stream, vstream_ftimeout(state->stream) ? + NETSTRING_ERR_TIME : NETSTRING_ERR_EOF); + state->count--; + + /* + * Flush the VSTREAM buffer. As documented, vstream_fseek() discards + * unread input. + */ + if ((count = vstream_peek(state->stream)) > 0) { + state->count -= count; + if (state->count <= 0) { + send_reply(state); + return; + } + vstream_fpurge(state->stream, VSTREAM_PURGE_BOTH); + } + + /* + * Do not block while waiting for the arrival of more data. + */ + event_disable_readwrite(fd); + event_enable_read(fd, read_data, context); +} + +/* read_length - read over-all netstring length */ + +static void read_length(int event, void *context) +{ + SINK_STATE *state = (SINK_STATE *) context; + + switch (vstream_setjmp(state->stream)) { + + default: + msg_panic("unknown error reading input"); + + case NETSTRING_ERR_TIME: + msg_panic("attempt to read non-readable socket"); + /* NOTREACHED */ + + case NETSTRING_ERR_EOF: + msg_warn("lost connection"); + disconnect(state); + return; + + case NETSTRING_ERR_FORMAT: + msg_warn("netstring format error"); + disconnect(state); + return; + + case NETSTRING_ERR_SIZE: + msg_warn("netstring size error"); + disconnect(state); + return; + + /* + * Include the netstring terminator in the read byte count. This + * violates abstractions. + */ + case 0: + state->count = netstring_get_length(state->stream) + 1; + read_data(event, context); + return; + } +} + +/* disconnect - handle disconnection events */ + +static void disconnect(SINK_STATE *state) +{ + event_disable_readwrite(vstream_fileno(state->stream)); + vstream_fclose(state->stream); + myfree((void *) state); +} + +/* connect_event - handle connection events */ + +static void connect_event(int unused_event, void *context) +{ + int sock = CAST_ANY_PTR_TO_INT(context); + struct sockaddr_storage ss; + SOCKADDR_SIZE len = sizeof(ss); + struct sockaddr *sa = (struct sockaddr *) &ss; + SINK_STATE *state; + int fd; + + if ((fd = accept(sock, sa, &len)) >= 0) { + if (msg_verbose) + msg_info("connect (%s)", +#ifdef AF_LOCAL + sa->sa_family == AF_LOCAL ? "AF_LOCAL" : +#else + sa->sa_family == AF_UNIX ? "AF_UNIX" : +#endif + sa->sa_family == AF_INET ? "AF_INET" : +#ifdef AF_INET6 + sa->sa_family == AF_INET6 ? "AF_INET6" : +#endif + "unknown protocol family"); + non_blocking(fd, NON_BLOCKING); + state = (SINK_STATE *) mymalloc(sizeof(*state)); + state->stream = vstream_fdopen(fd, O_RDWR); + vstream_tweak_sock(state->stream); + netstring_setup(state->stream, var_tmout); + event_enable_read(fd, read_length, (void *) state); + } +} + +/* terminate - voluntary exit */ + +static void terminate(int unused_event, void *unused_context) +{ + exit(0); +} + +/* usage - explain */ + +static void usage(char *myname) +{ + msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname); +} + +MAIL_VERSION_STAMP_DECLARE; + +int main(int argc, char **argv) +{ + int sock; + int backlog; + int ch; + int ttl; + const char *protocols = INET_PROTO_NAME_ALL; + + /* + * Fingerprint executables and core dumps. + */ + MAIL_VERSION_STAMP_ALLOCATE; + + /* + * Fix 20051207. + */ + signal(SIGPIPE, SIG_IGN); + + /* + * Initialize diagnostics. + */ + msg_vstream_init(argv[0], VSTREAM_ERR); + + /* + * Parse JCL. + */ + while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) { + switch (ch) { + case '4': + protocols = INET_PROTO_NAME_IPV4; + break; + case '6': + protocols = INET_PROTO_NAME_IPV6; + break; + case 'c': + count_deliveries++; + break; + case 'v': + msg_verbose++; + break; + case 'x': + if ((ttl = atoi(optarg)) <= 0) + usage(argv[0]); + event_request_timer(terminate, (void *) 0, ttl); + break; + default: + usage(argv[0]); + } + } + if (argc - optind != 2) + usage(argv[0]); + if ((backlog = atoi(argv[optind + 1])) <= 0) + usage(argv[0]); + + /* + * Initialize. + */ + (void) inet_proto_init("protocols", protocols); + buffer = vstring_alloc(1024); + if (strncmp(argv[optind], "unix:", 5) == 0) { + sock = unix_listen(argv[optind] + 5, backlog, BLOCKING); + } else { + if (strncmp(argv[optind], "inet:", 5) == 0) + argv[optind] += 5; + sock = inet_listen(argv[optind], backlog, BLOCKING); + } + + /* + * Start the event handler. + */ + event_enable_read(sock, connect_event, CAST_INT_TO_VOID_PTR(sock)); + for (;;) + event_loop(-1); +} |