diff options
Diffstat (limited to '')
-rw-r--r-- | src/lib/istream-multiplex.c | 298 |
1 files changed, 298 insertions, 0 deletions
diff --git a/src/lib/istream-multiplex.c b/src/lib/istream-multiplex.c new file mode 100644 index 0000000..c70d0cc --- /dev/null +++ b/src/lib/istream-multiplex.c @@ -0,0 +1,298 @@ +/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "istream-private.h" +#include "istream-multiplex.h" + +/* all multiplex packets are [1 byte cid][4 byte length][data] */ + +struct multiplex_istream; + +struct multiplex_ichannel { + struct istream_private istream; + struct multiplex_istream *mstream; + uint8_t cid; + size_t pending_pos; + bool closed:1; +}; + +struct multiplex_istream { + struct istream *parent; + + /* channel 0 is main channel */ + uint8_t cur_channel; + unsigned int remain; + size_t bufsize; + ARRAY(struct multiplex_ichannel *) channels; + + bool blocking:1; +}; + +static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream); + +static struct multiplex_ichannel * +get_channel(struct multiplex_istream *mstream, uint8_t cid) +{ + struct multiplex_ichannel *channel; + i_assert(mstream != NULL); + array_foreach_elem(&mstream->channels, channel) { + if (channel != NULL && channel->cid == cid) + return channel; + } + return NULL; +} + +static void propagate_error(struct multiplex_istream *mstream, int stream_errno) +{ + struct multiplex_ichannel *channel; + array_foreach_elem(&mstream->channels, channel) + if (channel != NULL) + channel->istream.istream.stream_errno = stream_errno; +} + +static void propagate_eof(struct multiplex_istream *mstream) +{ + struct multiplex_ichannel *channel; + array_foreach_elem(&mstream->channels, channel) { + if (channel == NULL) + continue; + + channel->istream.istream.eof = TRUE; + if (mstream->remain > 0) { + channel->istream.istream.stream_errno = EPIPE; + io_stream_set_error(&channel->istream.iostream, + "Unexpected EOF - %u bytes remaining in packet", + mstream->remain); + } + } +} + +static ssize_t +i_stream_multiplex_read(struct multiplex_istream *mstream, + struct multiplex_ichannel *req_channel) +{ + const unsigned char *data; + size_t len = 0, used, wanted, avail; + ssize_t ret, got = 0; + + if (mstream->parent == NULL) { + req_channel->istream.istream.eof = TRUE; + return -1; + } + + (void)i_stream_get_data(mstream->parent, &len); + + if (len == 0 && mstream->parent->closed) { + req_channel->istream.istream.eof = TRUE; + return -1; + } + + if (((mstream->remain > 0 && len == 0) || + (mstream->remain == 0 && len < 5)) && + (ret = i_stream_read_memarea(mstream->parent)) <= 0) { + propagate_error(mstream, mstream->parent->stream_errno); + if (mstream->parent->eof) + propagate_eof(mstream); + return ret; + } + + for(;;) { + data = i_stream_get_data(mstream->parent, &len); + if (len == 0) { + if (got == 0 && mstream->blocking) { + /* can't return 0 with blocking istreams, + so try again from the beginning. */ + return i_stream_multiplex_read(mstream, req_channel); + } + break; + } + + if (mstream->remain > 0) { + struct multiplex_ichannel *channel = + get_channel(mstream, mstream->cur_channel); + wanted = I_MIN(len, mstream->remain); + /* is it open? */ + if (channel != NULL && !channel->closed) { + struct istream_private *stream = &channel->istream; + stream->pos += channel->pending_pos; + bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail); + stream->pos -= channel->pending_pos; + if (!alloc_ret) { + i_stream_set_input_pending(&stream->istream, TRUE); + if (channel->cid != req_channel->cid) + return 0; + if (got > 0) + break; + return -2; + } + + used = I_MIN(wanted, avail); + + /* dump into buffer */ + if (channel->cid != req_channel->cid) { + i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size); + memcpy(stream->w_buffer + stream->pos + channel->pending_pos, + data, used); + channel->pending_pos += used; + i_stream_set_input_pending(&stream->istream, TRUE); + } else { + i_assert(stream->pos + used <= stream->buffer_size); + memcpy(stream->w_buffer + stream->pos, data, used); + stream->pos += used; + got += used; + } + } else { + used = wanted; + } + mstream->remain -= used; + i_stream_skip(mstream->parent, used); + /* see if there is more to read */ + continue; + } + if (mstream->remain == 0) { + /* need more data */ + if (len < 5) { + ret = i_stream_multiplex_ichannel_read(&req_channel->istream); + if (ret > 0) + got += ret; + break; + } + /* channel ID */ + mstream->cur_channel = data[0]; + /* data length */ + mstream->remain = be32_to_cpu_unaligned(data+1); + i_stream_skip(mstream->parent, 5); + } + } + + propagate_error(mstream, mstream->parent->stream_errno); + if (mstream->parent->eof) + propagate_eof(mstream); + + return got; +} + +static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream) +{ + struct multiplex_ichannel *channel = + container_of(stream, struct multiplex_ichannel, istream); + /* if previous multiplex read dumped data for us + actually serve it here. */ + if (channel->pending_pos > 0) { + ssize_t ret = channel->pending_pos; + stream->pos += channel->pending_pos; + channel->pending_pos = 0; + return ret; + } + return i_stream_multiplex_read(channel->mstream, channel); +} + +static void +i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private *stream, + struct ioloop *ioloop) +{ + struct multiplex_ichannel *channel = + container_of(stream, struct multiplex_ichannel, istream); + + i_stream_switch_ioloop_to(channel->mstream->parent, ioloop); +} + +static void +i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent) +{ + struct multiplex_ichannel *arr_channel; + struct multiplex_ichannel *channel = + container_of(stream, struct multiplex_ichannel, + istream.iostream); + channel->closed = TRUE; + if (close_parent) { + array_foreach_elem(&channel->mstream->channels, arr_channel) + if (arr_channel != NULL && !arr_channel->closed) + return; + i_stream_close(channel->mstream->parent); + } +} + +static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream) +{ + struct multiplex_ichannel *channel; + /* can't do anything until they are all closed */ + array_foreach_elem(&mstream->channels, channel) + if (channel != NULL) + return; + i_stream_unref(&mstream->parent); + array_free(&mstream->channels); + i_free(mstream); +} + +static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream) +{ + struct multiplex_ichannel **channelp; + struct multiplex_ichannel *channel = + container_of(stream, struct multiplex_ichannel, + istream.iostream); + i_stream_multiplex_ichannel_close(stream, TRUE); + i_stream_free_buffer(&channel->istream); + array_foreach_modifiable(&channel->mstream->channels, channelp) { + if (*channelp == channel) { + *channelp = NULL; + break; + } + } + i_stream_multiplex_try_destroy(channel->mstream); +} + +static struct istream * +i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid) +{ + struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1); + channel->cid = cid; + channel->mstream = mstream; + channel->istream.read = i_stream_multiplex_ichannel_read; + channel->istream.switch_ioloop_to = i_stream_multiplex_ichannel_switch_ioloop_to; + channel->istream.iostream.close = i_stream_multiplex_ichannel_close; + channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy; + channel->istream.max_buffer_size = mstream->bufsize; + channel->istream.istream.blocking = mstream->blocking; + if (cid == 0) + channel->istream.fd = i_stream_get_fd(mstream->parent); + else + channel->istream.fd = -1; + array_push_back(&channel->mstream->channels, &channel); + + return i_stream_create(&channel->istream, NULL, channel->istream.fd, 0); +} + +struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid) +{ + struct multiplex_ichannel *chan = + container_of(stream->real_stream, + struct multiplex_ichannel, istream); + i_assert(get_channel(chan->mstream, cid) == NULL); + + return i_stream_add_channel_real(chan->mstream, cid); +} + +struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize) +{ + struct multiplex_istream *mstream; + + mstream = i_new(struct multiplex_istream, 1); + mstream->parent = parent; + mstream->bufsize = bufsize; + mstream->blocking = parent->blocking; + i_array_init(&mstream->channels, 8); + i_stream_ref(parent); + + return i_stream_add_channel_real(mstream, 0); +} + +uint8_t i_stream_multiplex_get_channel_id(struct istream *stream) +{ + struct multiplex_ichannel *channel = + container_of(stream->real_stream, + struct multiplex_ichannel, istream); + return channel->cid; +} |