diff options
Diffstat (limited to 'src/lib/ostream-multiplex.c')
-rw-r--r-- | src/lib/ostream-multiplex.c | 367 |
1 files changed, 367 insertions, 0 deletions
diff --git a/src/lib/ostream-multiplex.c b/src/lib/ostream-multiplex.c new file mode 100644 index 0000000..6c3bf85 --- /dev/null +++ b/src/lib/ostream-multiplex.c @@ -0,0 +1,367 @@ +/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "ostream-private.h" +#include "ostream-multiplex.h" + +/* all multiplex packets are [1 byte cid][4 byte length][data] */ + +struct multiplex_ostream; + +struct multiplex_ochannel { + struct ostream_private ostream; + struct multiplex_ostream *mstream; + uint8_t cid; + buffer_t *buf; + uint64_t last_sent_counter; + bool closed:1; + bool corked:1; +}; + +struct multiplex_ostream { + struct ostream *parent; + + stream_flush_callback_t *old_flush_callback; + void *old_flush_context; + + /* channel 0 is main channel */ + uint8_t cur_channel; + unsigned int remain; + size_t bufsize; + uint64_t send_counter; + ARRAY(struct multiplex_ochannel *) channels; + + bool destroyed:1; +}; + +static struct multiplex_ochannel * +get_channel(struct multiplex_ostream *mstream, uint8_t cid) +{ + struct multiplex_ochannel *channel; + i_assert(mstream != NULL); + array_foreach_elem(&mstream->channels, channel) { + if (channel != NULL && channel->cid == cid) + return channel; + } + return NULL; +} + +static void propagate_error(struct multiplex_ostream *mstream, int stream_errno) +{ + struct multiplex_ochannel *channel; + array_foreach_elem(&mstream->channels, channel) + if (channel != NULL) + channel->ostream.ostream.stream_errno = stream_errno; +} + +static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream) +{ + struct multiplex_ochannel *oldest_channel = NULL; + struct multiplex_ochannel *channel; + uint64_t last_counter = mstream->send_counter; + + array_foreach_elem(&mstream->channels, channel) { + if (channel != NULL && + channel->last_sent_counter <= last_counter && + channel->buf->used > 0) { + last_counter = channel->last_sent_counter; + oldest_channel = channel; + } + } + return oldest_channel; +} + +static bool +o_stream_multiplex_sendv(struct multiplex_ostream *mstream) +{ + struct multiplex_ochannel *channel; + ssize_t ret = 0; + bool all_sent = TRUE; + + while((channel = get_next_channel(mstream)) != NULL) { + if (channel->buf->used == 0) + continue; + if (o_stream_get_buffer_avail_size(mstream->parent) < 6) { + all_sent = FALSE; + break; + } + /* check parent stream capacity */ + size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5; + /* ensure it fits into 32 bit int */ + size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); + /* ensure amt fits */ + if (tmp == 0) + break; + /* delay corking here now that we are going to send something */ + if (!o_stream_is_corked(mstream->parent)) + o_stream_cork(mstream->parent); + uint32_t len = cpu32_to_be(amt); + const struct const_iovec vec[] = { + { &channel->cid, 1 }, + { &len, 4 }, + { channel->buf->data, amt } + }; + if ((ret = o_stream_sendv(mstream->parent, vec, N_ELEMENTS(vec))) < 0) { + propagate_error(mstream, mstream->parent->stream_errno); + break; + } + i_assert((size_t)ret == 1 + 4 + amt); + buffer_delete(channel->buf, 0, amt); + channel->last_sent_counter = ++mstream->send_counter; + } + if (o_stream_is_corked(mstream->parent)) + o_stream_uncork(mstream->parent); + return all_sent; +} + +static int o_stream_multiplex_flush(struct multiplex_ostream *mstream) +{ + int ret = o_stream_flush(mstream->parent); + if (ret >= 0) { + if (!o_stream_multiplex_sendv(mstream)) + return 0; + } + + /* a) Everything is flushed. See if one of the callbacks' flush + callbacks wants to write more data. + b) ostream failed. Notify the callbacks in case they need to know. */ + struct multiplex_ochannel *channel; + bool unfinished = FALSE; + bool failed = FALSE; + array_foreach_elem(&mstream->channels, channel) { + if (channel != NULL && channel->ostream.callback != NULL) { + ret = channel->ostream.callback(channel->ostream.context); + if (ret < 0) + failed = TRUE; + else if (ret == 0) + unfinished = TRUE; + } + } + return failed ? -1 : + (unfinished ? 0 : 1); +} + +static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream) +{ + ssize_t ret; + struct multiplex_ochannel *channel = + container_of(stream, struct multiplex_ochannel, ostream); + struct multiplex_ostream *mstream = channel->mstream; + + /* flush parent stream always, so there is room for more. */ + if ((ret = o_stream_flush(mstream->parent)) <= 0) { + if (ret == -1) + propagate_error(mstream, mstream->parent->stream_errno); + return ret; + } + + /* send all channels */ + o_stream_multiplex_sendv(mstream); + + if (channel->buf->used > 0) + return 0; + return 1; +} + +static void o_stream_multiplex_ochannel_cork(struct ostream_private *stream, bool set) +{ + struct multiplex_ochannel *channel = + container_of(stream, struct multiplex_ochannel, ostream); + if (channel->corked != set && !set) { + /* flush */ + (void)o_stream_multiplex_ochannel_flush(stream); + } + channel->corked = set; +} + +static ssize_t +o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, + const struct const_iovec *iov, unsigned int iov_count) +{ + struct multiplex_ochannel *channel = + container_of(stream, struct multiplex_ochannel, ostream); + size_t total = 0, avail = o_stream_get_buffer_avail_size(&stream->ostream); + size_t optimal_size = I_MIN(IO_BLOCK_SIZE, avail); + + for (unsigned int i = 0; i < iov_count; i++) + total += iov[i].iov_len; + + if (avail < total) { + o_stream_multiplex_sendv(channel->mstream); + avail = o_stream_get_buffer_avail_size(&stream->ostream); + if (avail == 0) + return 0; + } + + total = 0; + + for (unsigned int i = 0; i < iov_count; i++) { + /* copy data to buffer */ + size_t tmp = avail - total; + if (tmp == 0) + break; + buffer_append(channel->buf, iov[i].iov_base, + I_MIN(tmp, iov[i].iov_len)); + total += I_MIN(tmp, iov[i].iov_len); + } + + stream->ostream.offset += total; + + /* will send later */ + if (channel->corked && channel->buf->used < optimal_size) + return total; + + o_stream_multiplex_sendv(channel->mstream); + return total; +} + +static void +o_stream_multiplex_ochannel_set_flush_callback(struct ostream_private *stream, + stream_flush_callback_t *callback, + void *context) +{ + /* We have overwritten our parent's flush-callback. Don't change it. */ + stream->callback = callback; + stream->context = context; +} + +static size_t +o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *stream) +{ + const struct multiplex_ochannel *channel = + container_of(stream, const struct multiplex_ochannel, ostream); + + return channel->buf->used + + o_stream_get_buffer_used_size(channel->mstream->parent); +} + +static size_t +o_stream_multiplex_ochannel_get_buffer_avail_size(const struct ostream_private *stream) +{ + const struct multiplex_ochannel *channel = + container_of(stream, const struct multiplex_ochannel, ostream); + size_t max_avail = I_MIN(channel->mstream->bufsize, + o_stream_get_buffer_avail_size(stream->parent)); + + /* There is 5-byte overhead per message, so take that into account */ + return max_avail <= (channel->buf->used + 5) ? 0 : + max_avail - (channel->buf->used + 5); +} + +static void +o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent) +{ + struct multiplex_ochannel *arr_channel; + struct multiplex_ochannel *channel = + container_of(stream, struct multiplex_ochannel, ostream.iostream); + + channel->closed = TRUE; + if (close_parent) { + array_foreach_elem(&channel->mstream->channels, arr_channel) + if (arr_channel != NULL && !arr_channel->closed) + return; + o_stream_close(channel->mstream->parent); + } +} + +static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) +{ + struct multiplex_ochannel *channel; + /* can't do anything until they are all closed */ + array_foreach_elem(&mstream->channels, channel) + if (channel != NULL) + return; + + i_assert(mstream->parent->real_stream->callback == + (stream_flush_callback_t *)o_stream_multiplex_flush); + o_stream_set_flush_callback(mstream->parent, + *mstream->old_flush_callback, + mstream->old_flush_context); + o_stream_unref(&mstream->parent); + array_free(&mstream->channels); + i_free(mstream); +} + +static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream) +{ + struct multiplex_ochannel **channelp; + struct multiplex_ochannel *channel = + container_of(stream, struct multiplex_ochannel, ostream.iostream); + o_stream_unref(&channel->ostream.parent); + if (channel->buf != NULL) + buffer_free(&channel->buf); + /* delete the channel */ + array_foreach_modifiable(&channel->mstream->channels, channelp) { + if (*channelp != NULL && (*channelp)->cid == channel->cid) { + *channelp = NULL; + break; + } + } + o_stream_multiplex_try_destroy(channel->mstream); +} + +static struct ostream * +o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) +{ + struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1); + channel->cid = cid; + channel->buf = buffer_create_dynamic(default_pool, 256); + channel->mstream = mstream; + channel->ostream.cork = o_stream_multiplex_ochannel_cork; + channel->ostream.flush = o_stream_multiplex_ochannel_flush; + channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; + channel->ostream.set_flush_callback = + o_stream_multiplex_ochannel_set_flush_callback; + channel->ostream.get_buffer_used_size = + o_stream_multiplex_ochannel_get_buffer_used_size; + channel->ostream.get_buffer_avail_size = + o_stream_multiplex_ochannel_get_buffer_avail_size; + channel->ostream.iostream.close = o_stream_multiplex_ochannel_close; + channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy; + channel->ostream.fd = o_stream_get_fd(mstream->parent); + array_push_back(&channel->mstream->channels, &channel); + + (void)o_stream_create(&channel->ostream, mstream->parent, -1); + /* o_stream_create() defaults the flush_callback to parent's callback. + Here it points to o_stream_multiplex_flush(), which just causes + infinite looping. */ + channel->ostream.callback = NULL; + channel->ostream.context = NULL; + return &channel->ostream.ostream; +} + +struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) +{ + struct multiplex_ochannel *chan = + container_of(stream->real_stream, struct multiplex_ochannel, + ostream); + i_assert(get_channel(chan->mstream, cid) == NULL); + + return o_stream_add_channel_real(chan->mstream, cid); +} + +struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize) +{ + struct multiplex_ostream *mstream; + + mstream = i_new(struct multiplex_ostream, 1); + mstream->parent = parent; + mstream->bufsize = bufsize; + mstream->old_flush_callback = parent->real_stream->callback; + mstream->old_flush_context = parent->real_stream->context; + o_stream_set_flush_callback(parent, o_stream_multiplex_flush, mstream); + i_array_init(&mstream->channels, 8); + o_stream_ref(parent); + + return o_stream_add_channel_real(mstream, 0); +} + +uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream) +{ + struct multiplex_ochannel *channel = + container_of(stream->real_stream, struct multiplex_ochannel, + ostream); + return channel->cid; +} |