summaryrefslogtreecommitdiffstats
path: root/src/lib/ostream-wrapper.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/ostream-wrapper.c')
-rw-r--r--src/lib/ostream-wrapper.c1259
1 files changed, 1259 insertions, 0 deletions
diff --git a/src/lib/ostream-wrapper.c b/src/lib/ostream-wrapper.c
new file mode 100644
index 0000000..dfd6699
--- /dev/null
+++ b/src/lib/ostream-wrapper.c
@@ -0,0 +1,1259 @@
+/* Copyright (c) 2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "ostream-private.h"
+
+#include "ostream-wrapper.h"
+
+static int wrapper_ostream_flush(struct ostream_private *stream);
+static void
+wrapper_ostream_switch_ioloop_to(struct ostream_private *stream,
+ struct ioloop *ioloop);
+
+/*
+ * Buffer
+ */
+
+/* Determine the optimum buffer size for the wrapper stream itself. */
+static inline size_t
+wrapper_ostream_optimal_size(struct wrapper_ostream *wostream)
+{
+ size_t optimal_size = wostream->ostream.max_buffer_size;
+
+ if (wostream->output != NULL) {
+ optimal_size = I_MIN(
+ o_stream_get_max_buffer_size(wostream->output),
+ optimal_size);
+ }
+ if (optimal_size == SIZE_MAX)
+ optimal_size = IO_BLOCK_SIZE;
+
+ return optimal_size;
+}
+
+/* Return the current size of the wrapper output stream buffer. */
+static inline size_t wrapper_ostream_size(struct wrapper_ostream *wostream)
+{
+ buffer_t *buffer = wostream->buffer;
+
+ if (buffer == NULL)
+ return 0;
+ return buffer->used;
+}
+
+/* Return TRUE when the wrapper stream's internal buffer is empty. */
+static inline bool wrapper_ostream_is_empty(struct wrapper_ostream *wostream)
+{
+ return (wrapper_ostream_size(wostream) == 0);
+}
+/* Return TRUE when the wrapper stream's internal buffer is filled to the
+ maximum. */
+static inline bool wrapper_ostream_is_full(struct wrapper_ostream *wostream)
+{
+ return (wrapper_ostream_size(wostream) >=
+ wostream->ostream.max_buffer_size);
+}
+/* Return TRUE when the wrapper stream's internal buffer is filled at or beyond
+ the optimum. */
+static inline bool wrapper_ostream_is_filled(struct wrapper_ostream *wostream)
+{
+ return (wrapper_ostream_size(wostream) >=
+ wrapper_ostream_optimal_size(wostream));
+}
+
+/*
+ * Underlying output
+ */
+
+/* Handle error in the underlying output stream (the parent). */
+static void
+wrapper_ostream_copy_parent_error(struct wrapper_ostream *wostream)
+{
+ i_assert(wostream->output != NULL);
+ i_assert(wostream->output->stream_errno != 0);
+
+ wostream->ostream.ostream.stream_errno =
+ wostream->output->stream_errno;
+ wostream->ostream.ostream.overflow =
+ wostream->output->overflow;
+}
+
+static void
+wrapper_ostream_handle_parent_error(struct wrapper_ostream *wostream)
+{
+ wrapper_ostream_copy_parent_error(wostream);
+
+ if (wostream->output->closed)
+ o_stream_close(&wostream->ostream.ostream);
+
+ if (wostream->output_error != NULL)
+ wostream->output_error(wostream);
+}
+
+static void wrapper_ostream_closed(struct wrapper_ostream *wostream)
+{
+ wostream->ostream.ostream.closed = TRUE;
+}
+
+/* Drop the underlying output. */
+static void wrapper_ostream_output_close(struct wrapper_ostream *wostream)
+{
+ o_stream_unref(&wostream->output);
+ wostream->output_finished = TRUE;
+ wostream->output_closed = TRUE;
+ wostream->output_closed_api = TRUE;
+}
+
+/* Method calls */
+
+/* Called when the implementation should start making the parent output stream
+ available, e.g. connect to the server (see output_start() method).
+ */
+static void wrapper_ostream_output_start(struct wrapper_ostream *wostream)
+{
+ if (wostream->output_started)
+ return;
+ wostream->output_started = TRUE;
+ if (wostream->output_start != NULL)
+ wostream->output_start(wostream);
+}
+
+/* Returns TRUE when the output is ready for data (see output_ready() method).
+ */
+static bool wrapper_ostream_output_ready(struct wrapper_ostream *wostream)
+{
+ i_assert(wostream->output_ready != NULL);
+ return wostream->output_ready(wostream);
+}
+
+/* Finish the underlying output (see output_finish() method).*/
+static int wrapper_ostream_output_finish(struct wrapper_ostream *wostream)
+{
+ i_assert(wostream->output_finish != NULL);
+ return wostream->output_finish(wostream);
+}
+
+/* Called when the wrapper ostream does not need write to parent output stream.
+ (see output_halt() method).
+ */
+static void wrapper_ostream_output_halt(struct wrapper_ostream *wostream)
+{
+ if (wostream->output_closed)
+ return;
+ if (wostream->output_halt != NULL)
+ wostream->output_halt(wostream);
+}
+
+/* Called when the wrapper ostream has data available for the parent output and
+ wants wrapper_ostream_continue() to be called when the parent stream is
+ writeable (see output_resume() method). */
+static void wrapper_ostream_output_resume(struct wrapper_ostream *wostream)
+{
+ if (wostream->output_closed)
+ return;
+ if (wostream->output_resume != NULL)
+ wostream->output_resume(wostream);
+}
+
+/* Update any timeouts for the underlying (parent) output (see
+ output_update_timeouts() method). */
+static void
+wrapper_ostream_output_update_timeouts(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ bool sender_blocking;
+
+ if (wostream->output_closed)
+ return;
+ if (wostream->output_update_timeouts == NULL)
+ return;
+
+ sender_blocking = (!stream->finished &&
+ (wrapper_ostream_is_empty(wostream) ||
+ (stream->corked &&
+ !wrapper_ostream_is_filled(wostream))));
+ wostream->output_update_timeouts(wostream, sender_blocking);
+}
+
+/*
+ * Wrapper
+ */
+
+/* Halt/resume the underlying output based on the state of the wrapper stream.
+ */
+static void
+wrapper_ostream_output_manage(struct wrapper_ostream *wostream, bool sending)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ bool must_flush, no_data;
+
+ if (wostream->output_closed)
+ return;
+
+ must_flush = (sending || stream->finished || wostream->flush_pending);
+ no_data = (wrapper_ostream_is_empty(wostream) ||
+ (stream->corked && !wrapper_ostream_is_filled(wostream)));
+
+ if (!must_flush && (no_data || stream->ostream.closed))
+ wrapper_ostream_output_halt(wostream);
+ else {
+ wrapper_ostream_output_resume(wostream);
+ if (wostream->output != NULL && must_flush)
+ o_stream_set_flush_pending(wostream->output, TRUE);
+ }
+}
+
+/* Handle any pending error by making it available to the application through
+ the output stream API. */
+static int
+wrapper_ostream_handle_pending_error(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+
+ if (wostream->pending_errno != 0) {
+ if (wostream->pending_error != NULL) {
+ io_stream_set_error(&stream->iostream,
+ "%s", wostream->pending_error);
+ }
+ stream->ostream.stream_errno = wostream->pending_errno;
+ wostream->pending_errno = 0;
+ wostream->returned_error = TRUE;
+ wrapper_ostream_closed(wostream);
+ i_free_and_null(wostream->pending_error);
+ return -1;
+ }
+ return 0;
+}
+
+/* Called when the wrapper stream is first finished using o_stream_finish(). */
+static int wrapper_ostream_finish(struct wrapper_ostream *wostream)
+{
+ int ret;
+
+ if (wostream->output_closed) {
+ if (wrapper_ostream_handle_pending_error(wostream) < 0)
+ return -1;
+ return 1;
+ }
+
+ if (!wrapper_ostream_output_ready(wostream)) {
+ return 0;
+ }
+
+ wostream->output_finished = TRUE;
+ if (wostream->output != NULL) {
+ if (o_stream_uncork_flush(wostream->output) < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ o_stream_unref(&wostream->output);
+ return -1;
+ }
+ }
+
+ /* Finished sending payload; now also finish the underlying output. */
+ ret = wrapper_ostream_output_finish(wostream);
+ if (ret == 0)
+ return ret;
+ if (ret < 0 && wostream->ostream.ostream.stream_errno != 0) {
+ wrapper_ostream_copy_parent_error(wostream);
+ return -1;
+ }
+ if (wrapper_ostream_handle_pending_error(wostream) < 0 || ret < 0) {
+ i_assert(wostream->ostream.ostream.stream_errno != 0);
+ return -1;
+ }
+ wrapper_ostream_output_close(wostream);
+ return 1;
+}
+
+/* Wait in ioloop until underlying (parent) output can be flushed. This is
+ called only when the wrapper stream is blocking. */
+static int
+wrapper_ostream_flush_wait(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ struct ioloop *ioloop, *prev_ioloop;
+ bool was_corked = FALSE;
+
+ wrapper_ostream_output_manage(wostream, !wostream->flushing);
+
+ /* Cannot be already waiting */
+ i_assert(!wostream->flush_waiting);
+ i_assert(wostream->flush_ioloop == NULL);
+
+ i_assert(wostream->wait_begin != NULL);
+ i_assert(wostream->wait_end != NULL);
+
+ if (wostream->output != NULL && o_stream_is_corked(wostream->output)) {
+ /* Make sure parent is uncorked here to make sure output IO is
+ active. */
+ if (o_stream_uncork_flush(wostream->output) < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ return -1;
+ }
+ was_corked = TRUE;
+ }
+
+ wostream->flush_ioloop = ioloop = io_loop_create();
+ prev_ioloop = wostream->wait_begin(wostream, ioloop);
+ o_stream_switch_ioloop_to(&wostream->ostream.ostream, ioloop);
+
+ /* Either we're waiting for network I/O or we're getting out of a
+ callback using timeout_add_short(0) */
+ i_assert(io_loop_have_ios(ioloop) ||
+ io_loop_have_immediate_timeouts(ioloop));
+
+ wostream->flush_waiting = TRUE;
+ do {
+ e_debug(wostream->event, "Waiting for output flush");
+ io_loop_run(ioloop);
+ } while (wostream->flush_waiting);
+
+ e_debug(wostream->event, "Can now flush output");
+
+ o_stream_switch_ioloop_to(&wostream->ostream.ostream, prev_ioloop);
+ wostream->wait_end(wostream, prev_ioloop);
+ io_loop_destroy(&ioloop);
+ wostream->flush_ioloop = NULL;
+
+ if (stream->ostream.blocking)
+ wrapper_ostream_output_halt(wostream);
+
+ if (was_corked && wostream->output != NULL)
+ o_stream_cork(wostream->output);
+
+ if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+ /* Stream already hit an error */
+ return -1;
+ }
+ return 0;
+}
+
+/* Try to flush the underlying (parent) output. */
+static int wrapper_ostream_flush_parent(struct wrapper_ostream *wostream)
+{
+ struct ostream *parent;
+
+ if (wostream->output_closed) {
+ /* Output already dropped; nothing to flush */
+ return 1;
+ }
+ if (!wrapper_ostream_output_ready(wostream)) {
+ /* There is no parent ostream yet */
+ return 1;
+ }
+
+ parent = wostream->output;
+ if (parent == NULL) {
+ /* There is no parent ostream anymore */
+ i_assert(wostream->buffer == NULL ||
+ wostream->buffer->used == 0);
+ return 1;
+ }
+ if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE) {
+ /* We already have quite a lot of data in parent stream.
+ unless we can flush it, don't add any more to it or we
+ could keep wasting memory by just increasing the buffer
+ size all the time. */
+ if (o_stream_flush(parent) < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ return -1;
+ }
+ if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE)
+ return 0;
+ }
+
+ return 1;
+}
+
+/* Try to write data to underlying (parent) output. */
+static ssize_t
+wrapper_ostream_writev(struct wrapper_ostream *wostream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ struct ostream *parent = wostream->output;
+ ssize_t sent;
+
+ i_assert(!wostream->output_closed);
+ i_assert(!wostream->output_finished);
+
+ if (!wrapper_ostream_output_ready(wostream))
+ return 0;
+
+ /* Send more data to parent ostream */
+ i_assert(parent != NULL);
+ o_stream_set_max_buffer_size(parent, IO_BLOCK_SIZE);
+ sent = o_stream_sendv(parent, iov, iov_count);
+ o_stream_set_max_buffer_size(parent, SIZE_MAX);
+ if (sent < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ return -1;
+ }
+
+ return sent;
+}
+
+/* Try to write data to underlying (parent) output and implement blocking
+ behavior by running an ioloop. */
+static ssize_t
+wrapper_ostream_writev_full(struct wrapper_ostream *wostream,
+ const struct const_iovec *iov,
+ unsigned int iov_count)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ unsigned int i;
+ ssize_t sent, sent_total;
+
+ if (!stream->ostream.blocking) {
+ /* Not blocking; send what we can */
+ return wrapper_ostream_writev(wostream, iov, iov_count);
+ }
+
+ /* Blocking; loop and wait until all is sent */
+
+ sent_total = 0;
+ for (;;) {
+ struct const_iovec niov;
+ size_t iov_pos;
+
+ i_assert(iov_count > 0);
+
+ /* Send iovec with complete entries */
+ sent = wrapper_ostream_writev(wostream, iov, iov_count);
+ if (sent < 0)
+ return -1;
+ if (sent == 0) {
+ if (wrapper_ostream_flush_wait(wostream) < 0)
+ return -1;
+ i_assert(!wostream->output_closed);
+ continue;
+ }
+
+ /* Determine what was sent */
+ sent_total += sent;
+ iov_pos = (size_t)sent;
+ for (i = 0; i < iov_count && iov_pos >= iov[i].iov_len; i++)
+ iov_pos -= iov[i].iov_len;
+ if (i >= iov_count) {
+ /* All sent */
+ i_assert(iov_pos == 0);
+ return sent_total;
+ }
+
+ iov = &iov[i];
+ iov_count -= i;
+ if (iov_pos == 0) {
+ /* Nicely sent until an iovec boundary */
+ continue;
+ }
+
+ /* Send partial iovec entry */
+ i_zero(&niov);
+ niov = iov[0];
+ i_assert(iov_pos < niov.iov_len);
+ niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, iov_pos);
+ niov.iov_len -= iov_pos;
+
+ while (niov.iov_len > 0) {
+ sent = wrapper_ostream_writev(wostream, &niov, 1);
+ if (sent < 0)
+ return sent;
+ if (sent == 0) {
+ if (wrapper_ostream_flush_wait(wostream) < 0)
+ return -1;
+ i_assert(!wostream->output_closed);
+ continue;
+ }
+ i_assert((size_t)sent <= niov.iov_len);
+ niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, sent);
+ niov.iov_len -= sent;
+ sent_total += sent;
+ }
+
+ if (iov_count == 1) {
+ i_assert(sent_total != 0);
+ return sent_total;
+ }
+
+ /* Now sent until an iovec boundary */
+ iov = &iov[1];
+ iov_count--;
+ }
+
+ i_unreached();
+}
+
+/* Try to flush wrapper stream's buffer content. */
+static int wrapper_ostream_flush_buffer(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ buffer_t *buffer = wostream->buffer;
+ struct const_iovec iov;
+ ssize_t sent;
+
+ if (wostream->output_closed) {
+ /* Ostream already finished */
+ i_assert(wostream->ostream.finished);
+ return 1;
+ }
+
+ if (buffer == NULL || buffer->used == 0) {
+ /* Buffer already empty */
+ return 1;
+ }
+
+ do {
+ /* Try to flush whole buffer */
+ iov.iov_base = buffer->data;
+ iov.iov_len = buffer->used;
+ sent = wrapper_ostream_writev_full(wostream, &iov, 1);
+ if (sent < 0)
+ return -1;
+
+ /* Remove sent data from buffer */
+ buffer_delete(buffer, 0, sent);
+
+ /* More aggressively flush the buffer when this stream is
+ finished
+ */
+ } while (wostream->ostream.finished && sent > 0 && buffer->used > 0);
+
+ if (buffer->used == 0 ||
+ (stream->corked && !wrapper_ostream_is_filled(wostream)))
+ wrapper_ostream_output_halt(wostream);
+
+ return (buffer->used == 0 ? 1 : 0);
+}
+
+static int wrapper_ostream_flush_real(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ int ret;
+
+ if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+ /* Stream already hit an error */
+ return -1;
+ }
+ wrapper_ostream_output_start(wostream);
+
+ if ((ret = wrapper_ostream_flush_parent(wostream)) <= 0) {
+ /* Try to flush parent stream first to make room for more
+ data */
+ return ret;
+ }
+ if ((ret = wrapper_ostream_flush_buffer(wostream)) <= 0) {
+ /* Try sending data we already buffered */
+ return ret;
+ }
+
+ if (wostream->output_closed || wostream->output_finished) {
+ /* Already finished the ostream */
+ i_assert(stream->finished);
+ return 1;
+ }
+
+ if (!wrapper_ostream_output_ready(wostream)) {
+ return ((wostream->buffer == NULL ||
+ wostream->buffer->used == 0) ? 1 : 0);
+ }
+
+ if (wostream->output == NULL) {
+ i_assert(wrapper_ostream_is_empty(wostream));
+ ret = 1;
+ } else {
+ ret = o_stream_flush(wostream->output);
+ if (ret < 0)
+ wrapper_ostream_handle_parent_error(wostream);
+ }
+
+ return ret;
+}
+
+static bool
+wrapper_ostream_send_prepare(struct wrapper_ostream *wostream, size_t size)
+{
+ struct ostream_private *stream = &wostream->ostream;
+
+ if (wostream->output_closed || wostream->output_started)
+ return TRUE;
+
+ if (stream->corked && !stream->finished) {
+ if (wostream->buffer == NULL)
+ return FALSE;
+ if ((wostream->buffer->used + size) < stream->max_buffer_size)
+ return FALSE;
+ }
+ wrapper_ostream_output_start(wostream);
+ return TRUE;
+}
+
+/* Add data to the wrapper stream's internal buffer. */
+static size_t
+wrapper_ostream_add(struct wrapper_ostream *wostream,
+ const struct const_iovec *iov,
+ unsigned int iov_count, unsigned int *iov_idx,
+ size_t *iov_idx_pos)
+{
+ buffer_t *buffer = wostream->buffer;
+ unsigned int i;
+ size_t added = 0;
+
+ /* Create buffer */
+ if (buffer == NULL) {
+ wostream->buffer = buffer =
+ buffer_create_dynamic(default_pool, IO_BLOCK_SIZE);
+ }
+
+ for (i = *iov_idx; i < iov_count; i++) {
+ size_t iov_len, iov_add, space;
+ const unsigned char *iov_data;
+
+ iov_len = iov[i].iov_len;
+ iov_data = iov[i].iov_base;
+ space = wostream->ostream.max_buffer_size - buffer->used;
+
+ i_assert(*iov_idx_pos < iov_len);
+ if (*iov_idx_pos > 0) {
+ iov_len -= *iov_idx_pos;
+ iov_data += *iov_idx_pos;
+ }
+ iov_add = I_MIN(space, iov_len);
+ buffer_append(buffer, iov_data, iov_add);
+ added += iov_add;
+ if (iov_add < iov_len) {
+ /* Buffer is full */
+ *iov_idx_pos += iov_add;
+ break;
+ }
+ *iov_idx_pos = 0;
+ }
+
+ *iov_idx = i;
+ return added;
+}
+
+static ssize_t
+wrapper_ostream_sendv_real(struct wrapper_ostream *wostream,
+ const struct const_iovec *iov,
+ unsigned int iov_count)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ ssize_t written;
+ size_t size, iov_pos, sent;
+ unsigned int i;
+ int ret;
+
+ if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+ /* Stream already hit an error */
+ return -1;
+ }
+
+ i_assert(!wostream->output_closed);
+ i_assert(!wostream->output_finished);
+
+ /* Determine total size of data to send */
+ size = 0;
+ for (i = 0; i < iov_count; i++)
+ size += iov[i].iov_len;
+
+ /* Flush buffer if required */
+ if (!wrapper_ostream_is_empty(wostream) &&
+ (!stream->corked || wrapper_ostream_is_filled(wostream)) &&
+ wrapper_ostream_send_prepare(wostream, size) &&
+ wrapper_ostream_flush_buffer(wostream) < 0)
+ return -1;
+
+ if (!stream->corked && wrapper_ostream_is_full(wostream)) {
+ /* No space in buffer for more data */
+ i_assert(!stream->ostream.blocking);
+ return 0;
+ }
+
+ /* Send data to connection directly if possible */
+ i = 0;
+ sent = iov_pos = 0;
+ if (wrapper_ostream_is_empty(wostream) &&
+ (!stream->corked ||
+ size >= wrapper_ostream_optimal_size(wostream)) &&
+ wrapper_ostream_send_prepare(wostream, size)) {
+ written = wrapper_ostream_writev_full(wostream, iov, iov_count);
+ if (written < 0)
+ return -1;
+ sent += written;
+ if (sent == size) {
+ /* All sent */
+ return (ssize_t)sent;
+ }
+
+ i_assert(!stream->ostream.blocking);
+
+ /* Determine send position */
+ iov_pos = sent;
+ for (; i < iov_count && iov_pos >= iov[i].iov_len; i++)
+ iov_pos -= iov[i].iov_len;
+ i_assert(i < iov_count);
+ }
+
+ /* Fill buffer with remainder that was not sent directly */
+ for (;;) {
+ sent += wrapper_ostream_add(wostream, iov, iov_count,
+ &i, &iov_pos);
+ i_assert(sent <= size);
+
+ if (!stream->corked || !wrapper_ostream_is_filled(wostream))
+ break;
+
+ /* Flush corked full buffer */
+ wrapper_ostream_output_start(wostream);
+ if ((ret = wrapper_ostream_flush_buffer(wostream)) < 0)
+ return -1;
+ if (ret == 0)
+ break;
+ }
+
+ i_assert(!stream->ostream.blocking || sent == size);
+ return sent;
+}
+
+/* Run the flush callback for the wrapper stream. */
+static int wrapper_ostream_callback(struct wrapper_ostream *wostream)
+{
+ int ret;
+
+ if (wostream->ostream.callback != NULL) {
+ if (wostream->callback_pre != NULL)
+ wostream->callback_pre(wostream);
+ ret = wostream->ostream.callback(wostream->ostream.context);
+ if (wostream->callback_post != NULL)
+ wostream->callback_post(wostream);
+ } else {
+ ret = wrapper_ostream_flush(&wostream->ostream);
+ }
+ return ret;
+}
+
+/* Handle an event by running wrapper_ostream_continue(). This called from
+ ioloop on a zero timeout. */
+static void wrapper_ostream_handle_event(struct wrapper_ostream *wostream)
+{
+ timeout_remove(&wostream->to_event);
+ (void)wrapper_ostream_continue(wostream);
+}
+
+/*
+ * iostream methods
+ */
+
+static void
+wrapper_ostream_close(struct iostream_private *stream,
+ bool close_parent ATTR_UNUSED)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream.iostream);
+
+ timeout_remove(&wostream->to_event);
+ wrapper_ostream_output_close(wostream);
+ if (wostream->close != NULL)
+ wostream->close(wostream);
+}
+
+static void wrapper_ostream_destroy(struct iostream_private *stream)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream.iostream);
+
+ timeout_remove(&wostream->to_event);
+ i_free(wostream->pending_error);
+
+ if (wostream->destroy != NULL)
+ wostream->destroy(wostream);
+ buffer_free(&wostream->buffer);
+ o_stream_unref(&wostream->output);
+ event_unref(&wostream->event);
+}
+
+/*
+ * ostream methods
+ */
+
+static void wrapper_ostream_cork(struct ostream_private *stream, bool set)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+ int ret;
+
+ if (stream->ostream.closed || wostream->pending_errno != 0)
+ return;
+
+ if (wostream->output_closed) {
+ i_assert(wostream->ostream.finished);
+ return;
+ }
+
+ if (set) {
+ if (wostream->output != NULL)
+ o_stream_cork(wostream->output);
+ } else {
+ /* Buffer flushing might close the stream */
+ ret = wrapper_ostream_flush_buffer(wostream);
+ stream->last_errors_not_checked = TRUE;
+
+ if (wostream->output != NULL) {
+ if (o_stream_uncork_flush(wostream->output) < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ ret = -1;
+ }
+ }
+ if ((ret == 0 || wostream->flush_pending) &&
+ !stream->ostream.closed)
+ wrapper_ostream_output_resume(wostream);
+ }
+ stream->corked = set;
+
+ wrapper_ostream_output_manage(wostream, FALSE);
+}
+
+static ssize_t
+wrapper_ostream_sendv(struct ostream_private *stream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+ bool must_uncork = FALSE;
+ ssize_t sret;
+
+ if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+ /* Stream already hit an error */
+ return -1;
+ }
+
+ /* Cork parent ostream if necessary */
+ if (!wostream->output_closed && wostream->output != NULL &&
+ !o_stream_is_corked(wostream->output)) {
+ o_stream_cork(wostream->output);
+ must_uncork = TRUE;
+ }
+
+ sret = wrapper_ostream_sendv_real(wostream, iov, iov_count);
+ if (sret > 0)
+ stream->ostream.offset += (ssize_t)sret;
+
+ /* Uncork the parent ostream */
+ if (must_uncork && !wostream->output_closed &&
+ wostream->output != NULL) {
+ if (o_stream_uncork_flush(wostream->output) < 0 &&
+ sret >= 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ sret = -1;
+ }
+ }
+
+ if (sret >= 0) {
+ wrapper_ostream_output_update_timeouts(wostream);
+ if (!stream->ostream.blocking)
+ wrapper_ostream_output_manage(wostream, FALSE);
+ }
+
+ return sret;
+}
+
+static int wrapper_ostream_flush(struct ostream_private *stream)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+ struct ostream *ostream = &stream->ostream;
+ bool must_uncork = FALSE;
+ int ret;
+
+ if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+ /* Stream already hit an error */
+ return -1;
+ }
+
+ if (wostream->output_closed) {
+ if (!stream->finished || !wrapper_ostream_is_empty(wostream)) {
+ stream->ostream.stream_errno = EPIPE;
+ return -1;
+ }
+ /* Already finished the ostream */
+ return 1;
+ }
+
+ if (wostream->flushing) {
+ /* Prevent recursion while finishing output */
+ return 1;
+ }
+ wostream->flushing = TRUE;
+ o_stream_ref(ostream);
+
+ /* Cork parent ostream if necessary */
+ if (wostream->output != NULL && !o_stream_is_corked(wostream->output)) {
+ o_stream_cork(wostream->output);
+ must_uncork = TRUE;
+ }
+
+ /* If blocking: loop until all is flushed; otherwise try once */
+ do {
+ /* Try to flush */
+ if ((ret = wrapper_ostream_flush_real(wostream)) < 0) {
+ ret = -1;
+ break;
+ }
+
+ if (ret == 0 && stream->ostream.blocking) {
+ /* Block until we can write more */
+ if (wrapper_ostream_flush_wait(wostream) < 0) {
+ ret = -1;
+ break;
+ }
+ }
+
+ if (stream->ostream.closed) {
+ /* Ostream was closed in the mean time */
+ ret = -1;
+ break;
+ }
+
+ if (wostream->output_closed) {
+ /* Already finished the ostream */
+ i_assert(stream->finished);
+ ret = 1;
+ break;
+ }
+ } while (ret == 0 && stream->ostream.blocking);
+
+ if (ret > 0 && stream->finished) {
+ /* This was an o_stream_finish() call or subsequent flush */
+ i_assert(wrapper_ostream_is_empty(wostream));
+ while ((ret = wrapper_ostream_finish(wostream)) == 0) {
+ if (!stream->ostream.blocking) {
+ /* Not yet finished completely */
+ break;
+ }
+ /* Block until we can write more */
+ if (wrapper_ostream_flush_wait(wostream) < 0) {
+ ret = -1;
+ break;
+ }
+ }
+ }
+ wrapper_ostream_output_update_timeouts(wostream);
+ wostream->flushing = FALSE;
+
+ if (ret >= 0 && !ostream->blocking)
+ wrapper_ostream_output_manage(wostream, FALSE);
+
+ if (wostream->output_closed) {
+ i_assert(ret < 0 || ostream->stream_errno == 0 ||
+ ostream->closed);
+ i_assert(ret >= 0 || ostream->stream_errno != 0);
+ o_stream_unref(&ostream);
+ return (ret >= 0 ? 1 : -1);
+ }
+
+ if (!must_uncork || wostream->output == NULL) {
+ /* Nothing */
+ } else if (ret >= 0) {
+ /* Uncork the parent ostream */
+ if (o_stream_uncork_flush(wostream->output) < 0) {
+ wrapper_ostream_handle_parent_error(wostream);
+ ret = -1;
+ }
+ } else {
+ o_stream_uncork(wostream->output);
+ }
+
+ i_assert(ret >= 0 || ostream->stream_errno != 0);
+ o_stream_unref(&ostream);
+ return ret;
+}
+
+static void
+wrapper_ostream_set_flush_callback(struct ostream_private *stream,
+ stream_flush_callback_t *callback,
+ void *context)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+
+ stream->callback = callback;
+ stream->context = context;
+
+ if (!stream->ostream.blocking && stream->callback == NULL) {
+ /* Application is currently not interested in flush events and
+ that includes request events like errors. */
+ timeout_remove(&wostream->to_event);
+ } else if (wostream->pending_error != NULL &&
+ wostream->to_event == NULL) {
+ /* Schedule flush callback to notify application of events */
+ wostream->to_event = timeout_add_short(
+ 0, wrapper_ostream_handle_event, wostream);
+ }
+}
+
+static void
+wrapper_ostream_flush_pending(struct ostream_private *stream, bool set)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+
+ wostream->flush_pending = set;
+ if (!set)
+ return;
+ if (wostream->output_closed) {
+ i_assert(wostream->ostream.ostream.closed);
+ return;
+ }
+ if (wostream->to_event == NULL) {
+ wostream->to_event = timeout_add_short(
+ 0, wrapper_ostream_handle_event, wostream);
+ }
+}
+
+static size_t
+wrapper_ostream_get_buffer_used_size(const struct ostream_private *stream)
+{
+ const struct wrapper_ostream *wostream =
+ container_of(stream, const struct wrapper_ostream, ostream);
+ size_t size = 0;
+
+ if (wostream->buffer != NULL)
+ size += wostream->buffer->used;
+ if (wostream->output != NULL)
+ size += o_stream_get_buffer_used_size(wostream->output);
+ return size;
+}
+
+static size_t
+wrapper_ostream_get_buffer_avail_size(const struct ostream_private *stream)
+{
+ const struct wrapper_ostream *wostream =
+ container_of(stream, const struct wrapper_ostream, ostream);
+ size_t size = 0;
+
+ if (wostream->ostream.max_buffer_size == SIZE_MAX)
+ return SIZE_MAX;
+
+ if (wostream->buffer == NULL)
+ size = wostream->ostream.max_buffer_size;
+ else if (wostream->buffer->used < wostream->ostream.max_buffer_size) {
+ size = (wostream->ostream.max_buffer_size -
+ wostream->buffer->used);
+ }
+
+ if (wostream->output != NULL)
+ size += o_stream_get_buffer_avail_size(wostream->output);
+
+ return size;
+}
+
+static void
+wrapper_ostream_switch_ioloop_to(struct ostream_private *stream,
+ struct ioloop *ioloop)
+{
+ struct wrapper_ostream *wostream =
+ container_of(stream, struct wrapper_ostream, ostream);
+
+ if (wostream->flush_ioloop != ioloop &&
+ wostream->switch_ioloop_to != NULL)
+ wostream->switch_ioloop_to(wostream, ioloop);
+
+ if (wostream->to_event != NULL) {
+ wostream->to_event =
+ io_loop_move_timeout_to(ioloop, &wostream->to_event);
+ }
+}
+
+/*
+ * API
+ */
+
+struct ostream *
+wrapper_ostream_create(struct wrapper_ostream *wostream,
+ size_t max_buffer_size, bool blocking,
+ struct event *event)
+{
+ wostream->ostream.iostream.close = wrapper_ostream_close;
+ wostream->ostream.iostream.destroy = wrapper_ostream_destroy;
+
+ wostream->ostream.ostream.blocking = blocking;
+ wostream->ostream.max_buffer_size = max_buffer_size;
+ wostream->ostream.cork = wrapper_ostream_cork;
+ wostream->ostream.sendv = wrapper_ostream_sendv;
+ wostream->ostream.flush = wrapper_ostream_flush;
+ wostream->ostream.set_flush_callback =
+ wrapper_ostream_set_flush_callback;
+ wostream->ostream.flush_pending = wrapper_ostream_flush_pending;
+ wostream->ostream.get_buffer_used_size =
+ wrapper_ostream_get_buffer_used_size;
+ wostream->ostream.get_buffer_avail_size =
+ wrapper_ostream_get_buffer_avail_size;
+ wostream->ostream.switch_ioloop_to =
+ wrapper_ostream_switch_ioloop_to;
+
+ wostream->event = event_create(event);
+
+ return o_stream_create(&wostream->ostream, NULL, -1);
+}
+
+int wrapper_ostream_continue(struct wrapper_ostream *wostream)
+{
+ struct ostream_private *stream = &wostream->ostream;
+ struct ostream *ostream = &stream->ostream;
+ struct ioloop *ioloop = NULL;
+ bool use_cork = !stream->corked;
+ int ret = 1;
+
+ if (wostream->flush_waiting) {
+ /* Inside wrapper_ostream_flush_wait() */
+ ioloop = wostream->flush_ioloop;
+ }
+ if (stream->ostream.closed ||
+ (stream->finished && wrapper_ostream_is_empty(wostream) &&
+ wostream->output != NULL &&
+ o_stream_get_buffer_used_size(wostream->output) == 0)) {
+ /* Already finished */
+ ret = wrapper_ostream_finish(wostream);
+ if (ret == 0)
+ return 0;
+ }
+ if (wostream->flush_waiting) {
+ i_assert(ioloop != NULL);
+ io_loop_stop(ioloop);
+ wostream->flush_waiting = FALSE;
+ return ret;
+ }
+
+ /* Set flush_pending = FALSE first before calling the flush callback,
+ and change it to TRUE only if callback returns 0. That way the
+ callback can call o_stream_set_flush_pending() again and we don't
+ forget it even if flush callback returns 1. */
+ wostream->flush_pending = FALSE;
+
+ o_stream_ref(ostream);
+ wostream->continuing = TRUE;
+ for (;;) {
+ if (use_cork)
+ o_stream_cork(ostream);
+ ret = wrapper_ostream_callback(wostream);
+ if (use_cork && !wostream->output_closed) {
+ int fret = o_stream_uncork_flush(ostream);
+ if (ret == 0 && fret > 0)
+ continue;
+ if (fret < 0 && ret >= 0) {
+ i_assert(ostream->stream_errno != 0);
+ (void)wrapper_ostream_callback(wostream);
+ ret = -1;
+ }
+ }
+ break;
+ }
+ wostream->continuing = FALSE;
+ if (wostream->output_closed)
+ o_stream_close(ostream);
+
+ if (ret == 0)
+ wostream->flush_pending = TRUE;
+
+ if (!stream->ostream.blocking)
+ wrapper_ostream_output_manage(wostream, FALSE);
+
+ if (ret < 0 || ostream->stream_errno != 0 ||
+ wostream->pending_errno != 0)
+ ret = -1;
+ else if (wostream->output_closed)
+ ret = 1;
+ else if (!wrapper_ostream_is_empty(wostream) &&
+ (!stream->corked || wrapper_ostream_is_filled(wostream)))
+ ret = 0;
+ else if (wostream->flush_pending)
+ ret = 0;
+
+ o_stream_unref(&ostream);
+
+ return ret;
+}
+
+void wrapper_ostream_trigger_flush(struct wrapper_ostream *wostream)
+{
+ struct ostream *ostream = &wostream->ostream.ostream;
+
+ if (ostream->closed)
+ return;
+ if (wostream->to_event != NULL)
+ return;
+ if (!wostream->flush_waiting && wostream->ostream.callback == NULL)
+ return;
+
+ wostream->to_event = timeout_add_short(
+ 0, wrapper_ostream_handle_event, wostream);
+}
+
+bool wrapper_ostream_get_buffered_size(struct wrapper_ostream *wostream,
+ uoff_t *size_r)
+{
+ buffer_t *buffer = wostream->buffer;
+
+ if (!wostream->ostream.finished)
+ return FALSE;
+
+ *size_r = (buffer == NULL ? 0 : (uoff_t)buffer->used);
+ i_assert(*size_r == wostream->ostream.ostream.offset);
+ return TRUE;
+}
+
+void wrapper_ostream_output_available(struct wrapper_ostream *wostream,
+ struct ostream *output)
+{
+ i_assert(!wostream->output_closed);
+ i_assert(!wostream->output_finished);
+ i_assert(wostream->output == NULL);
+ wostream->output = output;
+ if (output != NULL) {
+ if (wostream->ostream.corked)
+ o_stream_cork(wostream->output);
+ o_stream_ref(output);
+ }
+}
+
+void wrapper_ostream_output_destroyed(struct wrapper_ostream *wostream)
+{
+ struct ostream *ostream = &wostream->ostream.ostream;
+
+ wrapper_ostream_trigger_flush(wostream);
+ o_stream_set_no_error_handling(ostream, TRUE);
+
+ o_stream_unref(&wostream->output);
+ wostream->output_closed = TRUE;
+ wostream->output_finished = TRUE;
+}
+
+void wrapper_ostream_set_error(struct wrapper_ostream *wostream,
+ int stream_errno, const char *stream_error)
+{
+ struct ostream *ostream = &wostream->ostream.ostream;
+
+ if (ostream->closed || wostream->pending_errno != 0 ||
+ wostream->returned_error)
+ return;
+
+ i_assert(wostream->pending_error == NULL);
+ wostream->pending_errno = stream_errno;
+ wostream->pending_error = i_strdup(stream_error);
+
+ wrapper_ostream_trigger_flush(wostream);
+}
+
+void wrapper_ostream_notify_error(struct wrapper_ostream *wostream)
+{
+ struct ostream *ostream = &wostream->ostream.ostream;
+
+ if (ostream->closed || ostream->blocking ||
+ wostream->output_closed_api || wostream->returned_error ||
+ wostream->continuing)
+ return;
+ if (wostream->pending_errno == 0)
+ return;
+ wostream->returned_error = TRUE;
+ (void)wrapper_ostream_callback(wostream);
+}