diff options
Diffstat (limited to 'src/lib/ostream-wrapper.c')
-rw-r--r-- | src/lib/ostream-wrapper.c | 1259 |
1 files changed, 1259 insertions, 0 deletions
diff --git a/src/lib/ostream-wrapper.c b/src/lib/ostream-wrapper.c new file mode 100644 index 0000000..dfd6699 --- /dev/null +++ b/src/lib/ostream-wrapper.c @@ -0,0 +1,1259 @@ +/* Copyright (c) 2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "ostream-private.h" + +#include "ostream-wrapper.h" + +static int wrapper_ostream_flush(struct ostream_private *stream); +static void +wrapper_ostream_switch_ioloop_to(struct ostream_private *stream, + struct ioloop *ioloop); + +/* + * Buffer + */ + +/* Determine the optimum buffer size for the wrapper stream itself. */ +static inline size_t +wrapper_ostream_optimal_size(struct wrapper_ostream *wostream) +{ + size_t optimal_size = wostream->ostream.max_buffer_size; + + if (wostream->output != NULL) { + optimal_size = I_MIN( + o_stream_get_max_buffer_size(wostream->output), + optimal_size); + } + if (optimal_size == SIZE_MAX) + optimal_size = IO_BLOCK_SIZE; + + return optimal_size; +} + +/* Return the current size of the wrapper output stream buffer. */ +static inline size_t wrapper_ostream_size(struct wrapper_ostream *wostream) +{ + buffer_t *buffer = wostream->buffer; + + if (buffer == NULL) + return 0; + return buffer->used; +} + +/* Return TRUE when the wrapper stream's internal buffer is empty. */ +static inline bool wrapper_ostream_is_empty(struct wrapper_ostream *wostream) +{ + return (wrapper_ostream_size(wostream) == 0); +} +/* Return TRUE when the wrapper stream's internal buffer is filled to the + maximum. */ +static inline bool wrapper_ostream_is_full(struct wrapper_ostream *wostream) +{ + return (wrapper_ostream_size(wostream) >= + wostream->ostream.max_buffer_size); +} +/* Return TRUE when the wrapper stream's internal buffer is filled at or beyond + the optimum. */ +static inline bool wrapper_ostream_is_filled(struct wrapper_ostream *wostream) +{ + return (wrapper_ostream_size(wostream) >= + wrapper_ostream_optimal_size(wostream)); +} + +/* + * Underlying output + */ + +/* Handle error in the underlying output stream (the parent). */ +static void +wrapper_ostream_copy_parent_error(struct wrapper_ostream *wostream) +{ + i_assert(wostream->output != NULL); + i_assert(wostream->output->stream_errno != 0); + + wostream->ostream.ostream.stream_errno = + wostream->output->stream_errno; + wostream->ostream.ostream.overflow = + wostream->output->overflow; +} + +static void +wrapper_ostream_handle_parent_error(struct wrapper_ostream *wostream) +{ + wrapper_ostream_copy_parent_error(wostream); + + if (wostream->output->closed) + o_stream_close(&wostream->ostream.ostream); + + if (wostream->output_error != NULL) + wostream->output_error(wostream); +} + +static void wrapper_ostream_closed(struct wrapper_ostream *wostream) +{ + wostream->ostream.ostream.closed = TRUE; +} + +/* Drop the underlying output. */ +static void wrapper_ostream_output_close(struct wrapper_ostream *wostream) +{ + o_stream_unref(&wostream->output); + wostream->output_finished = TRUE; + wostream->output_closed = TRUE; + wostream->output_closed_api = TRUE; +} + +/* Method calls */ + +/* Called when the implementation should start making the parent output stream + available, e.g. connect to the server (see output_start() method). + */ +static void wrapper_ostream_output_start(struct wrapper_ostream *wostream) +{ + if (wostream->output_started) + return; + wostream->output_started = TRUE; + if (wostream->output_start != NULL) + wostream->output_start(wostream); +} + +/* Returns TRUE when the output is ready for data (see output_ready() method). + */ +static bool wrapper_ostream_output_ready(struct wrapper_ostream *wostream) +{ + i_assert(wostream->output_ready != NULL); + return wostream->output_ready(wostream); +} + +/* Finish the underlying output (see output_finish() method).*/ +static int wrapper_ostream_output_finish(struct wrapper_ostream *wostream) +{ + i_assert(wostream->output_finish != NULL); + return wostream->output_finish(wostream); +} + +/* Called when the wrapper ostream does not need write to parent output stream. + (see output_halt() method). + */ +static void wrapper_ostream_output_halt(struct wrapper_ostream *wostream) +{ + if (wostream->output_closed) + return; + if (wostream->output_halt != NULL) + wostream->output_halt(wostream); +} + +/* Called when the wrapper ostream has data available for the parent output and + wants wrapper_ostream_continue() to be called when the parent stream is + writeable (see output_resume() method). */ +static void wrapper_ostream_output_resume(struct wrapper_ostream *wostream) +{ + if (wostream->output_closed) + return; + if (wostream->output_resume != NULL) + wostream->output_resume(wostream); +} + +/* Update any timeouts for the underlying (parent) output (see + output_update_timeouts() method). */ +static void +wrapper_ostream_output_update_timeouts(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + bool sender_blocking; + + if (wostream->output_closed) + return; + if (wostream->output_update_timeouts == NULL) + return; + + sender_blocking = (!stream->finished && + (wrapper_ostream_is_empty(wostream) || + (stream->corked && + !wrapper_ostream_is_filled(wostream)))); + wostream->output_update_timeouts(wostream, sender_blocking); +} + +/* + * Wrapper + */ + +/* Halt/resume the underlying output based on the state of the wrapper stream. + */ +static void +wrapper_ostream_output_manage(struct wrapper_ostream *wostream, bool sending) +{ + struct ostream_private *stream = &wostream->ostream; + bool must_flush, no_data; + + if (wostream->output_closed) + return; + + must_flush = (sending || stream->finished || wostream->flush_pending); + no_data = (wrapper_ostream_is_empty(wostream) || + (stream->corked && !wrapper_ostream_is_filled(wostream))); + + if (!must_flush && (no_data || stream->ostream.closed)) + wrapper_ostream_output_halt(wostream); + else { + wrapper_ostream_output_resume(wostream); + if (wostream->output != NULL && must_flush) + o_stream_set_flush_pending(wostream->output, TRUE); + } +} + +/* Handle any pending error by making it available to the application through + the output stream API. */ +static int +wrapper_ostream_handle_pending_error(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + + if (wostream->pending_errno != 0) { + if (wostream->pending_error != NULL) { + io_stream_set_error(&stream->iostream, + "%s", wostream->pending_error); + } + stream->ostream.stream_errno = wostream->pending_errno; + wostream->pending_errno = 0; + wostream->returned_error = TRUE; + wrapper_ostream_closed(wostream); + i_free_and_null(wostream->pending_error); + return -1; + } + return 0; +} + +/* Called when the wrapper stream is first finished using o_stream_finish(). */ +static int wrapper_ostream_finish(struct wrapper_ostream *wostream) +{ + int ret; + + if (wostream->output_closed) { + if (wrapper_ostream_handle_pending_error(wostream) < 0) + return -1; + return 1; + } + + if (!wrapper_ostream_output_ready(wostream)) { + return 0; + } + + wostream->output_finished = TRUE; + if (wostream->output != NULL) { + if (o_stream_uncork_flush(wostream->output) < 0) { + wrapper_ostream_handle_parent_error(wostream); + o_stream_unref(&wostream->output); + return -1; + } + } + + /* Finished sending payload; now also finish the underlying output. */ + ret = wrapper_ostream_output_finish(wostream); + if (ret == 0) + return ret; + if (ret < 0 && wostream->ostream.ostream.stream_errno != 0) { + wrapper_ostream_copy_parent_error(wostream); + return -1; + } + if (wrapper_ostream_handle_pending_error(wostream) < 0 || ret < 0) { + i_assert(wostream->ostream.ostream.stream_errno != 0); + return -1; + } + wrapper_ostream_output_close(wostream); + return 1; +} + +/* Wait in ioloop until underlying (parent) output can be flushed. This is + called only when the wrapper stream is blocking. */ +static int +wrapper_ostream_flush_wait(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + struct ioloop *ioloop, *prev_ioloop; + bool was_corked = FALSE; + + wrapper_ostream_output_manage(wostream, !wostream->flushing); + + /* Cannot be already waiting */ + i_assert(!wostream->flush_waiting); + i_assert(wostream->flush_ioloop == NULL); + + i_assert(wostream->wait_begin != NULL); + i_assert(wostream->wait_end != NULL); + + if (wostream->output != NULL && o_stream_is_corked(wostream->output)) { + /* Make sure parent is uncorked here to make sure output IO is + active. */ + if (o_stream_uncork_flush(wostream->output) < 0) { + wrapper_ostream_handle_parent_error(wostream); + return -1; + } + was_corked = TRUE; + } + + wostream->flush_ioloop = ioloop = io_loop_create(); + prev_ioloop = wostream->wait_begin(wostream, ioloop); + o_stream_switch_ioloop_to(&wostream->ostream.ostream, ioloop); + + /* Either we're waiting for network I/O or we're getting out of a + callback using timeout_add_short(0) */ + i_assert(io_loop_have_ios(ioloop) || + io_loop_have_immediate_timeouts(ioloop)); + + wostream->flush_waiting = TRUE; + do { + e_debug(wostream->event, "Waiting for output flush"); + io_loop_run(ioloop); + } while (wostream->flush_waiting); + + e_debug(wostream->event, "Can now flush output"); + + o_stream_switch_ioloop_to(&wostream->ostream.ostream, prev_ioloop); + wostream->wait_end(wostream, prev_ioloop); + io_loop_destroy(&ioloop); + wostream->flush_ioloop = NULL; + + if (stream->ostream.blocking) + wrapper_ostream_output_halt(wostream); + + if (was_corked && wostream->output != NULL) + o_stream_cork(wostream->output); + + if (wrapper_ostream_handle_pending_error(wostream) < 0) { + /* Stream already hit an error */ + return -1; + } + return 0; +} + +/* Try to flush the underlying (parent) output. */ +static int wrapper_ostream_flush_parent(struct wrapper_ostream *wostream) +{ + struct ostream *parent; + + if (wostream->output_closed) { + /* Output already dropped; nothing to flush */ + return 1; + } + if (!wrapper_ostream_output_ready(wostream)) { + /* There is no parent ostream yet */ + return 1; + } + + parent = wostream->output; + if (parent == NULL) { + /* There is no parent ostream anymore */ + i_assert(wostream->buffer == NULL || + wostream->buffer->used == 0); + return 1; + } + if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE) { + /* We already have quite a lot of data in parent stream. + unless we can flush it, don't add any more to it or we + could keep wasting memory by just increasing the buffer + size all the time. */ + if (o_stream_flush(parent) < 0) { + wrapper_ostream_handle_parent_error(wostream); + return -1; + } + if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE) + return 0; + } + + return 1; +} + +/* Try to write data to underlying (parent) output. */ +static ssize_t +wrapper_ostream_writev(struct wrapper_ostream *wostream, + const struct const_iovec *iov, unsigned int iov_count) +{ + struct ostream *parent = wostream->output; + ssize_t sent; + + i_assert(!wostream->output_closed); + i_assert(!wostream->output_finished); + + if (!wrapper_ostream_output_ready(wostream)) + return 0; + + /* Send more data to parent ostream */ + i_assert(parent != NULL); + o_stream_set_max_buffer_size(parent, IO_BLOCK_SIZE); + sent = o_stream_sendv(parent, iov, iov_count); + o_stream_set_max_buffer_size(parent, SIZE_MAX); + if (sent < 0) { + wrapper_ostream_handle_parent_error(wostream); + return -1; + } + + return sent; +} + +/* Try to write data to underlying (parent) output and implement blocking + behavior by running an ioloop. */ +static ssize_t +wrapper_ostream_writev_full(struct wrapper_ostream *wostream, + const struct const_iovec *iov, + unsigned int iov_count) +{ + struct ostream_private *stream = &wostream->ostream; + unsigned int i; + ssize_t sent, sent_total; + + if (!stream->ostream.blocking) { + /* Not blocking; send what we can */ + return wrapper_ostream_writev(wostream, iov, iov_count); + } + + /* Blocking; loop and wait until all is sent */ + + sent_total = 0; + for (;;) { + struct const_iovec niov; + size_t iov_pos; + + i_assert(iov_count > 0); + + /* Send iovec with complete entries */ + sent = wrapper_ostream_writev(wostream, iov, iov_count); + if (sent < 0) + return -1; + if (sent == 0) { + if (wrapper_ostream_flush_wait(wostream) < 0) + return -1; + i_assert(!wostream->output_closed); + continue; + } + + /* Determine what was sent */ + sent_total += sent; + iov_pos = (size_t)sent; + for (i = 0; i < iov_count && iov_pos >= iov[i].iov_len; i++) + iov_pos -= iov[i].iov_len; + if (i >= iov_count) { + /* All sent */ + i_assert(iov_pos == 0); + return sent_total; + } + + iov = &iov[i]; + iov_count -= i; + if (iov_pos == 0) { + /* Nicely sent until an iovec boundary */ + continue; + } + + /* Send partial iovec entry */ + i_zero(&niov); + niov = iov[0]; + i_assert(iov_pos < niov.iov_len); + niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, iov_pos); + niov.iov_len -= iov_pos; + + while (niov.iov_len > 0) { + sent = wrapper_ostream_writev(wostream, &niov, 1); + if (sent < 0) + return sent; + if (sent == 0) { + if (wrapper_ostream_flush_wait(wostream) < 0) + return -1; + i_assert(!wostream->output_closed); + continue; + } + i_assert((size_t)sent <= niov.iov_len); + niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, sent); + niov.iov_len -= sent; + sent_total += sent; + } + + if (iov_count == 1) { + i_assert(sent_total != 0); + return sent_total; + } + + /* Now sent until an iovec boundary */ + iov = &iov[1]; + iov_count--; + } + + i_unreached(); +} + +/* Try to flush wrapper stream's buffer content. */ +static int wrapper_ostream_flush_buffer(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + buffer_t *buffer = wostream->buffer; + struct const_iovec iov; + ssize_t sent; + + if (wostream->output_closed) { + /* Ostream already finished */ + i_assert(wostream->ostream.finished); + return 1; + } + + if (buffer == NULL || buffer->used == 0) { + /* Buffer already empty */ + return 1; + } + + do { + /* Try to flush whole buffer */ + iov.iov_base = buffer->data; + iov.iov_len = buffer->used; + sent = wrapper_ostream_writev_full(wostream, &iov, 1); + if (sent < 0) + return -1; + + /* Remove sent data from buffer */ + buffer_delete(buffer, 0, sent); + + /* More aggressively flush the buffer when this stream is + finished + */ + } while (wostream->ostream.finished && sent > 0 && buffer->used > 0); + + if (buffer->used == 0 || + (stream->corked && !wrapper_ostream_is_filled(wostream))) + wrapper_ostream_output_halt(wostream); + + return (buffer->used == 0 ? 1 : 0); +} + +static int wrapper_ostream_flush_real(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + int ret; + + if (wrapper_ostream_handle_pending_error(wostream) < 0) { + /* Stream already hit an error */ + return -1; + } + wrapper_ostream_output_start(wostream); + + if ((ret = wrapper_ostream_flush_parent(wostream)) <= 0) { + /* Try to flush parent stream first to make room for more + data */ + return ret; + } + if ((ret = wrapper_ostream_flush_buffer(wostream)) <= 0) { + /* Try sending data we already buffered */ + return ret; + } + + if (wostream->output_closed || wostream->output_finished) { + /* Already finished the ostream */ + i_assert(stream->finished); + return 1; + } + + if (!wrapper_ostream_output_ready(wostream)) { + return ((wostream->buffer == NULL || + wostream->buffer->used == 0) ? 1 : 0); + } + + if (wostream->output == NULL) { + i_assert(wrapper_ostream_is_empty(wostream)); + ret = 1; + } else { + ret = o_stream_flush(wostream->output); + if (ret < 0) + wrapper_ostream_handle_parent_error(wostream); + } + + return ret; +} + +static bool +wrapper_ostream_send_prepare(struct wrapper_ostream *wostream, size_t size) +{ + struct ostream_private *stream = &wostream->ostream; + + if (wostream->output_closed || wostream->output_started) + return TRUE; + + if (stream->corked && !stream->finished) { + if (wostream->buffer == NULL) + return FALSE; + if ((wostream->buffer->used + size) < stream->max_buffer_size) + return FALSE; + } + wrapper_ostream_output_start(wostream); + return TRUE; +} + +/* Add data to the wrapper stream's internal buffer. */ +static size_t +wrapper_ostream_add(struct wrapper_ostream *wostream, + const struct const_iovec *iov, + unsigned int iov_count, unsigned int *iov_idx, + size_t *iov_idx_pos) +{ + buffer_t *buffer = wostream->buffer; + unsigned int i; + size_t added = 0; + + /* Create buffer */ + if (buffer == NULL) { + wostream->buffer = buffer = + buffer_create_dynamic(default_pool, IO_BLOCK_SIZE); + } + + for (i = *iov_idx; i < iov_count; i++) { + size_t iov_len, iov_add, space; + const unsigned char *iov_data; + + iov_len = iov[i].iov_len; + iov_data = iov[i].iov_base; + space = wostream->ostream.max_buffer_size - buffer->used; + + i_assert(*iov_idx_pos < iov_len); + if (*iov_idx_pos > 0) { + iov_len -= *iov_idx_pos; + iov_data += *iov_idx_pos; + } + iov_add = I_MIN(space, iov_len); + buffer_append(buffer, iov_data, iov_add); + added += iov_add; + if (iov_add < iov_len) { + /* Buffer is full */ + *iov_idx_pos += iov_add; + break; + } + *iov_idx_pos = 0; + } + + *iov_idx = i; + return added; +} + +static ssize_t +wrapper_ostream_sendv_real(struct wrapper_ostream *wostream, + const struct const_iovec *iov, + unsigned int iov_count) +{ + struct ostream_private *stream = &wostream->ostream; + ssize_t written; + size_t size, iov_pos, sent; + unsigned int i; + int ret; + + if (wrapper_ostream_handle_pending_error(wostream) < 0) { + /* Stream already hit an error */ + return -1; + } + + i_assert(!wostream->output_closed); + i_assert(!wostream->output_finished); + + /* Determine total size of data to send */ + size = 0; + for (i = 0; i < iov_count; i++) + size += iov[i].iov_len; + + /* Flush buffer if required */ + if (!wrapper_ostream_is_empty(wostream) && + (!stream->corked || wrapper_ostream_is_filled(wostream)) && + wrapper_ostream_send_prepare(wostream, size) && + wrapper_ostream_flush_buffer(wostream) < 0) + return -1; + + if (!stream->corked && wrapper_ostream_is_full(wostream)) { + /* No space in buffer for more data */ + i_assert(!stream->ostream.blocking); + return 0; + } + + /* Send data to connection directly if possible */ + i = 0; + sent = iov_pos = 0; + if (wrapper_ostream_is_empty(wostream) && + (!stream->corked || + size >= wrapper_ostream_optimal_size(wostream)) && + wrapper_ostream_send_prepare(wostream, size)) { + written = wrapper_ostream_writev_full(wostream, iov, iov_count); + if (written < 0) + return -1; + sent += written; + if (sent == size) { + /* All sent */ + return (ssize_t)sent; + } + + i_assert(!stream->ostream.blocking); + + /* Determine send position */ + iov_pos = sent; + for (; i < iov_count && iov_pos >= iov[i].iov_len; i++) + iov_pos -= iov[i].iov_len; + i_assert(i < iov_count); + } + + /* Fill buffer with remainder that was not sent directly */ + for (;;) { + sent += wrapper_ostream_add(wostream, iov, iov_count, + &i, &iov_pos); + i_assert(sent <= size); + + if (!stream->corked || !wrapper_ostream_is_filled(wostream)) + break; + + /* Flush corked full buffer */ + wrapper_ostream_output_start(wostream); + if ((ret = wrapper_ostream_flush_buffer(wostream)) < 0) + return -1; + if (ret == 0) + break; + } + + i_assert(!stream->ostream.blocking || sent == size); + return sent; +} + +/* Run the flush callback for the wrapper stream. */ +static int wrapper_ostream_callback(struct wrapper_ostream *wostream) +{ + int ret; + + if (wostream->ostream.callback != NULL) { + if (wostream->callback_pre != NULL) + wostream->callback_pre(wostream); + ret = wostream->ostream.callback(wostream->ostream.context); + if (wostream->callback_post != NULL) + wostream->callback_post(wostream); + } else { + ret = wrapper_ostream_flush(&wostream->ostream); + } + return ret; +} + +/* Handle an event by running wrapper_ostream_continue(). This called from + ioloop on a zero timeout. */ +static void wrapper_ostream_handle_event(struct wrapper_ostream *wostream) +{ + timeout_remove(&wostream->to_event); + (void)wrapper_ostream_continue(wostream); +} + +/* + * iostream methods + */ + +static void +wrapper_ostream_close(struct iostream_private *stream, + bool close_parent ATTR_UNUSED) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream.iostream); + + timeout_remove(&wostream->to_event); + wrapper_ostream_output_close(wostream); + if (wostream->close != NULL) + wostream->close(wostream); +} + +static void wrapper_ostream_destroy(struct iostream_private *stream) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream.iostream); + + timeout_remove(&wostream->to_event); + i_free(wostream->pending_error); + + if (wostream->destroy != NULL) + wostream->destroy(wostream); + buffer_free(&wostream->buffer); + o_stream_unref(&wostream->output); + event_unref(&wostream->event); +} + +/* + * ostream methods + */ + +static void wrapper_ostream_cork(struct ostream_private *stream, bool set) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + int ret; + + if (stream->ostream.closed || wostream->pending_errno != 0) + return; + + if (wostream->output_closed) { + i_assert(wostream->ostream.finished); + return; + } + + if (set) { + if (wostream->output != NULL) + o_stream_cork(wostream->output); + } else { + /* Buffer flushing might close the stream */ + ret = wrapper_ostream_flush_buffer(wostream); + stream->last_errors_not_checked = TRUE; + + if (wostream->output != NULL) { + if (o_stream_uncork_flush(wostream->output) < 0) { + wrapper_ostream_handle_parent_error(wostream); + ret = -1; + } + } + if ((ret == 0 || wostream->flush_pending) && + !stream->ostream.closed) + wrapper_ostream_output_resume(wostream); + } + stream->corked = set; + + wrapper_ostream_output_manage(wostream, FALSE); +} + +static ssize_t +wrapper_ostream_sendv(struct ostream_private *stream, + const struct const_iovec *iov, unsigned int iov_count) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + bool must_uncork = FALSE; + ssize_t sret; + + if (wrapper_ostream_handle_pending_error(wostream) < 0) { + /* Stream already hit an error */ + return -1; + } + + /* Cork parent ostream if necessary */ + if (!wostream->output_closed && wostream->output != NULL && + !o_stream_is_corked(wostream->output)) { + o_stream_cork(wostream->output); + must_uncork = TRUE; + } + + sret = wrapper_ostream_sendv_real(wostream, iov, iov_count); + if (sret > 0) + stream->ostream.offset += (ssize_t)sret; + + /* Uncork the parent ostream */ + if (must_uncork && !wostream->output_closed && + wostream->output != NULL) { + if (o_stream_uncork_flush(wostream->output) < 0 && + sret >= 0) { + wrapper_ostream_handle_parent_error(wostream); + sret = -1; + } + } + + if (sret >= 0) { + wrapper_ostream_output_update_timeouts(wostream); + if (!stream->ostream.blocking) + wrapper_ostream_output_manage(wostream, FALSE); + } + + return sret; +} + +static int wrapper_ostream_flush(struct ostream_private *stream) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + struct ostream *ostream = &stream->ostream; + bool must_uncork = FALSE; + int ret; + + if (wrapper_ostream_handle_pending_error(wostream) < 0) { + /* Stream already hit an error */ + return -1; + } + + if (wostream->output_closed) { + if (!stream->finished || !wrapper_ostream_is_empty(wostream)) { + stream->ostream.stream_errno = EPIPE; + return -1; + } + /* Already finished the ostream */ + return 1; + } + + if (wostream->flushing) { + /* Prevent recursion while finishing output */ + return 1; + } + wostream->flushing = TRUE; + o_stream_ref(ostream); + + /* Cork parent ostream if necessary */ + if (wostream->output != NULL && !o_stream_is_corked(wostream->output)) { + o_stream_cork(wostream->output); + must_uncork = TRUE; + } + + /* If blocking: loop until all is flushed; otherwise try once */ + do { + /* Try to flush */ + if ((ret = wrapper_ostream_flush_real(wostream)) < 0) { + ret = -1; + break; + } + + if (ret == 0 && stream->ostream.blocking) { + /* Block until we can write more */ + if (wrapper_ostream_flush_wait(wostream) < 0) { + ret = -1; + break; + } + } + + if (stream->ostream.closed) { + /* Ostream was closed in the mean time */ + ret = -1; + break; + } + + if (wostream->output_closed) { + /* Already finished the ostream */ + i_assert(stream->finished); + ret = 1; + break; + } + } while (ret == 0 && stream->ostream.blocking); + + if (ret > 0 && stream->finished) { + /* This was an o_stream_finish() call or subsequent flush */ + i_assert(wrapper_ostream_is_empty(wostream)); + while ((ret = wrapper_ostream_finish(wostream)) == 0) { + if (!stream->ostream.blocking) { + /* Not yet finished completely */ + break; + } + /* Block until we can write more */ + if (wrapper_ostream_flush_wait(wostream) < 0) { + ret = -1; + break; + } + } + } + wrapper_ostream_output_update_timeouts(wostream); + wostream->flushing = FALSE; + + if (ret >= 0 && !ostream->blocking) + wrapper_ostream_output_manage(wostream, FALSE); + + if (wostream->output_closed) { + i_assert(ret < 0 || ostream->stream_errno == 0 || + ostream->closed); + i_assert(ret >= 0 || ostream->stream_errno != 0); + o_stream_unref(&ostream); + return (ret >= 0 ? 1 : -1); + } + + if (!must_uncork || wostream->output == NULL) { + /* Nothing */ + } else if (ret >= 0) { + /* Uncork the parent ostream */ + if (o_stream_uncork_flush(wostream->output) < 0) { + wrapper_ostream_handle_parent_error(wostream); + ret = -1; + } + } else { + o_stream_uncork(wostream->output); + } + + i_assert(ret >= 0 || ostream->stream_errno != 0); + o_stream_unref(&ostream); + return ret; +} + +static void +wrapper_ostream_set_flush_callback(struct ostream_private *stream, + stream_flush_callback_t *callback, + void *context) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + + stream->callback = callback; + stream->context = context; + + if (!stream->ostream.blocking && stream->callback == NULL) { + /* Application is currently not interested in flush events and + that includes request events like errors. */ + timeout_remove(&wostream->to_event); + } else if (wostream->pending_error != NULL && + wostream->to_event == NULL) { + /* Schedule flush callback to notify application of events */ + wostream->to_event = timeout_add_short( + 0, wrapper_ostream_handle_event, wostream); + } +} + +static void +wrapper_ostream_flush_pending(struct ostream_private *stream, bool set) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + + wostream->flush_pending = set; + if (!set) + return; + if (wostream->output_closed) { + i_assert(wostream->ostream.ostream.closed); + return; + } + if (wostream->to_event == NULL) { + wostream->to_event = timeout_add_short( + 0, wrapper_ostream_handle_event, wostream); + } +} + +static size_t +wrapper_ostream_get_buffer_used_size(const struct ostream_private *stream) +{ + const struct wrapper_ostream *wostream = + container_of(stream, const struct wrapper_ostream, ostream); + size_t size = 0; + + if (wostream->buffer != NULL) + size += wostream->buffer->used; + if (wostream->output != NULL) + size += o_stream_get_buffer_used_size(wostream->output); + return size; +} + +static size_t +wrapper_ostream_get_buffer_avail_size(const struct ostream_private *stream) +{ + const struct wrapper_ostream *wostream = + container_of(stream, const struct wrapper_ostream, ostream); + size_t size = 0; + + if (wostream->ostream.max_buffer_size == SIZE_MAX) + return SIZE_MAX; + + if (wostream->buffer == NULL) + size = wostream->ostream.max_buffer_size; + else if (wostream->buffer->used < wostream->ostream.max_buffer_size) { + size = (wostream->ostream.max_buffer_size - + wostream->buffer->used); + } + + if (wostream->output != NULL) + size += o_stream_get_buffer_avail_size(wostream->output); + + return size; +} + +static void +wrapper_ostream_switch_ioloop_to(struct ostream_private *stream, + struct ioloop *ioloop) +{ + struct wrapper_ostream *wostream = + container_of(stream, struct wrapper_ostream, ostream); + + if (wostream->flush_ioloop != ioloop && + wostream->switch_ioloop_to != NULL) + wostream->switch_ioloop_to(wostream, ioloop); + + if (wostream->to_event != NULL) { + wostream->to_event = + io_loop_move_timeout_to(ioloop, &wostream->to_event); + } +} + +/* + * API + */ + +struct ostream * +wrapper_ostream_create(struct wrapper_ostream *wostream, + size_t max_buffer_size, bool blocking, + struct event *event) +{ + wostream->ostream.iostream.close = wrapper_ostream_close; + wostream->ostream.iostream.destroy = wrapper_ostream_destroy; + + wostream->ostream.ostream.blocking = blocking; + wostream->ostream.max_buffer_size = max_buffer_size; + wostream->ostream.cork = wrapper_ostream_cork; + wostream->ostream.sendv = wrapper_ostream_sendv; + wostream->ostream.flush = wrapper_ostream_flush; + wostream->ostream.set_flush_callback = + wrapper_ostream_set_flush_callback; + wostream->ostream.flush_pending = wrapper_ostream_flush_pending; + wostream->ostream.get_buffer_used_size = + wrapper_ostream_get_buffer_used_size; + wostream->ostream.get_buffer_avail_size = + wrapper_ostream_get_buffer_avail_size; + wostream->ostream.switch_ioloop_to = + wrapper_ostream_switch_ioloop_to; + + wostream->event = event_create(event); + + return o_stream_create(&wostream->ostream, NULL, -1); +} + +int wrapper_ostream_continue(struct wrapper_ostream *wostream) +{ + struct ostream_private *stream = &wostream->ostream; + struct ostream *ostream = &stream->ostream; + struct ioloop *ioloop = NULL; + bool use_cork = !stream->corked; + int ret = 1; + + if (wostream->flush_waiting) { + /* Inside wrapper_ostream_flush_wait() */ + ioloop = wostream->flush_ioloop; + } + if (stream->ostream.closed || + (stream->finished && wrapper_ostream_is_empty(wostream) && + wostream->output != NULL && + o_stream_get_buffer_used_size(wostream->output) == 0)) { + /* Already finished */ + ret = wrapper_ostream_finish(wostream); + if (ret == 0) + return 0; + } + if (wostream->flush_waiting) { + i_assert(ioloop != NULL); + io_loop_stop(ioloop); + wostream->flush_waiting = FALSE; + return ret; + } + + /* Set flush_pending = FALSE first before calling the flush callback, + and change it to TRUE only if callback returns 0. That way the + callback can call o_stream_set_flush_pending() again and we don't + forget it even if flush callback returns 1. */ + wostream->flush_pending = FALSE; + + o_stream_ref(ostream); + wostream->continuing = TRUE; + for (;;) { + if (use_cork) + o_stream_cork(ostream); + ret = wrapper_ostream_callback(wostream); + if (use_cork && !wostream->output_closed) { + int fret = o_stream_uncork_flush(ostream); + if (ret == 0 && fret > 0) + continue; + if (fret < 0 && ret >= 0) { + i_assert(ostream->stream_errno != 0); + (void)wrapper_ostream_callback(wostream); + ret = -1; + } + } + break; + } + wostream->continuing = FALSE; + if (wostream->output_closed) + o_stream_close(ostream); + + if (ret == 0) + wostream->flush_pending = TRUE; + + if (!stream->ostream.blocking) + wrapper_ostream_output_manage(wostream, FALSE); + + if (ret < 0 || ostream->stream_errno != 0 || + wostream->pending_errno != 0) + ret = -1; + else if (wostream->output_closed) + ret = 1; + else if (!wrapper_ostream_is_empty(wostream) && + (!stream->corked || wrapper_ostream_is_filled(wostream))) + ret = 0; + else if (wostream->flush_pending) + ret = 0; + + o_stream_unref(&ostream); + + return ret; +} + +void wrapper_ostream_trigger_flush(struct wrapper_ostream *wostream) +{ + struct ostream *ostream = &wostream->ostream.ostream; + + if (ostream->closed) + return; + if (wostream->to_event != NULL) + return; + if (!wostream->flush_waiting && wostream->ostream.callback == NULL) + return; + + wostream->to_event = timeout_add_short( + 0, wrapper_ostream_handle_event, wostream); +} + +bool wrapper_ostream_get_buffered_size(struct wrapper_ostream *wostream, + uoff_t *size_r) +{ + buffer_t *buffer = wostream->buffer; + + if (!wostream->ostream.finished) + return FALSE; + + *size_r = (buffer == NULL ? 0 : (uoff_t)buffer->used); + i_assert(*size_r == wostream->ostream.ostream.offset); + return TRUE; +} + +void wrapper_ostream_output_available(struct wrapper_ostream *wostream, + struct ostream *output) +{ + i_assert(!wostream->output_closed); + i_assert(!wostream->output_finished); + i_assert(wostream->output == NULL); + wostream->output = output; + if (output != NULL) { + if (wostream->ostream.corked) + o_stream_cork(wostream->output); + o_stream_ref(output); + } +} + +void wrapper_ostream_output_destroyed(struct wrapper_ostream *wostream) +{ + struct ostream *ostream = &wostream->ostream.ostream; + + wrapper_ostream_trigger_flush(wostream); + o_stream_set_no_error_handling(ostream, TRUE); + + o_stream_unref(&wostream->output); + wostream->output_closed = TRUE; + wostream->output_finished = TRUE; +} + +void wrapper_ostream_set_error(struct wrapper_ostream *wostream, + int stream_errno, const char *stream_error) +{ + struct ostream *ostream = &wostream->ostream.ostream; + + if (ostream->closed || wostream->pending_errno != 0 || + wostream->returned_error) + return; + + i_assert(wostream->pending_error == NULL); + wostream->pending_errno = stream_errno; + wostream->pending_error = i_strdup(stream_error); + + wrapper_ostream_trigger_flush(wostream); +} + +void wrapper_ostream_notify_error(struct wrapper_ostream *wostream) +{ + struct ostream *ostream = &wostream->ostream.ostream; + + if (ostream->closed || ostream->blocking || + wostream->output_closed_api || wostream->returned_error || + wostream->continuing) + return; + if (wostream->pending_errno == 0) + return; + wostream->returned_error = TRUE; + (void)wrapper_ostream_callback(wostream); +} |