/* Copyright (c) 2006-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "istream-private.h"
#include "istream-tee.h"

/* Shared state of a tee: one parent input that is exposed through any
   number of child istreams. */
struct tee_istream {
	/* parent stream whose buffer the children point into */
	struct istream *input;
	/* head of the singly linked list of child streams */
	struct tee_child_istream *children;

	/* highest v_offset seen among destroyed children; the parent is
	   skipped up to this offset when the last child is destroyed */
	uoff_t max_read_offset;
};

/* One child of a tee. */
struct tee_child_istream {
	struct istream_private istream;

	/* the tee this child belongs to */
	struct tee_istream *tee;
	/* next child in tee->children */
	struct tee_child_istream *next;

	/* TRUE when the latest read returned 0 because the parent read
	   returned -2 while this child's skip was non-zero, i.e. another
	   child is still holding the data */
	bool last_read_waiting:1;
};

/* Re-point every open child at the parent's current buffer. Each child's
   skip is recomputed from the difference between its own v_offset and the
   parent's, and its pos is set so that the number of bytes between skip
   and pos stays the same as before. Closed children get skip = pos = 0. */
static void tee_streams_update_buffer(struct tee_istream *tee)
{
	struct tee_child_istream *tstream = tee->children;
	struct istream_private *priv;
	const unsigned char *parent_buf;
	size_t size, old_used;

	parent_buf = i_stream_get_data(tee->input, &size);
	while (tstream != NULL) {
		priv = &tstream->istream;
		if (priv->istream.closed) {
			priv->pos = 0;
			priv->skip = 0;
		} else {
			/* bytes this child currently has between skip
			   and pos */
			old_used = priv->pos - priv->skip;

			priv->buffer = parent_buf;
			i_assert(tstream->istream.istream.v_offset >= tee->input->v_offset);
			priv->skip = priv->istream.v_offset -
				tee->input->v_offset;
			i_assert(tstream->istream.skip + old_used <= size);
			priv->pos = priv->skip + old_used;

			priv->parent_expected_offset = tee->input->v_offset;
			priv->access_counter =
				tee->input->real_stream->access_counter;
		}
		tstream = tstream->next;
	}
}

/* Skip the parent forward by the smallest skip found among the open
   children, then refresh the children's buffer pointers. Does nothing when
   that minimum is 0 or when there is no open child. */
static void tee_streams_skip(struct tee_istream *tee)
{
	struct tee_child_istream *child;
	size_t lowest_skip = SIZE_MAX;

	for (child = tee->children; child != NULL; child = child->next) {
		if (child->istream.istream.closed)
			continue;
		if (child->istream.skip < lowest_skip)
			lowest_skip = child->istream.skip;
	}

	/* SIZE_MAX means no open child was seen */
	if (lowest_skip == 0 || lowest_skip == SIZE_MAX)
		return;

	i_stream_skip(tee->input, lowest_skip);
	tee_streams_update_buffer(tee);
}

/* iostream close handler: run the shared skip logic for the tee.
   close_parent is ignored. */
static void i_stream_tee_close(struct iostream_private *stream,
			       bool close_parent ATTR_UNUSED)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream,
			     istream.iostream);
	struct tee_istream *tee = child->tee;

	tee_streams_skip(tee);
}

/* iostream destroy handler: record the child's offset in
   tee->max_read_offset, unlink the child from the tee, and either advance
   the parent for the remaining children or, if this was the last child,
   skip the parent to max_read_offset and free the tee. */
static void i_stream_tee_destroy(struct iostream_private *stream)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream,
			     istream.iostream);
	struct tee_istream *tee = child->tee;
	struct tee_child_istream **link;
	uoff_t child_offset = child->istream.istream.v_offset;

	if (tee->max_read_offset < child_offset)
		tee->max_read_offset = child_offset;

	/* find the pointer that refers to this child and unlink it */
	link = &tee->children;
	while (*link != NULL && *link != child)
		link = &(*link)->next;
	if (*link != NULL)
		*link = child->next;

	if (tee->children != NULL) {
		/* other children remain; the parent may be able to
		   advance now */
		tee_streams_skip(tee);
	} else {
		/* that was the last child, so the tee goes away too */
		i_assert(tee->input->v_offset <= tee->max_read_offset);
		i_stream_skip(tee->input,
			      tee->max_read_offset - tee->input->v_offset);

		i_stream_unref(&tee->input);
		i_free(tee);
	}
	/* make sure i_stream_unref() doesn't unref the parent */
	child->istream.parent = NULL;
}

/* iostream set_max_buffer_size handler: store the size on the child and
   pass it on to the parent input. */
static void
i_stream_tee_set_max_buffer_size(struct iostream_private *stream,
				 size_t max_size)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream,
			     istream.iostream);
	struct istream *parent_input = child->tee->input;

	child->istream.max_buffer_size = max_size;
	i_stream_set_max_buffer_size(parent_input, max_size);
}

/* istream read handler for a tee child.

   If the child has already seen everything in the parent's buffer, the
   parent is read for more data; otherwise the child is simply given the
   rest of what the parent already has buffered.

   Returns the number of new bytes made visible to the child (> 0).
   If the parent read returns <= 0: returns 0 with last_read_waiting set
   when the parent returned -2 and this child's skip is non-zero; in all
   other cases copies the parent's stream_errno and eof to the child and
   returns the parent's return value. */
static ssize_t i_stream_tee_read(struct istream_private *stream)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream, istream);
	struct tee_istream *tee = child->tee;
	struct istream *input = tee->input;
	const unsigned char *data;
	size_t size;
	uoff_t last_high_offset;
	ssize_t ret;

	child->last_read_waiting = FALSE;
	if (stream->buffer == NULL) {
		/* first read on this child */
		tee_streams_update_buffer(tee);
	}
	data = i_stream_get_data(input, &size);

	/* last_high_offset is the offset up to which this child has
	   already been given data. input->v_offset + size is the offset up
	   to which the parent has data buffered right now. */
	last_high_offset = stream->istream.v_offset +
		(stream->pos - stream->skip);
	if (stream->pos != size) {
		/* the parent still has buffered data this child hasn't
		   seen */
		i_assert(last_high_offset < input->v_offset + size);
		tee_streams_update_buffer(tee);
		i_assert(stream->pos < size);
	} else {
		/* everything buffered has been seen; read more from the
		   parent */
		i_assert(last_high_offset == input->v_offset + size);
		tee_streams_skip(tee);
		ret = i_stream_read(input);
		if (ret <= 0) {
			size = i_stream_get_data_size(input);
			if (ret != -2 || stream->skip == 0) {
				stream->istream.stream_errno =
					input->stream_errno;
				stream->istream.eof = input->eof;
				return ret;
			}
			/* another child is holding the data,
			   wait for it */
			child->last_read_waiting = TRUE;
			return 0;
		}
		tee_streams_update_buffer(tee);
		data = i_stream_get_data(input, &size);
	}

	i_assert(stream->buffer == data);
	ret = size - stream->pos;
	i_assert(ret > 0);
	stream->pos = size;

	i_assert(stream->istream.v_offset + (stream->pos - stream->skip) == input->v_offset + size);
	return ret;
}

/* istream stat handler: stat the parent input and copy the result into
   the child's statbuf. Returns 0 on success, -1 if the parent stat
   fails. */
static int i_stream_tee_stat(struct istream_private *stream, bool exact)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream, istream);
	const struct stat *parent_st;

	if (i_stream_stat(child->tee->input, exact, &parent_st) >= 0) {
		stream->statbuf = *parent_st;
		return 0;
	}
	return -1;
}

/* istream sync handler: advance the parent as far as the children allow
   and then sync it. Panics if the parent still has buffered data after
   the skip. */
static void i_stream_tee_sync(struct istream_private *stream)
{
	struct tee_child_istream *child =
		container_of(stream, struct tee_child_istream, istream);
	struct tee_istream *tee = child->tee;

	tee_streams_skip(tee);
	if (i_stream_get_data_size(tee->input) != 0) {
		i_panic("tee-istream: i_stream_sync() called "
			"with data still buffered");
	}
	i_stream_sync(tee->input);
}

/* Create a tee on top of input. When input is at v_offset 0 it is
   referenced and used directly; otherwise it is wrapped with
   i_stream_create_limit(input, UOFF_T_MAX) and the wrapper is used
   (presumably so that the tee's input starts from offset 0 - verify
   against the limit istream). */
struct tee_istream *tee_i_stream_create(struct istream *input)
{
	struct tee_istream *tee = i_new(struct tee_istream, 1);

	if (input->v_offset != 0) {
		tee->input = i_stream_create_limit(input, UOFF_T_MAX);
	} else {
		i_stream_ref(input);
		tee->input = input;
	}
	return tee;
}

/* Create a new child istream for the tee and add it to the head of the
   tee's child list. The child takes its max_buffer_size, fd and name from
   the tee's input. */
struct istream *tee_i_stream_create_child(struct tee_istream *tee)
{
	struct tee_child_istream *child;
	struct istream *child_stream, *input = tee->input;

	child = i_new(struct tee_child_istream, 1);
	child->tee = tee;

	child->istream.max_buffer_size = input->real_stream->max_buffer_size;

	child->istream.iostream.close = i_stream_tee_close;
	child->istream.iostream.destroy = i_stream_tee_destroy;
	child->istream.iostream.set_max_buffer_size =
		i_stream_tee_set_max_buffer_size;

	child->istream.read = i_stream_tee_read;
	child->istream.stat = i_stream_tee_stat;
	child->istream.sync = i_stream_tee_sync;

	/* push to the front of the child list */
	child->next = tee->children;
	tee->children = child;

	child_stream = i_stream_create(&child->istream, input,
				       i_stream_get_fd(input),
				       ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
	i_stream_set_name(&child->istream.istream, i_stream_get_name(input));
	/* the tee itself holds the reference to the input, so an extra one
	   isn't needed here */
	i_stream_unref(&input);
	return child_stream;
}

/* Returns the child's last_read_waiting flag, i.e. whether its latest
   read returned 0 because it has to wait for another child. input must be
   a stream created with tee_i_stream_create_child(). */
bool tee_i_stream_child_is_waiting(struct istream *input)
{
	struct tee_child_istream *child =
		container_of(input->real_stream,
			     struct tee_child_istream, istream);

	return child->last_read_waiting;
}