summaryrefslogtreecommitdiffstats
path: root/src/lib/istream-tee.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/istream-tee.c')
-rw-r--r--src/lib/istream-tee.c258
1 files changed, 258 insertions, 0 deletions
diff --git a/src/lib/istream-tee.c b/src/lib/istream-tee.c
new file mode 100644
index 0000000..858afd7
--- /dev/null
+++ b/src/lib/istream-tee.c
@@ -0,0 +1,258 @@
+/* Copyright (c) 2006-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "istream-private.h"
+#include "istream-tee.h"
+
+struct tee_istream {
+ struct istream *input;
+ struct tee_child_istream *children;
+
+ uoff_t max_read_offset;
+};
+
+struct tee_child_istream {
+ struct istream_private istream;
+
+ struct tee_istream *tee;
+ struct tee_child_istream *next;
+
+ bool last_read_waiting:1;
+};
+
+static void tee_streams_update_buffer(struct tee_istream *tee)
+{
+ struct tee_child_istream *tstream = tee->children;
+ const unsigned char *data;
+ size_t size, old_used;
+
+ data = i_stream_get_data(tee->input, &size);
+ for (; tstream != NULL; tstream = tstream->next) {
+ if (tstream->istream.istream.closed) {
+ tstream->istream.skip = tstream->istream.pos = 0;
+ continue;
+ }
+ old_used = tstream->istream.pos - tstream->istream.skip;
+
+ tstream->istream.buffer = data;
+ i_assert(tstream->istream.istream.v_offset >= tee->input->v_offset);
+ tstream->istream.skip = tstream->istream.istream.v_offset -
+ tee->input->v_offset;
+ i_assert(tstream->istream.skip + old_used <= size);
+ tstream->istream.pos = tstream->istream.skip + old_used;
+
+ tstream->istream.parent_expected_offset =
+ tee->input->v_offset;
+ tstream->istream.access_counter =
+ tee->input->real_stream->access_counter;
+ }
+}
+
+static void tee_streams_skip(struct tee_istream *tee)
+{
+ struct tee_child_istream *tstream = tee->children;
+ size_t min_skip;
+
+ min_skip = SIZE_MAX;
+ for (; tstream != NULL; tstream = tstream->next) {
+ if (tstream->istream.skip < min_skip &&
+ !tstream->istream.istream.closed)
+ min_skip = tstream->istream.skip;
+ }
+
+ if (min_skip > 0 && min_skip != SIZE_MAX) {
+ i_stream_skip(tee->input, min_skip);
+ tee_streams_update_buffer(tee);
+ }
+}
+
+static void i_stream_tee_close(struct iostream_private *stream,
+ bool close_parent ATTR_UNUSED)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream,
+ istream.iostream);
+
+ tee_streams_skip(tstream->tee);
+}
+
+static void i_stream_tee_destroy(struct iostream_private *stream)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream,
+ istream.iostream);
+ struct tee_istream *tee = tstream->tee;
+ struct tee_child_istream **p;
+
+ if (tstream->istream.istream.v_offset > tee->max_read_offset)
+ tee->max_read_offset = tstream->istream.istream.v_offset;
+
+ for (p = &tee->children; *p != NULL; p = &(*p)->next) {
+ if (*p == tstream) {
+ *p = tstream->next;
+ break;
+ }
+ }
+
+ if (tee->children == NULL) {
+ /* last child. the tee is now destroyed */
+ i_assert(tee->input->v_offset <= tee->max_read_offset);
+ i_stream_skip(tee->input,
+ tee->max_read_offset - tee->input->v_offset);
+
+ i_stream_unref(&tee->input);
+ i_free(tee);
+ } else {
+ tee_streams_skip(tstream->tee);
+ }
+ /* i_stream_unref() shouldn't unref the parent */
+ tstream->istream.parent = NULL;
+}
+
+static void
+i_stream_tee_set_max_buffer_size(struct iostream_private *stream,
+ size_t max_size)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream,
+ istream.iostream);
+
+ tstream->istream.max_buffer_size = max_size;
+ i_stream_set_max_buffer_size(tstream->tee->input, max_size);
+}
+
+static ssize_t i_stream_tee_read(struct istream_private *stream)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream, istream);
+ struct istream *input = tstream->tee->input;
+ const unsigned char *data;
+ size_t size;
+ uoff_t last_high_offset;
+ ssize_t ret;
+
+ tstream->last_read_waiting = FALSE;
+ if (stream->buffer == NULL) {
+ /* initial read */
+ tee_streams_update_buffer(tstream->tee);
+ }
+ data = i_stream_get_data(input, &size);
+
+ /* last_high_offset contains how far we have read this child tee stream
+ so far. input->v_offset + size contains how much is available in
+ the parent stream without having to read more. */
+ last_high_offset = stream->istream.v_offset +
+ (stream->pos - stream->skip);
+ if (stream->pos == size) {
+ /* we've read everything, need to read more */
+ i_assert(last_high_offset == input->v_offset + size);
+ tee_streams_skip(tstream->tee);
+ ret = i_stream_read(input);
+ if (ret <= 0) {
+ size = i_stream_get_data_size(input);
+ if (ret == -2 && stream->skip != 0) {
+ /* someone else is holding the data,
+ wait for it */
+ tstream->last_read_waiting = TRUE;
+ return 0;
+ }
+ stream->istream.stream_errno = input->stream_errno;
+ stream->istream.eof = input->eof;
+ return ret;
+ }
+ tee_streams_update_buffer(tstream->tee);
+ data = i_stream_get_data(input, &size);
+ } else {
+ /* there's still some data available from parent */
+ i_assert(last_high_offset < input->v_offset + size);
+ tee_streams_update_buffer(tstream->tee);
+ i_assert(stream->pos < size);
+ }
+
+ i_assert(stream->buffer == data);
+ ret = size - stream->pos;
+ i_assert(ret > 0);
+ stream->pos = size;
+
+ i_assert(stream->istream.v_offset + (stream->pos - stream->skip) ==
+ input->v_offset + size);
+ return ret;
+}
+
+static int
+i_stream_tee_stat(struct istream_private *stream, bool exact)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream, istream);
+ const struct stat *st;
+
+ if (i_stream_stat(tstream->tee->input, exact, &st) < 0)
+ return -1;
+ stream->statbuf = *st;
+ return 0;
+}
+
+static void i_stream_tee_sync(struct istream_private *stream)
+{
+ struct tee_child_istream *tstream =
+ container_of(stream, struct tee_child_istream, istream);
+
+ tee_streams_skip(tstream->tee);
+ if (i_stream_get_data_size(tstream->tee->input) != 0) {
+ i_panic("tee-istream: i_stream_sync() called "
+ "with data still buffered");
+ }
+ i_stream_sync(tstream->tee->input);
+}
+
+struct tee_istream *tee_i_stream_create(struct istream *input)
+{
+ struct tee_istream *tee;
+
+ tee = i_new(struct tee_istream, 1);
+ if (input->v_offset == 0) {
+ i_stream_ref(input);
+ tee->input = input;
+ } else {
+ tee->input = i_stream_create_limit(input, UOFF_T_MAX);
+ }
+ return tee;
+}
+
+struct istream *tee_i_stream_create_child(struct tee_istream *tee)
+{
+ struct tee_child_istream *tstream;
+ struct istream *ret, *input = tee->input;
+
+ tstream = i_new(struct tee_child_istream, 1);
+ tstream->tee = tee;
+
+ tstream->istream.max_buffer_size = input->real_stream->max_buffer_size;
+ tstream->istream.iostream.close = i_stream_tee_close;
+ tstream->istream.iostream.destroy = i_stream_tee_destroy;
+ tstream->istream.iostream.set_max_buffer_size =
+ i_stream_tee_set_max_buffer_size;
+
+ tstream->istream.read = i_stream_tee_read;
+ tstream->istream.stat = i_stream_tee_stat;
+ tstream->istream.sync = i_stream_tee_sync;
+
+ tstream->next = tee->children;
+ tee->children = tstream;
+
+ ret = i_stream_create(&tstream->istream, input, i_stream_get_fd(input),
+ ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
+ i_stream_set_name(&tstream->istream.istream, i_stream_get_name(input));
+ /* we keep the reference in tee stream, no need for extra references */
+ i_stream_unref(&input);
+ return ret;
+}
+
+bool tee_i_stream_child_is_waiting(struct istream *input)
+{
+ struct tee_child_istream *tstream =
+ container_of(input->real_stream,
+ struct tee_child_istream, istream);
+
+ return tstream->last_read_waiting;
+}