summaryrefslogtreecommitdiffstats
path: root/src/lib/istream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/istream.c')
-rw-r--r--src/lib/istream.c1316
1 files changed, 1316 insertions, 0 deletions
diff --git a/src/lib/istream.c b/src/lib/istream.c
new file mode 100644
index 0000000..5fc5112
--- /dev/null
+++ b/src/lib/istream.c
@@ -0,0 +1,1316 @@
+/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "array.h"
+#include "str.h"
+#include "memarea.h"
+#include "istream-private.h"
+
+static bool i_stream_is_buffer_invalid(const struct istream_private *stream);
+
+void i_stream_set_name(struct istream *stream, const char *name)
+{
+ i_free(stream->real_stream->iostream.name);
+ stream->real_stream->iostream.name = i_strdup(name);
+}
+
+const char *i_stream_get_name(struct istream *stream)
+{
+ while (stream->real_stream->iostream.name == NULL) {
+ stream = stream->real_stream->parent;
+ if (stream == NULL)
+ return "";
+ }
+ return stream->real_stream->iostream.name;
+}
+
+static void i_stream_close_full(struct istream *stream, bool close_parents)
+{
+ io_stream_close(&stream->real_stream->iostream, close_parents);
+ stream->closed = TRUE;
+
+ if (stream->stream_errno == 0)
+ stream->stream_errno = EPIPE;
+}
+
+void i_stream_destroy(struct istream **stream)
+{
+ if (*stream == NULL)
+ return;
+
+ i_stream_close_full(*stream, FALSE);
+ i_stream_unref(stream);
+}
+
+void i_stream_ref(struct istream *stream)
+{
+ io_stream_ref(&stream->real_stream->iostream);
+}
+
+void i_stream_unref(struct istream **stream)
+{
+ struct istream_private *_stream;
+
+ if (*stream == NULL)
+ return;
+
+ _stream = (*stream)->real_stream;
+
+ if (_stream->iostream.refcount > 1) {
+ if (!io_stream_unref(&_stream->iostream))
+ i_unreached();
+ } else {
+ /* The snapshot may contain pointers to the parent istreams.
+ Free it before io_stream_unref() frees the parents. */
+ i_stream_snapshot_free(&_stream->prev_snapshot);
+
+ if (io_stream_unref(&_stream->iostream))
+ i_unreached();
+ str_free(&_stream->line_str);
+ i_stream_unref(&_stream->parent);
+ io_stream_free(&_stream->iostream);
+ }
+ *stream = NULL;
+}
+
+#undef i_stream_add_destroy_callback
+void i_stream_add_destroy_callback(struct istream *stream,
+ istream_callback_t *callback, void *context)
+{
+ io_stream_add_destroy_callback(&stream->real_stream->iostream,
+ callback, context);
+}
+
+void i_stream_remove_destroy_callback(struct istream *stream,
+ void (*callback)())
+{
+ io_stream_remove_destroy_callback(&stream->real_stream->iostream,
+ callback);
+}
+
+int i_stream_get_fd(struct istream *stream)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ return _stream->fd;
+}
+
+void i_stream_copy_fd(struct istream *dest, struct istream *source)
+{
+ int fd = i_stream_get_fd(source);
+
+ i_assert(fd != -1);
+ i_assert(dest->real_stream->fd == -1);
+ dest->real_stream->fd = fd;
+ dest->readable_fd = source->readable_fd;
+}
+
+const char *i_stream_get_error(struct istream *stream)
+{
+ struct istream *s;
+
+ /* we'll only return errors for streams that have stream_errno set or
+ that have reached EOF. we might be returning unintended error
+ otherwise. */
+ if (stream->stream_errno == 0)
+ return stream->eof ? "EOF" : "<no error>";
+
+ for (s = stream; s != NULL; s = s->real_stream->parent) {
+ if (s->stream_errno == 0)
+ break;
+ if (s->real_stream->iostream.error != NULL)
+ return s->real_stream->iostream.error;
+ }
+ return strerror(stream->stream_errno);
+}
+
+const char *i_stream_get_disconnect_reason(struct istream *stream)
+{
+ return io_stream_get_disconnect_reason(stream, NULL);
+}
+
+void i_stream_close(struct istream *stream)
+{
+ if (stream != NULL)
+ i_stream_close_full(stream, TRUE);
+}
+
+void i_stream_set_init_buffer_size(struct istream *stream, size_t size)
+{
+ stream->real_stream->init_buffer_size = size;
+}
+
+void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size)
+{
+ io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
+}
+
+size_t i_stream_get_max_buffer_size(struct istream *stream)
+{
+ size_t max_size = 0;
+
+ do {
+ if (max_size < stream->real_stream->max_buffer_size)
+ max_size = stream->real_stream->max_buffer_size;
+ stream = stream->real_stream->parent;
+ } while (stream != NULL);
+ return max_size;
+}
+
+void i_stream_set_return_partial_line(struct istream *stream, bool set)
+{
+ stream->real_stream->return_nolf_line = set;
+}
+
+void i_stream_set_persistent_buffers(struct istream *stream, bool set)
+{
+ do {
+ stream->real_stream->nonpersistent_buffers = !set;
+ stream = stream->real_stream->parent;
+ } while (stream != NULL);
+}
+
+void i_stream_set_blocking(struct istream *stream, bool blocking)
+{
+ int prev_fd = -1;
+
+ do {
+ stream->blocking = blocking;
+ if (stream->real_stream->fd != -1 &&
+ stream->real_stream->fd != prev_fd) {
+ fd_set_nonblock(stream->real_stream->fd, !blocking);
+ prev_fd = stream->real_stream->fd;
+ }
+ stream = stream->real_stream->parent;
+ } while (stream != NULL);
+}
+
+static void i_stream_update(struct istream_private *stream)
+{
+ if (stream->parent == NULL)
+ stream->access_counter++;
+ else {
+ stream->access_counter =
+ stream->parent->real_stream->access_counter;
+ stream->parent_expected_offset = stream->parent->v_offset;
+ }
+}
+
+static bool snapshot_has_memarea(struct istream_snapshot *snapshot,
+ struct memarea *memarea)
+{
+ if (snapshot->old_memarea == memarea)
+ return TRUE;
+ if (snapshot->prev_snapshot != NULL)
+ return snapshot_has_memarea(snapshot->prev_snapshot, memarea);
+ return FALSE;
+}
+
+struct istream_snapshot *
+i_stream_default_snapshot(struct istream_private *stream,
+ struct istream_snapshot *prev_snapshot)
+{
+ struct istream_snapshot *snapshot;
+
+ if (stream->memarea != NULL) {
+ if (prev_snapshot != NULL) {
+ if (snapshot_has_memarea(prev_snapshot, stream->memarea))
+ return prev_snapshot;
+ }
+ /* This stream has a memarea. Reference it, so we can later on
+ rollback if needed. */
+ snapshot = i_new(struct istream_snapshot, 1);
+ snapshot->old_memarea = stream->memarea;
+ snapshot->prev_snapshot = prev_snapshot;
+ memarea_ref(snapshot->old_memarea);
+ return snapshot;
+ }
+ if (stream->parent == NULL) {
+ if (stream->nonpersistent_buffers) {
+ /* Assume that memarea would be used normally, but
+ now it's NULL because the buffer is empty and
+ empty buffers are freed. */
+ i_assert(stream->skip == stream->pos);
+ return prev_snapshot;
+ }
+ i_panic("%s is missing istream.snapshot() implementation",
+ i_stream_get_name(&stream->istream));
+ }
+ struct istream_private *_parent_stream =
+ stream->parent->real_stream;
+ return _parent_stream->snapshot(_parent_stream, prev_snapshot);
+}
+
+void i_stream_snapshot_free(struct istream_snapshot **_snapshot)
+{
+ struct istream_snapshot *snapshot = *_snapshot;
+
+ if (*_snapshot == NULL)
+ return;
+ *_snapshot = NULL;
+
+ i_stream_snapshot_free(&snapshot->prev_snapshot);
+ if (snapshot->free != NULL)
+ snapshot->free(snapshot);
+ else {
+ if (snapshot->old_memarea != NULL)
+ memarea_unref(&snapshot->old_memarea);
+ i_stream_unref(&snapshot->istream);
+ i_free(snapshot);
+ }
+}
+
+static struct istream_snapshot *
+i_stream_noop_snapshot(struct istream_private *stream ATTR_UNUSED,
+ struct istream_snapshot *prev_snapshot)
+{
+ return prev_snapshot;
+}
+
+ssize_t i_stream_read(struct istream *stream)
+{
+ struct istream_private *_stream = stream->real_stream;
+ ssize_t ret;
+#ifdef DEBUG
+ unsigned char prev_buf[4];
+ const unsigned char *prev_data = _stream->buffer;
+ size_t prev_skip = _stream->skip, prev_pos = _stream->pos;
+ bool invalid = i_stream_is_buffer_invalid(_stream);
+
+ i_assert(prev_skip <= prev_pos);
+ if (invalid)
+ ;
+ else if (prev_pos - prev_skip <= 4)
+ memcpy(prev_buf, prev_data + prev_skip, prev_pos - prev_skip);
+ else {
+ memcpy(prev_buf, prev_data + prev_skip, 2);
+ memcpy(prev_buf+2, prev_data + prev_pos - 2, 2);
+ }
+#endif
+
+ if (_stream->skip != _stream->pos || _stream->prev_snapshot != NULL) {
+ _stream->prev_snapshot =
+ _stream->snapshot(_stream, _stream->prev_snapshot);
+ }
+ ret = i_stream_read_memarea(stream);
+ if (ret > 0)
+ i_stream_snapshot_free(&_stream->prev_snapshot);
+#ifdef DEBUG
+ else if (!invalid) {
+ i_assert((_stream->pos - _stream->skip) == (prev_pos - prev_skip) ||
+ prev_pos == prev_skip);
+ if (prev_pos - prev_skip <= 4)
+ i_assert(memcmp(prev_buf, prev_data + prev_skip, prev_pos - prev_skip) == 0);
+ else {
+ i_assert(memcmp(prev_buf, prev_data + prev_skip, 2) == 0);
+ i_assert(memcmp(prev_buf+2, prev_data + prev_pos - 2, 2) == 0);
+ }
+ }
+#endif
+ return ret;
+}
+
+ssize_t i_stream_read_memarea(struct istream *stream)
+{
+ struct istream_private *_stream = stream->real_stream;
+ size_t old_size;
+ ssize_t ret;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0)) {
+ stream->eof = TRUE;
+ errno = stream->stream_errno;
+ return -1;
+ }
+
+ stream->eof = FALSE;
+
+ if (_stream->parent != NULL)
+ i_stream_seek(_stream->parent, _stream->parent_expected_offset);
+
+ old_size = _stream->pos - _stream->skip;
+ if (_stream->pos < _stream->high_pos) {
+ /* we're here because we seeked back within the read buffer. */
+ ret = _stream->high_pos - _stream->pos;
+ _stream->pos = _stream->high_pos;
+ _stream->high_pos = 0;
+ } else {
+ _stream->high_pos = 0;
+ ret = _stream->read(_stream);
+ }
+ i_assert(old_size <= _stream->pos - _stream->skip);
+ switch (ret) {
+ case -2:
+ i_assert(_stream->skip != _stream->pos);
+ break;
+ case -1:
+ if (stream->stream_errno != 0) {
+ /* error handling should be easier if we now just
+ assume the stream is now at EOF */
+ stream->eof = TRUE;
+ errno = stream->stream_errno;
+ } else {
+ i_assert(stream->eof);
+ i_assert(old_size == _stream->pos - _stream->skip);
+ }
+ break;
+ case 0:
+ i_assert(!stream->blocking);
+ break;
+ default:
+ i_assert(ret > 0);
+ i_assert(_stream->skip < _stream->pos);
+ i_assert((size_t)ret+old_size == _stream->pos - _stream->skip);
+ _stream->last_read_timeval = ioloop_timeval;
+ break;
+ }
+
+ if (stream->stream_errno != 0) {
+ /* error handling should be easier if we now just
+ assume the stream is now at EOF. Note that we could get here
+ even if read() didn't return -1, although that's a little
+ bit sloppy istream implementation. */
+ stream->eof = TRUE;
+ }
+
+ i_stream_update(_stream);
+ /* verify that parents' access_counters are valid. the parent's
+ i_stream_read() should guarantee this. */
+ i_assert(!i_stream_is_buffer_invalid(_stream));
+ return ret;
+}
+
+int i_stream_read_more_memarea(struct istream *stream,
+ const unsigned char **data_r, size_t *size_r)
+{
+ *data_r = i_stream_get_data(stream, size_r);
+ if (*size_r > 0)
+ return 1;
+
+ int ret = i_stream_read_memarea(stream);
+ *data_r = i_stream_get_data(stream, size_r);
+ return ret;
+}
+
+void i_stream_get_last_read_time(struct istream *stream, struct timeval *tv_r)
+{
+ *tv_r = stream->real_stream->last_read_timeval;
+}
+
+ssize_t i_stream_read_copy_from_parent(struct istream *istream)
+{
+ struct istream_private *stream = istream->real_stream;
+ size_t pos;
+ ssize_t ret;
+
+ stream->pos -= stream->skip;
+ stream->skip = 0;
+
+ stream->buffer = i_stream_get_data(stream->parent, &pos);
+ if (pos > stream->pos)
+ ret = 0;
+ else do {
+ ret = i_stream_read_memarea(stream->parent);
+ stream->istream.stream_errno = stream->parent->stream_errno;
+ stream->istream.eof = stream->parent->eof;
+ stream->buffer = i_stream_get_data(stream->parent, &pos);
+ /* check again, in case the parent stream had been seeked
+ backwards and the previous read() didn't get us far
+ enough. */
+ } while (pos <= stream->pos && ret > 0);
+ if (ret == -2) {
+ i_stream_update(stream);
+ return -2;
+ }
+
+ ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
+ (ret == 0 ? 0 : -1);
+ stream->pos = pos;
+ i_assert(ret != -1 || stream->istream.eof ||
+ stream->istream.stream_errno != 0);
+ i_stream_update(stream);
+ return ret;
+}
+
+void i_stream_free_buffer(struct istream_private *stream)
+{
+ if (stream->memarea != NULL) {
+ memarea_unref(&stream->memarea);
+ stream->w_buffer = NULL;
+ } else if (stream->w_buffer != NULL) {
+ i_free_and_null(stream->w_buffer);
+ } else {
+ /* don't know how to free it */
+ return;
+ }
+ stream->buffer_size = 0;
+}
+
+void i_stream_skip(struct istream *stream, uoff_t count)
+{
+ struct istream_private *_stream = stream->real_stream;
+ size_t data_size;
+
+ data_size = _stream->pos - _stream->skip;
+ if (count <= data_size) {
+ /* within buffer */
+ stream->v_offset += count;
+ _stream->skip += count;
+ if (_stream->nonpersistent_buffers &&
+ _stream->skip == _stream->pos) {
+ _stream->skip = _stream->pos = 0;
+ i_stream_free_buffer(_stream);
+ }
+ return;
+ }
+
+ /* have to seek forward */
+ count -= data_size;
+ _stream->skip = _stream->pos;
+ stream->v_offset += data_size;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0))
+ return;
+
+ _stream->seek(_stream, stream->v_offset + count, FALSE);
+}
+
+static bool i_stream_can_optimize_seek(struct istream_private *stream)
+{
+ if (stream->parent == NULL)
+ return TRUE;
+
+ /* use the fast route only if the parent stream hasn't been changed */
+ if (stream->access_counter !=
+ stream->parent->real_stream->access_counter)
+ return FALSE;
+
+ return i_stream_can_optimize_seek(stream->parent->real_stream);
+}
+
+void i_stream_seek(struct istream *stream, uoff_t v_offset)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (v_offset >= stream->v_offset &&
+ i_stream_can_optimize_seek(_stream))
+ i_stream_skip(stream, v_offset - stream->v_offset);
+ else {
+ if (unlikely(stream->closed || stream->stream_errno != 0)) {
+ stream->eof = TRUE;
+ return;
+ }
+ stream->eof = FALSE;
+ _stream->seek(_stream, v_offset, FALSE);
+ }
+ i_stream_update(_stream);
+}
+
+void i_stream_seek_mark(struct istream *stream, uoff_t v_offset)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0))
+ return;
+
+ stream->eof = FALSE;
+ _stream->seek(_stream, v_offset, TRUE);
+ i_stream_update(_stream);
+}
+
+void i_stream_sync(struct istream *stream)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0))
+ return;
+
+ if (_stream->sync != NULL) {
+ _stream->sync(_stream);
+ i_stream_update(_stream);
+ }
+}
+
+int i_stream_stat(struct istream *stream, bool exact, const struct stat **st_r)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0))
+ return -1;
+
+ if (_stream->stat(_stream, exact) < 0) {
+ stream->eof = TRUE;
+ return -1;
+ }
+ *st_r = &_stream->statbuf;
+ return 0;
+}
+
+int i_stream_get_size(struct istream *stream, bool exact, uoff_t *size_r)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (unlikely(stream->closed || stream->stream_errno != 0))
+ return -1;
+
+ int ret;
+ if ((ret = _stream->get_size(_stream, exact, size_r)) < 0)
+ stream->eof = TRUE;
+ return ret;
+}
+
+bool i_stream_have_bytes_left(struct istream *stream)
+{
+ return i_stream_get_data_size(stream) > 0 || !stream->eof;
+}
+
+bool i_stream_read_eof(struct istream *stream)
+{
+ if (i_stream_get_data_size(stream) == 0)
+ (void)i_stream_read(stream);
+ return !i_stream_have_bytes_left(stream);
+}
+
+uoff_t i_stream_get_absolute_offset(struct istream *stream)
+{
+ uoff_t abs_offset = stream->v_offset;
+ while (stream != NULL) {
+ abs_offset += stream->real_stream->start_offset;
+ stream = stream->real_stream->parent;
+ }
+ return abs_offset;
+}
+
+static char *i_stream_next_line_finish(struct istream_private *stream, size_t i)
+{
+ char *ret;
+ size_t end;
+
+ if (i > stream->skip && stream->buffer[i-1] == '\r') {
+ end = i - 1;
+ stream->line_crlf = TRUE;
+ } else {
+ end = i;
+ stream->line_crlf = FALSE;
+ }
+
+ if (stream->buffer == stream->w_buffer &&
+ end < stream->buffer_size) {
+ /* modify the buffer directly */
+ stream->w_buffer[end] = '\0';
+ ret = (char *)stream->w_buffer + stream->skip;
+ } else {
+ /* use a temporary string to return it */
+ if (stream->line_str == NULL)
+ stream->line_str = str_new(default_pool, 256);
+ str_truncate(stream->line_str, 0);
+ if (stream->skip < end)
+ str_append_data(stream->line_str, stream->buffer + stream->skip,
+ end - stream->skip);
+ ret = str_c_modifiable(stream->line_str);
+ }
+
+ if (i < stream->pos)
+ i++;
+ stream->istream.v_offset += i - stream->skip;
+ stream->skip = i;
+ return ret;
+}
+
+static char *i_stream_last_line(struct istream_private *_stream)
+{
+ if (_stream->istream.eof && _stream->skip != _stream->pos &&
+ _stream->return_nolf_line) {
+ /* the last line is missing LF and we want to return it. */
+ return i_stream_next_line_finish(_stream, _stream->pos);
+ }
+ return NULL;
+}
+
+char *i_stream_next_line(struct istream *stream)
+{
+ struct istream_private *_stream = stream->real_stream;
+ const unsigned char *pos;
+
+ if (_stream->skip >= _stream->pos)
+ return NULL;
+
+ pos = memchr(_stream->buffer + _stream->skip, '\n',
+ _stream->pos - _stream->skip);
+ if (pos != NULL) {
+ return i_stream_next_line_finish(_stream,
+ pos - _stream->buffer);
+ } else {
+ return i_stream_last_line(_stream);
+ }
+}
+
+char *i_stream_read_next_line(struct istream *stream)
+{
+ char *line;
+
+ for (;;) {
+ line = i_stream_next_line(stream);
+ if (line != NULL)
+ break;
+
+ switch (i_stream_read(stream)) {
+ case -2:
+ io_stream_set_error(&stream->real_stream->iostream,
+ "Line is too long (over %zu"
+ " bytes at offset %"PRIuUOFF_T")",
+ i_stream_get_data_size(stream), stream->v_offset);
+ stream->stream_errno = errno = ENOBUFS;
+ stream->eof = TRUE;
+ return NULL;
+ case -1:
+ return i_stream_last_line(stream->real_stream);
+ case 0:
+ return NULL;
+ }
+ }
+ return line;
+}
+
+bool i_stream_last_line_crlf(struct istream *stream)
+{
+ return stream->real_stream->line_crlf;
+}
+
+static bool i_stream_is_buffer_invalid(const struct istream_private *stream)
+{
+ if (stream->parent == NULL) {
+ /* the buffer can't point to parent, because it doesn't exist */
+ return FALSE;
+ }
+ if (stream->w_buffer != NULL) {
+ /* we can pretty safely assume that the stream is using its
+ own private buffer, so it can never become invalid. */
+ return FALSE;
+ }
+ if (stream->access_counter !=
+ stream->parent->real_stream->access_counter) {
+ /* parent has been modified behind this stream, we can't trust
+ that our buffer is valid */
+ return TRUE;
+ }
+ return i_stream_is_buffer_invalid(stream->parent->real_stream);
+}
+
+const unsigned char *
+i_stream_get_data(struct istream *stream, size_t *size_r)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (_stream->skip >= _stream->pos) {
+ *size_r = 0;
+ return uchar_empty_ptr;
+ }
+
+ if (unlikely(i_stream_is_buffer_invalid(_stream))) {
+ /* This stream may be using parent's buffer directly as
+ _stream->buffer, but the parent stream has already been
+ modified indirectly. This means that the buffer might no
+ longer point to where we assume it points to. So we'll
+ just return the stream as empty until it's read again.
+
+ It's a bit ugly to suddenly drop data from the stream that
+ was already read, but since this happens only with shared
+ parent istreams the caller is hopefully aware enough that
+ something like this might happen. The other solutions would
+ be to a) try to automatically read the data back (but we
+ can't handle errors..) or b) always copy data to stream's
+ own buffer instead of pointing to parent's buffer (but this
+ causes data copying that is nearly always unnecessary). */
+ *size_r = 0;
+ /* if we had already read until EOF, mark the stream again as
+ not being at the end of file. */
+ if (stream->stream_errno == 0) {
+ _stream->skip = _stream->pos = 0;
+ stream->eof = FALSE;
+ }
+ return uchar_empty_ptr;
+ }
+
+ *size_r = _stream->pos - _stream->skip;
+ return _stream->buffer + _stream->skip;
+}
+
+size_t i_stream_get_data_size(struct istream *stream)
+{
+ size_t size;
+
+ (void)i_stream_get_data(stream, &size);
+ return size;
+}
+
+unsigned char *i_stream_get_modifiable_data(struct istream *stream,
+ size_t *size_r)
+{
+ struct istream_private *_stream = stream->real_stream;
+
+ if (_stream->skip >= _stream->pos || _stream->w_buffer == NULL) {
+ *size_r = 0;
+ return NULL;
+ }
+
+ *size_r = _stream->pos - _stream->skip;
+ return _stream->w_buffer + _stream->skip;
+}
+
+int i_stream_read_data(struct istream *stream, const unsigned char **data_r,
+ size_t *size_r, size_t threshold)
+{
+ ssize_t ret = 0;
+ bool read_more = FALSE;
+
+ do {
+ *data_r = i_stream_get_data(stream, size_r);
+ if (*size_r > threshold)
+ return 1;
+
+ /* we need more data */
+ ret = i_stream_read(stream);
+ if (ret > 0)
+ read_more = TRUE;
+ } while (ret > 0);
+
+ *data_r = i_stream_get_data(stream, size_r);
+ if (ret == -2)
+ return -2;
+
+ if (ret == 0) {
+ /* need to read more */
+ i_assert(!stream->blocking);
+ return 0;
+ }
+ if (stream->eof) {
+ if (read_more) {
+ /* we read at least some new data */
+ return 0;
+ }
+ } else {
+ i_assert(stream->stream_errno != 0);
+ }
+ return -1;
+}
+
+int i_stream_read_limited(struct istream *stream, const unsigned char **data_r,
+ size_t *size_r, size_t limit)
+{
+ struct istream_private *_stream = stream->real_stream;
+ int ret;
+
+ *data_r = i_stream_get_data(stream, size_r);
+ if (*size_r >= limit) {
+ *size_r = limit;
+ return 1;
+ }
+
+ _stream->data_limit = limit;
+ ret = i_stream_read_more(stream, data_r, size_r);
+ _stream->data_limit = 0;
+
+ if (*size_r >= limit)
+ *size_r = limit;
+ return ret;
+}
+
+void i_stream_compress(struct istream_private *stream)
+{
+ i_assert(stream->memarea == NULL ||
+ memarea_get_refcount(stream->memarea) == 1);
+
+ if (stream->skip != stream->pos) {
+ memmove(stream->w_buffer, stream->w_buffer + stream->skip,
+ stream->pos - stream->skip);
+ }
+ stream->pos -= stream->skip;
+
+ stream->skip = 0;
+}
+
+static void i_stream_w_buffer_free(void *buf)
+{
+ i_free(buf);
+}
+
+static void
+i_stream_w_buffer_realloc(struct istream_private *stream, size_t old_size)
+{
+ void *new_buffer;
+
+ if (stream->memarea != NULL &&
+ memarea_get_refcount(stream->memarea) == 1) {
+ /* Nobody else is referencing the memarea.
+ We can just reallocate it. */
+ memarea_free_without_callback(&stream->memarea);
+ new_buffer = i_realloc(stream->w_buffer, old_size,
+ stream->buffer_size);
+ } else {
+ new_buffer = i_malloc(stream->buffer_size);
+ if (old_size > 0) {
+ i_assert(stream->w_buffer != NULL);
+ memcpy(new_buffer, stream->w_buffer, old_size);
+ }
+ if (stream->memarea != NULL)
+ memarea_unref(&stream->memarea);
+ }
+
+ stream->w_buffer = new_buffer;
+ stream->buffer = new_buffer;
+
+ stream->memarea = memarea_init(stream->w_buffer, stream->buffer_size,
+ i_stream_w_buffer_free, new_buffer);
+}
+
+void i_stream_grow_buffer(struct istream_private *stream, size_t bytes)
+{
+ size_t old_size, max_size;
+
+ old_size = stream->buffer_size;
+
+ stream->buffer_size = stream->pos + bytes;
+ if (stream->buffer_size <= stream->init_buffer_size)
+ stream->buffer_size = stream->init_buffer_size;
+ else
+ stream->buffer_size = nearest_power(stream->buffer_size);
+
+ max_size = i_stream_get_max_buffer_size(&stream->istream);
+ i_assert(max_size > 0);
+ if (stream->buffer_size > max_size)
+ stream->buffer_size = max_size;
+
+ if (stream->buffer_size <= old_size)
+ stream->buffer_size = old_size;
+ else
+ i_stream_w_buffer_realloc(stream, old_size);
+}
+
+bool i_stream_try_alloc(struct istream_private *stream,
+ size_t wanted_size, size_t *size_r)
+{
+ i_assert(wanted_size > 0);
+ i_assert(stream->buffer_size >= stream->pos);
+
+ if (wanted_size > stream->buffer_size - stream->pos) {
+ if (stream->skip > 0) {
+ /* remove the unused bytes from beginning of buffer */
+ if (stream->memarea != NULL &&
+ memarea_get_refcount(stream->memarea) > 1) {
+ /* The memarea is still referenced. We can't
+ overwrite data until extra references are
+ gone. */
+ i_stream_w_buffer_realloc(stream, stream->buffer_size);
+ }
+ i_stream_compress(stream);
+ } else if (stream->buffer_size < i_stream_get_max_buffer_size(&stream->istream)) {
+ /* buffer is full - grow it */
+ i_stream_grow_buffer(stream, I_STREAM_MIN_SIZE);
+ }
+ }
+
+ if (stream->data_limit == 0 ||
+ (stream->buffer_size - stream->skip) < stream->data_limit)
+ *size_r = stream->buffer_size - stream->pos;
+ else {
+ size_t buffered = (stream->pos - stream->skip);
+
+ if (buffered >= stream->data_limit)
+ *size_r = 0;
+ else
+ *size_r = stream->data_limit - buffered;
+ }
+ i_assert(stream->w_buffer != NULL || *size_r == 0);
+ return *size_r > 0;
+}
+
+bool ATTR_NOWARN_UNUSED_RESULT
+i_stream_try_alloc_avoid_compress(struct istream_private *stream,
+ size_t wanted_size, size_t *size_r)
+{
+ size_t old_skip = stream->skip;
+
+ /* try first with skip=0, so no compression is done */
+ stream->skip = 0;
+ bool ret = i_stream_try_alloc(stream, wanted_size, size_r);
+ stream->skip = old_skip;
+ if (ret || old_skip == 0)
+ return ret;
+ /* it's full. try with compression. */
+ return i_stream_try_alloc(stream, wanted_size, size_r);
+}
+
+void *i_stream_alloc(struct istream_private *stream, size_t size)
+{
+ size_t old_size, avail_size;
+
+ (void)i_stream_try_alloc(stream, size, &avail_size);
+ if (avail_size < size) {
+ old_size = stream->buffer_size;
+ stream->buffer_size = nearest_power(stream->pos + size);
+ i_stream_w_buffer_realloc(stream, old_size);
+
+ (void)i_stream_try_alloc(stream, size, &avail_size);
+ i_assert(avail_size >= size);
+ }
+ return stream->w_buffer + stream->pos;
+}
+
+void i_stream_memarea_detach(struct istream_private *stream)
+{
+ if (stream->memarea != NULL) {
+ /* Don't overwrite data in a snapshot. Allocate a new
+ buffer instead. */
+ memarea_unref(&stream->memarea);
+ stream->buffer_size = 0;
+ stream->buffer = NULL;
+ stream->w_buffer = NULL;
+ }
+}
+
+bool i_stream_add_data(struct istream *_stream, const unsigned char *data,
+ size_t size)
+{
+ struct istream_private *stream = _stream->real_stream;
+ size_t size2;
+
+ (void)i_stream_try_alloc(stream, size, &size2);
+ if (size > size2)
+ return FALSE;
+
+ memcpy(stream->w_buffer + stream->pos, data, size);
+ stream->pos += size;
+ return TRUE;
+}
+
+struct istream *i_stream_get_root_io(struct istream *stream)
+{
+ while (stream->real_stream->parent != NULL) {
+ i_assert(stream->real_stream->io == NULL);
+ stream = stream->real_stream->parent;
+ }
+ return stream;
+}
+
+void i_stream_set_input_pending(struct istream *stream, bool pending)
+{
+ if (!pending)
+ return;
+
+ stream = i_stream_get_root_io(stream);
+ if (stream->real_stream->io != NULL)
+ io_set_pending(stream->real_stream->io);
+ else
+ stream->real_stream->io_pending = TRUE;
+}
+
+void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop)
+{
+ io_stream_switch_ioloop_to(&stream->real_stream->iostream, ioloop);
+
+ do {
+ if (stream->real_stream->switch_ioloop_to != NULL) {
+ stream->real_stream->switch_ioloop_to(
+ stream->real_stream, ioloop);
+ }
+ stream = stream->real_stream->parent;
+ } while (stream != NULL);
+}
+
+void i_stream_switch_ioloop(struct istream *stream)
+{
+ i_stream_switch_ioloop_to(stream, current_ioloop);
+}
+
+void i_stream_set_io(struct istream *stream, struct io *io)
+{
+ stream = i_stream_get_root_io(stream);
+
+ i_assert(stream->real_stream->io == NULL);
+ stream->real_stream->io = io;
+ if (stream->real_stream->io_pending) {
+ io_set_pending(io);
+ stream->real_stream->io_pending = FALSE;
+ }
+}
+
+void i_stream_unset_io(struct istream *stream, struct io *io)
+{
+ stream = i_stream_get_root_io(stream);
+
+ i_assert(stream->real_stream->io == io);
+ if (io_is_pending(io))
+ stream->real_stream->io_pending = TRUE;
+ stream->real_stream->io = NULL;
+}
+
+static void
+i_stream_default_set_max_buffer_size(struct iostream_private *stream,
+ size_t max_size)
+{
+ struct istream_private *_stream =
+ container_of(stream, struct istream_private, iostream);
+
+ _stream->max_buffer_size = max_size;
+ if (_stream->parent != NULL)
+ i_stream_set_max_buffer_size(_stream->parent, max_size);
+}
+
+static void i_stream_default_close(struct iostream_private *stream,
+ bool close_parent)
+{
+ struct istream_private *_stream =
+ container_of(stream, struct istream_private, iostream);
+
+ if (close_parent)
+ i_stream_close(_stream->parent);
+}
+
+static void i_stream_default_destroy(struct iostream_private *stream)
+{
+ struct istream_private *_stream =
+ container_of(stream, struct istream_private, iostream);
+
+ i_stream_free_buffer(_stream);
+ i_stream_unref(&_stream->parent);
+}
+
+static void
+i_stream_default_seek_seekable(struct istream_private *stream,
+ uoff_t v_offset, bool mark ATTR_UNUSED)
+{
+ stream->istream.v_offset = v_offset;
+ stream->skip = stream->pos = 0;
+}
+
+void i_stream_default_seek_nonseekable(struct istream_private *stream,
+ uoff_t v_offset, bool mark ATTR_UNUSED)
+{
+ size_t available;
+
+ if (stream->istream.v_offset > v_offset)
+ i_panic("stream %s doesn't support seeking backwards",
+ i_stream_get_name(&stream->istream));
+
+ while (stream->istream.v_offset < v_offset) {
+ (void)i_stream_read(&stream->istream);
+
+ available = stream->pos - stream->skip;
+ if (available == 0) {
+ if (stream->istream.stream_errno != 0) {
+ /* read failed */
+ return;
+ }
+ io_stream_set_error(&stream->iostream,
+ "Can't seek to offset %"PRIuUOFF_T
+ ", because we have data only up to offset %"
+ PRIuUOFF_T" (eof=%d)", v_offset,
+ stream->istream.v_offset, stream->istream.eof ? 1 : 0);
+ stream->istream.stream_errno = ESPIPE;
+ return;
+ }
+ if (available <= v_offset - stream->istream.v_offset)
+ i_stream_skip(&stream->istream, available);
+ else {
+ i_stream_skip(&stream->istream,
+ v_offset - stream->istream.v_offset);
+ }
+ }
+}
+
+bool i_stream_nonseekable_try_seek(struct istream_private *stream,
+ uoff_t v_offset)
+{
+ uoff_t start_offset = stream->istream.v_offset - stream->skip;
+
+ if (v_offset < start_offset) {
+ /* have to seek backwards */
+ i_stream_seek(stream->parent, stream->parent_start_offset);
+ stream->parent_expected_offset = stream->parent_start_offset;
+ stream->skip = stream->pos = 0;
+ stream->istream.v_offset = 0;
+ stream->high_pos = 0;
+ return FALSE;
+ }
+
+ if (v_offset <= start_offset + stream->pos) {
+ /* seeking backwards within what's already cached */
+ stream->skip = v_offset - start_offset;
+ stream->istream.v_offset = v_offset;
+ if (stream->high_pos == 0)
+ stream->high_pos = stream->pos;
+ stream->pos = stream->skip;
+ } else {
+ /* read forward */
+ i_stream_default_seek_nonseekable(stream, v_offset, FALSE);
+ }
+ return TRUE;
+}
+
+static int
+seekable_i_stream_get_size(struct istream_private *stream)
+{
+ if (stream->cached_stream_size == UOFF_T_MAX) {
+ uoff_t old_offset = stream->istream.v_offset;
+ ssize_t ret;
+
+ do {
+ i_stream_skip(&stream->istream,
+ i_stream_get_data_size(&stream->istream));
+ } while ((ret = i_stream_read(&stream->istream)) > 0);
+ i_assert(ret == -1);
+ if (stream->istream.stream_errno != 0)
+ return -1;
+
+ stream->cached_stream_size = stream->istream.v_offset;
+ i_stream_seek(&stream->istream, old_offset);
+ }
+ stream->statbuf.st_size = stream->cached_stream_size;
+ return 0;
+}
+
+static int
+i_stream_default_stat(struct istream_private *stream, bool exact)
+{
+ const struct stat *st;
+
+ if (stream->parent == NULL)
+ return stream->istream.stream_errno == 0 ? 0 : -1;
+
+ if (i_stream_stat(stream->parent, exact, &st) < 0) {
+ stream->istream.stream_errno = stream->parent->stream_errno;
+ return -1;
+ }
+ stream->statbuf = *st;
+ if (exact && !stream->stream_size_passthrough) {
+ /* exact size is not known, even if parent returned something */
+ stream->statbuf.st_size = -1;
+ if (stream->istream.seekable) {
+ if (seekable_i_stream_get_size(stream) < 0)
+ return -1;
+ }
+ } else {
+ /* When exact=FALSE always return the parent stat's size, even
+ if we know the exact value. This is necessary because
+ otherwise e.g. mbox code can see two different values and
+ think that the mbox file keeps changing. */
+ }
+ return 0;
+}
+
+static int
+i_stream_default_get_size(struct istream_private *stream,
+ bool exact, uoff_t *size_r)
+{
+ if (stream->stat(stream, exact) < 0)
+ return -1;
+ if (stream->statbuf.st_size == -1)
+ return 0;
+
+ *size_r = stream->statbuf.st_size;
+ return 1;
+}
+
+void i_stream_init_parent(struct istream_private *_stream,
+ struct istream *parent)
+{
+ _stream->access_counter = parent->real_stream->access_counter;
+ _stream->parent = parent;
+ _stream->parent_start_offset = parent->v_offset;
+ _stream->parent_expected_offset = parent->v_offset;
+ _stream->start_offset = parent->v_offset;
+ /* if parent stream is an istream-error, copy the error */
+ _stream->istream.stream_errno = parent->stream_errno;
+ _stream->istream.eof = parent->eof;
+ i_stream_ref(parent);
+}
+
+struct istream *
+i_stream_create(struct istream_private *_stream, struct istream *parent, int fd,
+ enum istream_create_flag flags)
+{
+ bool noop_snapshot = (flags & ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT) != 0;
+
+ _stream->fd = fd;
+ if (parent != NULL)
+ i_stream_init_parent(_stream, parent);
+ else if (_stream->memarea == NULL && !noop_snapshot) {
+ /* The stream has no parent and no memarea yet. We'll assume
+ that it wants to be using memareas for the reads. */
+ _stream->memarea = memarea_init_empty();
+ }
+ _stream->istream.real_stream = _stream;
+
+ if (_stream->iostream.close == NULL)
+ _stream->iostream.close = i_stream_default_close;
+ if (_stream->iostream.destroy == NULL)
+ _stream->iostream.destroy = i_stream_default_destroy;
+ if (_stream->seek == NULL) {
+ _stream->seek = _stream->istream.seekable ?
+ i_stream_default_seek_seekable :
+ i_stream_default_seek_nonseekable;
+ }
+ if (_stream->stat == NULL)
+ _stream->stat = i_stream_default_stat;
+ if (_stream->get_size == NULL)
+ _stream->get_size = i_stream_default_get_size;
+ if (_stream->snapshot == NULL) {
+ _stream->snapshot = noop_snapshot ?
+ i_stream_noop_snapshot :
+ i_stream_default_snapshot;
+ }
+ if (_stream->iostream.set_max_buffer_size == NULL) {
+ _stream->iostream.set_max_buffer_size =
+ i_stream_default_set_max_buffer_size;
+ }
+ if (_stream->init_buffer_size == 0)
+ _stream->init_buffer_size = I_STREAM_MIN_SIZE;
+
+ i_zero(&_stream->statbuf);
+ _stream->statbuf.st_size = -1;
+ _stream->statbuf.st_atime =
+ _stream->statbuf.st_mtime =
+ _stream->statbuf.st_ctime = ioloop_time;
+ _stream->cached_stream_size = UOFF_T_MAX;
+
+ io_stream_init(&_stream->iostream);
+
+ if (_stream->istream.stream_errno != 0)
+ _stream->istream.eof = TRUE;
+
+ return &_stream->istream;
+}
+
+struct istream *i_stream_create_error(int stream_errno)
+{
+ struct istream_private *stream;
+
+ stream = i_new(struct istream_private, 1);
+ stream->istream.closed = TRUE;
+ stream->istream.readable_fd = FALSE;
+ stream->istream.blocking = TRUE;
+ stream->istream.seekable = TRUE;
+ stream->istream.eof = TRUE;
+ stream->istream.stream_errno = stream_errno;
+ /* Nothing can ever actually be read from this stream, but set a
+ reasonable max_buffer_size anyway since some filter istreams don't
+ behave properly otherwise. */
+ stream->max_buffer_size = IO_BLOCK_SIZE;
+ i_stream_create(stream, NULL, -1, 0);
+ i_stream_set_name(&stream->istream, "(error)");
+ return &stream->istream;
+}
+
+struct istream *
+i_stream_create_error_str(int stream_errno, const char *fmt, ...)
+{
+ struct istream *input;
+ va_list args;
+
+ va_start(args, fmt);
+ input = i_stream_create_error(stream_errno);
+ io_stream_set_verror(&input->real_stream->iostream, fmt, args);
+ va_end(args);
+ return input;
+}