summaryrefslogtreecommitdiffstats
path: root/src/lib/istream-chain.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/istream-chain.c')
-rw-r--r--src/lib/istream-chain.c349
1 files changed, 349 insertions, 0 deletions
diff --git a/src/lib/istream-chain.c b/src/lib/istream-chain.c
new file mode 100644
index 0000000..3b9a87b
--- /dev/null
+++ b/src/lib/istream-chain.c
@@ -0,0 +1,349 @@
+/* Copyright (c) 2003-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "llist.h"
+#include "memarea.h"
+#include "istream-private.h"
+#include "istream-chain.h"
+
+struct chain_istream;
+
+struct istream_chain_link {
+ struct istream_chain_link *prev, *next;
+
+ struct istream *stream;
+ bool eof;
+};
+
+struct istream_chain {
+ struct istream_chain_link *head, *tail;
+
+ struct chain_istream *stream;
+};
+
+struct chain_istream {
+ struct istream_private istream;
+
+ /* how much of the previous link's stream still exists at the
+ beginning of our buffer. skipping through this should point to
+ the beginning of the current link's stream. */
+ size_t prev_stream_left;
+ size_t prev_skip;
+
+ struct istream_chain chain;
+};
+
+static void ATTR_NULL(2)
+i_stream_chain_append_internal(struct istream_chain *chain,
+ struct istream *stream)
+{
+ struct istream_chain_link *link;
+
+ if (stream == NULL && chain->tail != NULL && chain->tail->stream == NULL)
+ return;
+
+ link = i_new(struct istream_chain_link, 1);
+ link->stream = stream;
+ link->eof = stream == NULL;
+
+ if (stream != NULL)
+ i_stream_ref(stream);
+
+ if (chain->head == NULL && stream != NULL) {
+ i_stream_set_max_buffer_size(stream,
+ chain->stream->istream.max_buffer_size);
+ }
+ DLLIST2_APPEND(&chain->head, &chain->tail, link);
+ /* if io_add_istream() has been added to this chain stream, notify
+ the callback that we have more data available. */
+ if (stream != NULL)
+ i_stream_set_input_pending(stream, TRUE);
+}
+
+void i_stream_chain_append(struct istream_chain *chain, struct istream *stream)
+{
+ i_stream_chain_append_internal(chain, stream);
+}
+
+void i_stream_chain_append_eof(struct istream_chain *chain)
+{
+ i_stream_chain_append_internal(chain, NULL);
+}
+
+static void
+i_stream_chain_set_max_buffer_size(struct iostream_private *stream,
+ size_t max_size)
+{
+ struct chain_istream *cstream =
+ container_of(stream, struct chain_istream, istream.iostream);
+ struct istream_chain_link *link = cstream->chain.head;
+
+ cstream->istream.max_buffer_size = max_size;
+ while (link != NULL) {
+ if (link->stream != NULL)
+ i_stream_set_max_buffer_size(link->stream, max_size);
+ link = link->next;
+ }
+}
+
+static void i_stream_chain_destroy(struct iostream_private *stream)
+{
+ struct chain_istream *cstream =
+ container_of(stream, struct chain_istream, istream.iostream);
+ struct istream_chain_link *link = cstream->chain.head;
+
+ while (link != NULL) {
+ struct istream_chain_link *next = link->next;
+
+ i_stream_unref(&link->stream);
+ i_free(link);
+ link = next;
+ }
+ i_stream_free_buffer(&cstream->istream);
+}
+
+static void i_stream_chain_read_next(struct chain_istream *cstream)
+{
+ struct istream_chain_link *link = cstream->chain.head;
+ struct istream *prev_input;
+ const unsigned char *data;
+ size_t data_size, cur_data_pos;
+
+ i_assert(link != NULL && link->stream != NULL);
+ i_assert(link->stream->eof);
+
+ prev_input = link->stream;
+ data = i_stream_get_data(prev_input, &data_size);
+
+ DLLIST2_REMOVE(&cstream->chain.head, &cstream->chain.tail, link);
+ i_free(link);
+
+ /* a) we have more streams, b) we have EOF, c) we need to wait
+ for more streams */
+ link = cstream->chain.head;
+ if (link != NULL && link->stream != NULL)
+ i_stream_seek(link->stream, 0);
+
+ if (cstream->prev_stream_left > 0) {
+ /* we've already buffered some of the prev_input. continue
+ appending the rest to it. if it's already at EOF, there's
+ nothing more to append. */
+ cur_data_pos = cstream->istream.pos -
+ (cstream->istream.skip + cstream->prev_stream_left);
+ i_assert(cur_data_pos <= data_size);
+ data += cur_data_pos;
+ data_size -= cur_data_pos;
+ /* the stream has now become "previous", so its contents in
+ buffer are now part of prev_stream_left. */
+ cstream->prev_stream_left += cur_data_pos;
+ } else {
+ cstream->istream.pos = 0;
+ cstream->istream.skip = 0;
+ cstream->prev_stream_left = 0;
+ }
+
+ if (data_size > 0) {
+ if (cstream->istream.memarea != NULL &&
+ memarea_get_refcount(cstream->istream.memarea) > 1)
+ i_stream_memarea_detach(&cstream->istream);
+ memcpy(i_stream_alloc(&cstream->istream, data_size),
+ data, data_size);
+ cstream->istream.pos += data_size;
+ cstream->prev_stream_left += data_size;
+ }
+
+ i_stream_skip(prev_input, i_stream_get_data_size(prev_input));
+ i_stream_unref(&prev_input);
+}
+
+static bool i_stream_chain_skip(struct chain_istream *cstream)
+{
+ struct istream_private *stream = &cstream->istream;
+ struct istream_chain_link *link = cstream->chain.head;
+ size_t bytes_skipped;
+
+ i_assert(stream->skip >= cstream->prev_skip);
+ bytes_skipped = stream->skip - cstream->prev_skip;
+
+ if (cstream->prev_stream_left == 0) {
+ /* no need to worry about buffers, skip everything */
+ } else if (bytes_skipped < cstream->prev_stream_left) {
+ /* we're still skipping inside buffer */
+ cstream->prev_stream_left -= bytes_skipped;
+ bytes_skipped = 0;
+ } else {
+ /* done with the buffer */
+ bytes_skipped -= cstream->prev_stream_left;
+ cstream->prev_stream_left = 0;
+ }
+ if (bytes_skipped > 0) {
+ i_assert(stream->buffer != NULL);
+ stream->pos -= bytes_skipped;
+ stream->skip -= bytes_skipped;
+ stream->buffer += bytes_skipped;
+ }
+ cstream->prev_skip = stream->skip;
+ if (link == NULL || link->eof) {
+ i_assert(bytes_skipped == 0);
+ return FALSE;
+ }
+ i_stream_skip(link->stream, bytes_skipped);
+ return TRUE;
+}
+
+static ssize_t i_stream_chain_read(struct istream_private *stream)
+{
+ struct chain_istream *cstream =
+ container_of(stream, struct chain_istream, istream);
+ struct istream_chain_link *link = cstream->chain.head;
+ const unsigned char *data;
+ size_t data_size, cur_data_pos, new_pos;
+ size_t new_bytes_count;
+ ssize_t ret;
+
+ if (link != NULL && link->eof) {
+ stream->istream.eof = TRUE;
+ return -1;
+ }
+
+ if (!i_stream_chain_skip(cstream))
+ return 0;
+ i_assert(link != NULL);
+
+ i_assert(stream->pos >= stream->skip + cstream->prev_stream_left);
+ cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left);
+
+ data = i_stream_get_data(link->stream, &data_size);
+ if (data_size > cur_data_pos)
+ ret = 0;
+ else {
+ /* need to read more */
+ i_assert(cur_data_pos == data_size);
+ ret = i_stream_read_memarea(link->stream);
+ if (ret == -2 || ret == 0)
+ return ret;
+
+ if (ret == -1) {
+ if (link->stream->stream_errno != 0) {
+ io_stream_set_error(&stream->iostream,
+ "read(%s) failed: %s",
+ i_stream_get_name(link->stream),
+ i_stream_get_error(link->stream));
+ stream->istream.stream_errno =
+ link->stream->stream_errno;
+ return -1;
+ }
+ /* EOF of this stream, go to next stream */
+ i_stream_chain_read_next(cstream);
+ cstream->prev_skip = stream->skip;
+ return i_stream_chain_read(stream);
+ }
+ /* we read something */
+ data = i_stream_get_data(link->stream, &data_size);
+ }
+
+ if (data_size == cur_data_pos) {
+ /* nothing new read - preserve the buffer as it was */
+ i_assert(ret == 0 || ret == -1);
+ return ret;
+ }
+ if (cstream->prev_stream_left == 0) {
+ /* we can point directly to the current stream's buffers */
+ stream->buffer = data;
+ stream->pos -= stream->skip;
+ stream->skip = 0;
+ new_pos = data_size;
+ } else {
+ /* we still have some of the previous stream left. merge the
+ new data with it. */
+ i_assert(data_size > cur_data_pos);
+ new_bytes_count = data_size - cur_data_pos;
+ memcpy(i_stream_alloc(stream, new_bytes_count),
+ data + cur_data_pos, new_bytes_count);
+ stream->buffer = stream->w_buffer;
+ new_pos = stream->pos + new_bytes_count;
+ }
+
+ i_assert(new_pos > stream->pos);
+ ret = (ssize_t)(new_pos - stream->pos);
+ stream->pos = new_pos;
+ cstream->prev_skip = stream->skip;
+ return ret;
+}
+
+static void i_stream_chain_close(struct iostream_private *stream,
+ bool close_parent)
+{
+ struct chain_istream *cstream =
+ container_of(stream, struct chain_istream, istream.iostream);
+
+ /* seek to the correct position in parent stream in case it didn't
+ end with EOF */
+ (void)i_stream_chain_skip(cstream);
+
+ if (close_parent) {
+ struct istream_chain_link *link = cstream->chain.head;
+ while (link != NULL) {
+ i_stream_close(link->stream);
+ link = link->next;
+ }
+ }
+}
+
+static struct istream_snapshot *
+i_stream_chain_snapshot(struct istream_private *stream,
+ struct istream_snapshot *prev_snapshot)
+{
+ if (stream->buffer == stream->w_buffer) {
+ /* Two or more istreams have been combined. Snapshot the
+ w_buffer's contents that contains their data. */
+ i_assert(stream->memarea != NULL);
+ return i_stream_default_snapshot(stream, prev_snapshot);
+ }
+ /* Individual istreams are being read. Snapshot the istream directly. */
+ struct chain_istream *cstream =
+ container_of(stream, struct chain_istream, istream);
+ struct istream_chain_link *link = cstream->chain.head;
+ if (link == NULL || link->stream == NULL)
+ return prev_snapshot;
+
+ struct istream_private *_link_stream = link->stream->real_stream;
+ struct istream_snapshot *snapshot = i_new(struct istream_snapshot, 1);
+ snapshot->prev_snapshot =
+ _link_stream->snapshot(_link_stream, prev_snapshot);
+ if (snapshot->prev_snapshot == prev_snapshot) {
+ /* The link stream didn't implement snapshotting in any way.
+ This could cause trouble if the link stream is freed while
+ it's still referred to in this snapshot. Fix this by
+ referencing the link istream. Normally avoid doing this,
+ since the extra references can cause unexpected problems. */
+ snapshot->istream = link->stream;
+ i_stream_ref(snapshot->istream);
+ }
+ return snapshot;
+}
+
+struct istream *i_stream_create_chain(struct istream_chain **chain_r,
+ size_t max_buffer_size)
+{
+ struct chain_istream *cstream;
+
+ cstream = i_new(struct chain_istream, 1);
+ cstream->chain.stream = cstream;
+ cstream->istream.max_buffer_size = max_buffer_size;
+
+ cstream->istream.iostream.close = i_stream_chain_close;
+ cstream->istream.iostream.destroy = i_stream_chain_destroy;
+ cstream->istream.iostream.set_max_buffer_size =
+ i_stream_chain_set_max_buffer_size;
+
+ cstream->istream.read = i_stream_chain_read;
+ cstream->istream.snapshot = i_stream_chain_snapshot;
+
+ cstream->istream.istream.readable_fd = FALSE;
+ cstream->istream.istream.blocking = FALSE;
+ cstream->istream.istream.seekable = FALSE;
+
+ *chain_r = &cstream->chain;
+ return i_stream_create(&cstream->istream, NULL, -1, 0);
+}