/*++ /* NAME /* nbbio 3 /* SUMMARY /* non-blocking buffered I/O /* SYNOPSIS /* #include /* /* NBBIO *nbbio_create(fd, bufsize, label, action, context) /* int fd; /* ssize_t bufsize; /* const char *label; /* void (*action)(int event, void *context); /* char *context; /* /* void nbbio_free(np) /* NBBIO *np; /* /* void nbbio_enable_read(np, timeout) /* NBBIO *np; /* int timeout; /* /* void nbbio_enable_write(np, timeout) /* NBBIO *np; /* int timeout; /* /* void nbbio_disable_readwrite(np) /* NBBIO *np; /* /* void nbbio_slumber(np, timeout) /* NBBIO *np; /* int timeout; /* /* int NBBIO_ACTIVE_FLAGS(np) /* NBBIO *np; /* /* int NBBIO_ERROR_FLAGS(np) /* NBBIO *np; /* /* const ssize_t NBBIO_BUFSIZE(np) /* NBBIO *np; /* /* ssize_t NBBIO_READ_PEND(np) /* NBBIO *np; /* /* char *NBBIO_READ_BUF(np) /* NBBIO *np; /* /* const ssize_t NBBIO_WRITE_PEND(np) /* NBBIO *np; /* /* char *NBBIO_WRITE_BUF(np) /* NBBIO *np; /* DESCRIPTION /* This module implements low-level support for event-driven /* I/O on a full-duplex stream. Read/write events are handled /* by pseudothreads that run under control by the events(5) /* module. After each I/O operation, the application is /* notified via a call-back routine. /* /* It is up to the call-back routine to turn on/off read/write /* events as appropriate. It is an error to leave read events /* enabled for a buffer that is full, or to leave write events /* enabled for a buffer that is empty. /* /* nbbio_create() creates a pair of buffers of the named size /* for the named stream. The label specifies the purpose of /* the stream, and is used for diagnostic messages. The /* nbbio(3) event handler invokes the application call-back /* routine with the current event type (EVENT_READ etc.) and /* with the application-specified context. /* /* nbbio_free() terminates any pseudothreads associated with /* the named buffer pair, closes the stream, and destroys the /* buffer pair. /* /* nbbio_enable_read() enables a read pseudothread (if one /* does not already exist) for the named buffer pair, and /* (re)starts the buffer pair's timer. It is an error to enable /* a read pseudothread while the read buffer is full, or while /* a write pseudothread is still enabled. /* /* nbbio_enable_write() enables a write pseudothread (if one /* does not already exist) for the named buffer pair, and /* (re)starts the buffer pair's timer. It is an error to enable /* a write pseudothread while the write buffer is empty, or /* while a read pseudothread is still enabled. /* /* nbbio_disable_readwrite() disables any read/write pseudothreads /* for the named buffer pair, including timeouts. To ensure /* buffer liveness, use nbbio_slumber() instead of /* nbbio_disable_readwrite(). It is no error to call this /* function while no read/write pseudothread is enabled. /* /* nbbio_slumber() disables any read/write pseudothreads for /* the named buffer pair, but keeps the timer active to ensure /* buffer liveness. It is no error to call this function while /* no read/write pseudothread is enabled. /* /* NBBIO_ERROR_FLAGS() returns the error flags for the named buffer /* pair: zero or more of NBBIO_FLAG_EOF (read EOF), NBBIO_FLAG_ERROR /* (read/write error) or NBBIO_FLAG_TIMEOUT (time limit /* exceeded). /* /* NBBIO_ACTIVE_FLAGS() returns the pseudothread flags for the /* named buffer pair: NBBIO_FLAG_READ (read pseudothread is /* active), NBBIO_FLAG_WRITE (write pseudothread is active), /* or zero (no pseudothread is active). /* /* NBBIO_WRITE_PEND() and NBBIO_WRITE_BUF() evaluate to the /* number of to-be-written bytes and the write buffer for the /* named buffer pair. NBBIO_WRITE_PEND() must be updated by /* the application code that fills the write buffer; no more /* than NBBIO_BUFSIZE() bytes may be filled. /* /* NBBIO_READ_PEND() and NBBIO_READ_BUF() evaluate to the /* number of unread bytes and the read buffer for the named /* buffer pair. NBBIO_READ_PEND() and NBBIO_READ_BUF() must /* be updated by the application code that drains the read /* buffer. /* SEE ALSO /* events(3) event manager /* DIAGNOSTICS /* Panic: interface violation. /* /* Fatal: out of memory. /* LICENSE /* .ad /* .fi /* The Secure Mailer license must be distributed with this software. /* AUTHOR(S) /* Wietse Venema /* IBM T.J. Watson Research /* P.O. Box 704 /* Yorktown Heights, NY 10598, USA /* /* Wietse Venema /* Google, Inc. /* 111 8th Avenue /* New York, NY 10011, USA /*--*/ /* * System library. */ #include #include #include #include /* memmove() */ /* * Utility library. */ #include #include #include #include /* nbbio_event - non-blocking event handler */ static void nbbio_event(int event, void *context) { const char *myname = "nbbio_event"; NBBIO *np = (NBBIO *) context; ssize_t count; switch (event) { /* * Read data into the read buffer. Leave it up to the application to * drain the buffer until it is empty. */ case EVENT_READ: if (np->read_pend == np->bufsize) msg_panic("%s: socket fd=%d: read buffer is full", myname, np->fd); if (np->read_pend < 0 || np->read_pend > np->bufsize) msg_panic("%s: socket fd=%d: bad pending read count %ld", myname, np->fd, (long) np->read_pend); count = read(np->fd, np->read_buf + np->read_pend, np->bufsize - np->read_pend); if (count > 0) { np->read_pend += count; if (msg_verbose) msg_info("%s: read %ld on %s fd=%d", myname, (long) count, np->label, np->fd); } else if (count == 0) { np->flags |= NBBIO_FLAG_EOF; if (msg_verbose) msg_info("%s: read EOF on %s fd=%d", myname, np->label, np->fd); } else { if (errno == EAGAIN) msg_warn("%s: read() returns EAGAIN on readable descriptor", myname); np->flags |= NBBIO_FLAG_ERROR; if (msg_verbose) msg_info("%s: read %s fd=%d: %m", myname, np->label, np->fd); } break; /* * Drain data from the output buffer. Notify the application * whenever some bytes are written. * * XXX Enforce a total time limit to ensure liveness when a hostile * receiver sets a very small TCP window size. */ case EVENT_WRITE: if (np->write_pend == 0) msg_panic("%s: socket fd=%d: empty write buffer", myname, np->fd); if (np->write_pend < 0 || np->write_pend > np->bufsize) msg_panic("%s: socket fd=%d: bad pending write count %ld", myname, np->fd, (long) np->write_pend); count = write(np->fd, np->write_buf, np->write_pend); if (count > 0) { np->write_pend -= count; if (np->write_pend > 0) memmove(np->write_buf, np->write_buf + count, np->write_pend); } else { if (errno == EAGAIN) msg_warn("%s: write() returns EAGAIN on writable descriptor", myname); np->flags |= NBBIO_FLAG_ERROR; if (msg_verbose) msg_info("%s: write %s fd=%d: %m", myname, np->label, np->fd); } break; /* * Something bad happened. */ case EVENT_XCPT: np->flags |= NBBIO_FLAG_ERROR; if (msg_verbose) msg_info("%s: error on %s fd=%d: %m", myname, np->label, np->fd); break; /* * Something good didn't happen. */ case EVENT_TIME: np->flags |= NBBIO_FLAG_TIMEOUT; if (msg_verbose) msg_info("%s: %s timeout on %s fd=%d", myname, NBBIO_OP_NAME(np), np->label, np->fd); break; default: msg_panic("%s: unknown event %d", myname, event); } /* * Application notification. The application will check for any error * flags, copy application data from or to our buffer pair, and decide * what I/O happens next. */ np->action(event, np->context); } /* nbbio_enable_read - enable reading from socket into buffer */ void nbbio_enable_read(NBBIO *np, int timeout) { const char *myname = "nbbio_enable_read"; /* * Sanity checks. */ if (np->flags & (NBBIO_MASK_ACTIVE & ~NBBIO_FLAG_READ)) msg_panic("%s: socket fd=%d is enabled for %s", myname, np->fd, NBBIO_OP_NAME(np)); if (timeout <= 0) msg_panic("%s: socket fd=%d: bad timeout %d", myname, np->fd, timeout); if (np->read_pend >= np->bufsize) msg_panic("%s: socket fd=%d: read buffer is full", myname, np->fd); /* * Enable events. */ if ((np->flags & NBBIO_FLAG_READ) == 0) { event_enable_read(np->fd, nbbio_event, (void *) np); np->flags |= NBBIO_FLAG_READ; } event_request_timer(nbbio_event, (void *) np, timeout); } /* nbbio_enable_write - enable writing from buffer to socket */ void nbbio_enable_write(NBBIO *np, int timeout) { const char *myname = "nbbio_enable_write"; /* * Sanity checks. */ if (np->flags & (NBBIO_MASK_ACTIVE & ~NBBIO_FLAG_WRITE)) msg_panic("%s: socket fd=%d is enabled for %s", myname, np->fd, NBBIO_OP_NAME(np)); if (timeout <= 0) msg_panic("%s: socket fd=%d: bad timeout %d", myname, np->fd, timeout); if (np->write_pend <= 0) msg_panic("%s: socket fd=%d: empty write buffer", myname, np->fd); /* * Enable events. */ if ((np->flags & NBBIO_FLAG_WRITE) == 0) { event_enable_write(np->fd, nbbio_event, (void *) np); np->flags |= NBBIO_FLAG_WRITE; } event_request_timer(nbbio_event, (void *) np, timeout); } /* nbbio_disable_readwrite - disable read/write/timer events */ void nbbio_disable_readwrite(NBBIO *np) { np->flags &= ~NBBIO_MASK_ACTIVE; event_disable_readwrite(np->fd); event_cancel_timer(nbbio_event, (void *) np); } /* nbbio_slumber - disable read/write events, keep timer */ void nbbio_slumber(NBBIO *np, int timeout) { np->flags &= ~NBBIO_MASK_ACTIVE; event_disable_readwrite(np->fd); event_request_timer(nbbio_event, (void *) np, timeout); } /* nbbio_create - create socket buffer */ NBBIO *nbbio_create(int fd, ssize_t bufsize, const char *label, NBBIO_ACTION action, void *context) { NBBIO *np; /* * Sanity checks. */ if (fd < 0) msg_panic("nbbio_create: bad file descriptor: %d", fd); if (bufsize <= 0) msg_panic("nbbio_create: bad buffer size: %ld", (long) bufsize); /* * Create a new buffer pair. */ np = (NBBIO *) mymalloc(sizeof(*np)); np->fd = fd; np->bufsize = bufsize; np->label = mystrdup(label); np->action = action; np->context = context; np->flags = 0; np->read_buf = mymalloc(bufsize); np->read_pend = 0; np->write_buf = mymalloc(bufsize); np->write_pend = 0; return (np); } /* nbbio_free - destroy socket buffer */ void nbbio_free(NBBIO *np) { nbbio_disable_readwrite(np); (void) close(np->fd); myfree(np->label); myfree(np->read_buf); myfree(np->write_buf); myfree((void *) np); }