diff options
Diffstat (limited to 'src/lib/istream-tee.c')
-rw-r--r-- | src/lib/istream-tee.c | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/src/lib/istream-tee.c b/src/lib/istream-tee.c new file mode 100644 index 0000000..858afd7 --- /dev/null +++ b/src/lib/istream-tee.c @@ -0,0 +1,258 @@ +/* Copyright (c) 2006-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "istream-private.h" +#include "istream-tee.h" + +struct tee_istream { + struct istream *input; + struct tee_child_istream *children; + + uoff_t max_read_offset; +}; + +struct tee_child_istream { + struct istream_private istream; + + struct tee_istream *tee; + struct tee_child_istream *next; + + bool last_read_waiting:1; +}; + +static void tee_streams_update_buffer(struct tee_istream *tee) +{ + struct tee_child_istream *tstream = tee->children; + const unsigned char *data; + size_t size, old_used; + + data = i_stream_get_data(tee->input, &size); + for (; tstream != NULL; tstream = tstream->next) { + if (tstream->istream.istream.closed) { + tstream->istream.skip = tstream->istream.pos = 0; + continue; + } + old_used = tstream->istream.pos - tstream->istream.skip; + + tstream->istream.buffer = data; + i_assert(tstream->istream.istream.v_offset >= tee->input->v_offset); + tstream->istream.skip = tstream->istream.istream.v_offset - + tee->input->v_offset; + i_assert(tstream->istream.skip + old_used <= size); + tstream->istream.pos = tstream->istream.skip + old_used; + + tstream->istream.parent_expected_offset = + tee->input->v_offset; + tstream->istream.access_counter = + tee->input->real_stream->access_counter; + } +} + +static void tee_streams_skip(struct tee_istream *tee) +{ + struct tee_child_istream *tstream = tee->children; + size_t min_skip; + + min_skip = SIZE_MAX; + for (; tstream != NULL; tstream = tstream->next) { + if (tstream->istream.skip < min_skip && + !tstream->istream.istream.closed) + min_skip = tstream->istream.skip; + } + + if (min_skip > 0 && min_skip != SIZE_MAX) { + i_stream_skip(tee->input, min_skip); + tee_streams_update_buffer(tee); + } +} + +static void i_stream_tee_close(struct iostream_private *stream, + bool close_parent ATTR_UNUSED) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, + istream.iostream); + + tee_streams_skip(tstream->tee); +} + +static void i_stream_tee_destroy(struct iostream_private *stream) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, + istream.iostream); + struct tee_istream *tee = tstream->tee; + struct tee_child_istream **p; + + if (tstream->istream.istream.v_offset > tee->max_read_offset) + tee->max_read_offset = tstream->istream.istream.v_offset; + + for (p = &tee->children; *p != NULL; p = &(*p)->next) { + if (*p == tstream) { + *p = tstream->next; + break; + } + } + + if (tee->children == NULL) { + /* last child. the tee is now destroyed */ + i_assert(tee->input->v_offset <= tee->max_read_offset); + i_stream_skip(tee->input, + tee->max_read_offset - tee->input->v_offset); + + i_stream_unref(&tee->input); + i_free(tee); + } else { + tee_streams_skip(tstream->tee); + } + /* i_stream_unref() shouldn't unref the parent */ + tstream->istream.parent = NULL; +} + +static void +i_stream_tee_set_max_buffer_size(struct iostream_private *stream, + size_t max_size) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, + istream.iostream); + + tstream->istream.max_buffer_size = max_size; + i_stream_set_max_buffer_size(tstream->tee->input, max_size); +} + +static ssize_t i_stream_tee_read(struct istream_private *stream) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, istream); + struct istream *input = tstream->tee->input; + const unsigned char *data; + size_t size; + uoff_t last_high_offset; + ssize_t ret; + + tstream->last_read_waiting = FALSE; + if (stream->buffer == NULL) { + /* initial read */ + tee_streams_update_buffer(tstream->tee); + } + data = i_stream_get_data(input, &size); + + /* last_high_offset contains how far we have read this child tee stream + so far. input->v_offset + size contains how much is available in + the parent stream without having to read more. */ + last_high_offset = stream->istream.v_offset + + (stream->pos - stream->skip); + if (stream->pos == size) { + /* we've read everything, need to read more */ + i_assert(last_high_offset == input->v_offset + size); + tee_streams_skip(tstream->tee); + ret = i_stream_read(input); + if (ret <= 0) { + size = i_stream_get_data_size(input); + if (ret == -2 && stream->skip != 0) { + /* someone else is holding the data, + wait for it */ + tstream->last_read_waiting = TRUE; + return 0; + } + stream->istream.stream_errno = input->stream_errno; + stream->istream.eof = input->eof; + return ret; + } + tee_streams_update_buffer(tstream->tee); + data = i_stream_get_data(input, &size); + } else { + /* there's still some data available from parent */ + i_assert(last_high_offset < input->v_offset + size); + tee_streams_update_buffer(tstream->tee); + i_assert(stream->pos < size); + } + + i_assert(stream->buffer == data); + ret = size - stream->pos; + i_assert(ret > 0); + stream->pos = size; + + i_assert(stream->istream.v_offset + (stream->pos - stream->skip) == + input->v_offset + size); + return ret; +} + +static int +i_stream_tee_stat(struct istream_private *stream, bool exact) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, istream); + const struct stat *st; + + if (i_stream_stat(tstream->tee->input, exact, &st) < 0) + return -1; + stream->statbuf = *st; + return 0; +} + +static void i_stream_tee_sync(struct istream_private *stream) +{ + struct tee_child_istream *tstream = + container_of(stream, struct tee_child_istream, istream); + + tee_streams_skip(tstream->tee); + if (i_stream_get_data_size(tstream->tee->input) != 0) { + i_panic("tee-istream: i_stream_sync() called " + "with data still buffered"); + } + i_stream_sync(tstream->tee->input); +} + +struct tee_istream *tee_i_stream_create(struct istream *input) +{ + struct tee_istream *tee; + + tee = i_new(struct tee_istream, 1); + if (input->v_offset == 0) { + i_stream_ref(input); + tee->input = input; + } else { + tee->input = i_stream_create_limit(input, UOFF_T_MAX); + } + return tee; +} + +struct istream *tee_i_stream_create_child(struct tee_istream *tee) +{ + struct tee_child_istream *tstream; + struct istream *ret, *input = tee->input; + + tstream = i_new(struct tee_child_istream, 1); + tstream->tee = tee; + + tstream->istream.max_buffer_size = input->real_stream->max_buffer_size; + tstream->istream.iostream.close = i_stream_tee_close; + tstream->istream.iostream.destroy = i_stream_tee_destroy; + tstream->istream.iostream.set_max_buffer_size = + i_stream_tee_set_max_buffer_size; + + tstream->istream.read = i_stream_tee_read; + tstream->istream.stat = i_stream_tee_stat; + tstream->istream.sync = i_stream_tee_sync; + + tstream->next = tee->children; + tee->children = tstream; + + ret = i_stream_create(&tstream->istream, input, i_stream_get_fd(input), + ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT); + i_stream_set_name(&tstream->istream.istream, i_stream_get_name(input)); + /* we keep the reference in tee stream, no need for extra references */ + i_stream_unref(&input); + return ret; +} + +bool tee_i_stream_child_is_waiting(struct istream *input) +{ + struct tee_child_istream *tstream = + container_of(input->real_stream, + struct tee_child_istream, istream); + + return tstream->last_read_waiting; +} |