diff options
Diffstat (limited to 'src/lib/istream.c')
-rw-r--r-- | src/lib/istream.c | 1316 |
1 files changed, 1316 insertions, 0 deletions
diff --git a/src/lib/istream.c b/src/lib/istream.c new file mode 100644 index 0000000..5fc5112 --- /dev/null +++ b/src/lib/istream.c @@ -0,0 +1,1316 @@ +/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "str.h" +#include "memarea.h" +#include "istream-private.h" + +static bool i_stream_is_buffer_invalid(const struct istream_private *stream); + +void i_stream_set_name(struct istream *stream, const char *name) +{ + i_free(stream->real_stream->iostream.name); + stream->real_stream->iostream.name = i_strdup(name); +} + +const char *i_stream_get_name(struct istream *stream) +{ + while (stream->real_stream->iostream.name == NULL) { + stream = stream->real_stream->parent; + if (stream == NULL) + return ""; + } + return stream->real_stream->iostream.name; +} + +static void i_stream_close_full(struct istream *stream, bool close_parents) +{ + io_stream_close(&stream->real_stream->iostream, close_parents); + stream->closed = TRUE; + + if (stream->stream_errno == 0) + stream->stream_errno = EPIPE; +} + +void i_stream_destroy(struct istream **stream) +{ + if (*stream == NULL) + return; + + i_stream_close_full(*stream, FALSE); + i_stream_unref(stream); +} + +void i_stream_ref(struct istream *stream) +{ + io_stream_ref(&stream->real_stream->iostream); +} + +void i_stream_unref(struct istream **stream) +{ + struct istream_private *_stream; + + if (*stream == NULL) + return; + + _stream = (*stream)->real_stream; + + if (_stream->iostream.refcount > 1) { + if (!io_stream_unref(&_stream->iostream)) + i_unreached(); + } else { + /* The snapshot may contain pointers to the parent istreams. + Free it before io_stream_unref() frees the parents. */ + i_stream_snapshot_free(&_stream->prev_snapshot); + + if (io_stream_unref(&_stream->iostream)) + i_unreached(); + str_free(&_stream->line_str); + i_stream_unref(&_stream->parent); + io_stream_free(&_stream->iostream); + } + *stream = NULL; +} + +#undef i_stream_add_destroy_callback +void i_stream_add_destroy_callback(struct istream *stream, + istream_callback_t *callback, void *context) +{ + io_stream_add_destroy_callback(&stream->real_stream->iostream, + callback, context); +} + +void i_stream_remove_destroy_callback(struct istream *stream, + void (*callback)()) +{ + io_stream_remove_destroy_callback(&stream->real_stream->iostream, + callback); +} + +int i_stream_get_fd(struct istream *stream) +{ + struct istream_private *_stream = stream->real_stream; + + return _stream->fd; +} + +void i_stream_copy_fd(struct istream *dest, struct istream *source) +{ + int fd = i_stream_get_fd(source); + + i_assert(fd != -1); + i_assert(dest->real_stream->fd == -1); + dest->real_stream->fd = fd; + dest->readable_fd = source->readable_fd; +} + +const char *i_stream_get_error(struct istream *stream) +{ + struct istream *s; + + /* we'll only return errors for streams that have stream_errno set or + that have reached EOF. we might be returning unintended error + otherwise. */ + if (stream->stream_errno == 0) + return stream->eof ? "EOF" : "<no error>"; + + for (s = stream; s != NULL; s = s->real_stream->parent) { + if (s->stream_errno == 0) + break; + if (s->real_stream->iostream.error != NULL) + return s->real_stream->iostream.error; + } + return strerror(stream->stream_errno); +} + +const char *i_stream_get_disconnect_reason(struct istream *stream) +{ + return io_stream_get_disconnect_reason(stream, NULL); +} + +void i_stream_close(struct istream *stream) +{ + if (stream != NULL) + i_stream_close_full(stream, TRUE); +} + +void i_stream_set_init_buffer_size(struct istream *stream, size_t size) +{ + stream->real_stream->init_buffer_size = size; +} + +void i_stream_set_max_buffer_size(struct istream *stream, size_t max_size) +{ + io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size); +} + +size_t i_stream_get_max_buffer_size(struct istream *stream) +{ + size_t max_size = 0; + + do { + if (max_size < stream->real_stream->max_buffer_size) + max_size = stream->real_stream->max_buffer_size; + stream = stream->real_stream->parent; + } while (stream != NULL); + return max_size; +} + +void i_stream_set_return_partial_line(struct istream *stream, bool set) +{ + stream->real_stream->return_nolf_line = set; +} + +void i_stream_set_persistent_buffers(struct istream *stream, bool set) +{ + do { + stream->real_stream->nonpersistent_buffers = !set; + stream = stream->real_stream->parent; + } while (stream != NULL); +} + +void i_stream_set_blocking(struct istream *stream, bool blocking) +{ + int prev_fd = -1; + + do { + stream->blocking = blocking; + if (stream->real_stream->fd != -1 && + stream->real_stream->fd != prev_fd) { + fd_set_nonblock(stream->real_stream->fd, !blocking); + prev_fd = stream->real_stream->fd; + } + stream = stream->real_stream->parent; + } while (stream != NULL); +} + +static void i_stream_update(struct istream_private *stream) +{ + if (stream->parent == NULL) + stream->access_counter++; + else { + stream->access_counter = + stream->parent->real_stream->access_counter; + stream->parent_expected_offset = stream->parent->v_offset; + } +} + +static bool snapshot_has_memarea(struct istream_snapshot *snapshot, + struct memarea *memarea) +{ + if (snapshot->old_memarea == memarea) + return TRUE; + if (snapshot->prev_snapshot != NULL) + return snapshot_has_memarea(snapshot->prev_snapshot, memarea); + return FALSE; +} + +struct istream_snapshot * +i_stream_default_snapshot(struct istream_private *stream, + struct istream_snapshot *prev_snapshot) +{ + struct istream_snapshot *snapshot; + + if (stream->memarea != NULL) { + if (prev_snapshot != NULL) { + if (snapshot_has_memarea(prev_snapshot, stream->memarea)) + return prev_snapshot; + } + /* This stream has a memarea. Reference it, so we can later on + rollback if needed. */ + snapshot = i_new(struct istream_snapshot, 1); + snapshot->old_memarea = stream->memarea; + snapshot->prev_snapshot = prev_snapshot; + memarea_ref(snapshot->old_memarea); + return snapshot; + } + if (stream->parent == NULL) { + if (stream->nonpersistent_buffers) { + /* Assume that memarea would be used normally, but + now it's NULL because the buffer is empty and + empty buffers are freed. */ + i_assert(stream->skip == stream->pos); + return prev_snapshot; + } + i_panic("%s is missing istream.snapshot() implementation", + i_stream_get_name(&stream->istream)); + } + struct istream_private *_parent_stream = + stream->parent->real_stream; + return _parent_stream->snapshot(_parent_stream, prev_snapshot); +} + +void i_stream_snapshot_free(struct istream_snapshot **_snapshot) +{ + struct istream_snapshot *snapshot = *_snapshot; + + if (*_snapshot == NULL) + return; + *_snapshot = NULL; + + i_stream_snapshot_free(&snapshot->prev_snapshot); + if (snapshot->free != NULL) + snapshot->free(snapshot); + else { + if (snapshot->old_memarea != NULL) + memarea_unref(&snapshot->old_memarea); + i_stream_unref(&snapshot->istream); + i_free(snapshot); + } +} + +static struct istream_snapshot * +i_stream_noop_snapshot(struct istream_private *stream ATTR_UNUSED, + struct istream_snapshot *prev_snapshot) +{ + return prev_snapshot; +} + +ssize_t i_stream_read(struct istream *stream) +{ + struct istream_private *_stream = stream->real_stream; + ssize_t ret; +#ifdef DEBUG + unsigned char prev_buf[4]; + const unsigned char *prev_data = _stream->buffer; + size_t prev_skip = _stream->skip, prev_pos = _stream->pos; + bool invalid = i_stream_is_buffer_invalid(_stream); + + i_assert(prev_skip <= prev_pos); + if (invalid) + ; + else if (prev_pos - prev_skip <= 4) + memcpy(prev_buf, prev_data + prev_skip, prev_pos - prev_skip); + else { + memcpy(prev_buf, prev_data + prev_skip, 2); + memcpy(prev_buf+2, prev_data + prev_pos - 2, 2); + } +#endif + + if (_stream->skip != _stream->pos || _stream->prev_snapshot != NULL) { + _stream->prev_snapshot = + _stream->snapshot(_stream, _stream->prev_snapshot); + } + ret = i_stream_read_memarea(stream); + if (ret > 0) + i_stream_snapshot_free(&_stream->prev_snapshot); +#ifdef DEBUG + else if (!invalid) { + i_assert((_stream->pos - _stream->skip) == (prev_pos - prev_skip) || + prev_pos == prev_skip); + if (prev_pos - prev_skip <= 4) + i_assert(memcmp(prev_buf, prev_data + prev_skip, prev_pos - prev_skip) == 0); + else { + i_assert(memcmp(prev_buf, prev_data + prev_skip, 2) == 0); + i_assert(memcmp(prev_buf+2, prev_data + prev_pos - 2, 2) == 0); + } + } +#endif + return ret; +} + +ssize_t i_stream_read_memarea(struct istream *stream) +{ + struct istream_private *_stream = stream->real_stream; + size_t old_size; + ssize_t ret; + + if (unlikely(stream->closed || stream->stream_errno != 0)) { + stream->eof = TRUE; + errno = stream->stream_errno; + return -1; + } + + stream->eof = FALSE; + + if (_stream->parent != NULL) + i_stream_seek(_stream->parent, _stream->parent_expected_offset); + + old_size = _stream->pos - _stream->skip; + if (_stream->pos < _stream->high_pos) { + /* we're here because we seeked back within the read buffer. */ + ret = _stream->high_pos - _stream->pos; + _stream->pos = _stream->high_pos; + _stream->high_pos = 0; + } else { + _stream->high_pos = 0; + ret = _stream->read(_stream); + } + i_assert(old_size <= _stream->pos - _stream->skip); + switch (ret) { + case -2: + i_assert(_stream->skip != _stream->pos); + break; + case -1: + if (stream->stream_errno != 0) { + /* error handling should be easier if we now just + assume the stream is now at EOF */ + stream->eof = TRUE; + errno = stream->stream_errno; + } else { + i_assert(stream->eof); + i_assert(old_size == _stream->pos - _stream->skip); + } + break; + case 0: + i_assert(!stream->blocking); + break; + default: + i_assert(ret > 0); + i_assert(_stream->skip < _stream->pos); + i_assert((size_t)ret+old_size == _stream->pos - _stream->skip); + _stream->last_read_timeval = ioloop_timeval; + break; + } + + if (stream->stream_errno != 0) { + /* error handling should be easier if we now just + assume the stream is now at EOF. Note that we could get here + even if read() didn't return -1, although that's a little + bit sloppy istream implementation. */ + stream->eof = TRUE; + } + + i_stream_update(_stream); + /* verify that parents' access_counters are valid. the parent's + i_stream_read() should guarantee this. */ + i_assert(!i_stream_is_buffer_invalid(_stream)); + return ret; +} + +int i_stream_read_more_memarea(struct istream *stream, + const unsigned char **data_r, size_t *size_r) +{ + *data_r = i_stream_get_data(stream, size_r); + if (*size_r > 0) + return 1; + + int ret = i_stream_read_memarea(stream); + *data_r = i_stream_get_data(stream, size_r); + return ret; +} + +void i_stream_get_last_read_time(struct istream *stream, struct timeval *tv_r) +{ + *tv_r = stream->real_stream->last_read_timeval; +} + +ssize_t i_stream_read_copy_from_parent(struct istream *istream) +{ + struct istream_private *stream = istream->real_stream; + size_t pos; + ssize_t ret; + + stream->pos -= stream->skip; + stream->skip = 0; + + stream->buffer = i_stream_get_data(stream->parent, &pos); + if (pos > stream->pos) + ret = 0; + else do { + ret = i_stream_read_memarea(stream->parent); + stream->istream.stream_errno = stream->parent->stream_errno; + stream->istream.eof = stream->parent->eof; + stream->buffer = i_stream_get_data(stream->parent, &pos); + /* check again, in case the parent stream had been seeked + backwards and the previous read() didn't get us far + enough. */ + } while (pos <= stream->pos && ret > 0); + if (ret == -2) { + i_stream_update(stream); + return -2; + } + + ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : + (ret == 0 ? 0 : -1); + stream->pos = pos; + i_assert(ret != -1 || stream->istream.eof || + stream->istream.stream_errno != 0); + i_stream_update(stream); + return ret; +} + +void i_stream_free_buffer(struct istream_private *stream) +{ + if (stream->memarea != NULL) { + memarea_unref(&stream->memarea); + stream->w_buffer = NULL; + } else if (stream->w_buffer != NULL) { + i_free_and_null(stream->w_buffer); + } else { + /* don't know how to free it */ + return; + } + stream->buffer_size = 0; +} + +void i_stream_skip(struct istream *stream, uoff_t count) +{ + struct istream_private *_stream = stream->real_stream; + size_t data_size; + + data_size = _stream->pos - _stream->skip; + if (count <= data_size) { + /* within buffer */ + stream->v_offset += count; + _stream->skip += count; + if (_stream->nonpersistent_buffers && + _stream->skip == _stream->pos) { + _stream->skip = _stream->pos = 0; + i_stream_free_buffer(_stream); + } + return; + } + + /* have to seek forward */ + count -= data_size; + _stream->skip = _stream->pos; + stream->v_offset += data_size; + + if (unlikely(stream->closed || stream->stream_errno != 0)) + return; + + _stream->seek(_stream, stream->v_offset + count, FALSE); +} + +static bool i_stream_can_optimize_seek(struct istream_private *stream) +{ + if (stream->parent == NULL) + return TRUE; + + /* use the fast route only if the parent stream hasn't been changed */ + if (stream->access_counter != + stream->parent->real_stream->access_counter) + return FALSE; + + return i_stream_can_optimize_seek(stream->parent->real_stream); +} + +void i_stream_seek(struct istream *stream, uoff_t v_offset) +{ + struct istream_private *_stream = stream->real_stream; + + if (v_offset >= stream->v_offset && + i_stream_can_optimize_seek(_stream)) + i_stream_skip(stream, v_offset - stream->v_offset); + else { + if (unlikely(stream->closed || stream->stream_errno != 0)) { + stream->eof = TRUE; + return; + } + stream->eof = FALSE; + _stream->seek(_stream, v_offset, FALSE); + } + i_stream_update(_stream); +} + +void i_stream_seek_mark(struct istream *stream, uoff_t v_offset) +{ + struct istream_private *_stream = stream->real_stream; + + if (unlikely(stream->closed || stream->stream_errno != 0)) + return; + + stream->eof = FALSE; + _stream->seek(_stream, v_offset, TRUE); + i_stream_update(_stream); +} + +void i_stream_sync(struct istream *stream) +{ + struct istream_private *_stream = stream->real_stream; + + if (unlikely(stream->closed || stream->stream_errno != 0)) + return; + + if (_stream->sync != NULL) { + _stream->sync(_stream); + i_stream_update(_stream); + } +} + +int i_stream_stat(struct istream *stream, bool exact, const struct stat **st_r) +{ + struct istream_private *_stream = stream->real_stream; + + if (unlikely(stream->closed || stream->stream_errno != 0)) + return -1; + + if (_stream->stat(_stream, exact) < 0) { + stream->eof = TRUE; + return -1; + } + *st_r = &_stream->statbuf; + return 0; +} + +int i_stream_get_size(struct istream *stream, bool exact, uoff_t *size_r) +{ + struct istream_private *_stream = stream->real_stream; + + if (unlikely(stream->closed || stream->stream_errno != 0)) + return -1; + + int ret; + if ((ret = _stream->get_size(_stream, exact, size_r)) < 0) + stream->eof = TRUE; + return ret; +} + +bool i_stream_have_bytes_left(struct istream *stream) +{ + return i_stream_get_data_size(stream) > 0 || !stream->eof; +} + +bool i_stream_read_eof(struct istream *stream) +{ + if (i_stream_get_data_size(stream) == 0) + (void)i_stream_read(stream); + return !i_stream_have_bytes_left(stream); +} + +uoff_t i_stream_get_absolute_offset(struct istream *stream) +{ + uoff_t abs_offset = stream->v_offset; + while (stream != NULL) { + abs_offset += stream->real_stream->start_offset; + stream = stream->real_stream->parent; + } + return abs_offset; +} + +static char *i_stream_next_line_finish(struct istream_private *stream, size_t i) +{ + char *ret; + size_t end; + + if (i > stream->skip && stream->buffer[i-1] == '\r') { + end = i - 1; + stream->line_crlf = TRUE; + } else { + end = i; + stream->line_crlf = FALSE; + } + + if (stream->buffer == stream->w_buffer && + end < stream->buffer_size) { + /* modify the buffer directly */ + stream->w_buffer[end] = '\0'; + ret = (char *)stream->w_buffer + stream->skip; + } else { + /* use a temporary string to return it */ + if (stream->line_str == NULL) + stream->line_str = str_new(default_pool, 256); + str_truncate(stream->line_str, 0); + if (stream->skip < end) + str_append_data(stream->line_str, stream->buffer + stream->skip, + end - stream->skip); + ret = str_c_modifiable(stream->line_str); + } + + if (i < stream->pos) + i++; + stream->istream.v_offset += i - stream->skip; + stream->skip = i; + return ret; +} + +static char *i_stream_last_line(struct istream_private *_stream) +{ + if (_stream->istream.eof && _stream->skip != _stream->pos && + _stream->return_nolf_line) { + /* the last line is missing LF and we want to return it. */ + return i_stream_next_line_finish(_stream, _stream->pos); + } + return NULL; +} + +char *i_stream_next_line(struct istream *stream) +{ + struct istream_private *_stream = stream->real_stream; + const unsigned char *pos; + + if (_stream->skip >= _stream->pos) + return NULL; + + pos = memchr(_stream->buffer + _stream->skip, '\n', + _stream->pos - _stream->skip); + if (pos != NULL) { + return i_stream_next_line_finish(_stream, + pos - _stream->buffer); + } else { + return i_stream_last_line(_stream); + } +} + +char *i_stream_read_next_line(struct istream *stream) +{ + char *line; + + for (;;) { + line = i_stream_next_line(stream); + if (line != NULL) + break; + + switch (i_stream_read(stream)) { + case -2: + io_stream_set_error(&stream->real_stream->iostream, + "Line is too long (over %zu" + " bytes at offset %"PRIuUOFF_T")", + i_stream_get_data_size(stream), stream->v_offset); + stream->stream_errno = errno = ENOBUFS; + stream->eof = TRUE; + return NULL; + case -1: + return i_stream_last_line(stream->real_stream); + case 0: + return NULL; + } + } + return line; +} + +bool i_stream_last_line_crlf(struct istream *stream) +{ + return stream->real_stream->line_crlf; +} + +static bool i_stream_is_buffer_invalid(const struct istream_private *stream) +{ + if (stream->parent == NULL) { + /* the buffer can't point to parent, because it doesn't exist */ + return FALSE; + } + if (stream->w_buffer != NULL) { + /* we can pretty safely assume that the stream is using its + own private buffer, so it can never become invalid. */ + return FALSE; + } + if (stream->access_counter != + stream->parent->real_stream->access_counter) { + /* parent has been modified behind this stream, we can't trust + that our buffer is valid */ + return TRUE; + } + return i_stream_is_buffer_invalid(stream->parent->real_stream); +} + +const unsigned char * +i_stream_get_data(struct istream *stream, size_t *size_r) +{ + struct istream_private *_stream = stream->real_stream; + + if (_stream->skip >= _stream->pos) { + *size_r = 0; + return uchar_empty_ptr; + } + + if (unlikely(i_stream_is_buffer_invalid(_stream))) { + /* This stream may be using parent's buffer directly as + _stream->buffer, but the parent stream has already been + modified indirectly. This means that the buffer might no + longer point to where we assume it points to. So we'll + just return the stream as empty until it's read again. + + It's a bit ugly to suddenly drop data from the stream that + was already read, but since this happens only with shared + parent istreams the caller is hopefully aware enough that + something like this might happen. The other solutions would + be to a) try to automatically read the data back (but we + can't handle errors..) or b) always copy data to stream's + own buffer instead of pointing to parent's buffer (but this + causes data copying that is nearly always unnecessary). */ + *size_r = 0; + /* if we had already read until EOF, mark the stream again as + not being at the end of file. */ + if (stream->stream_errno == 0) { + _stream->skip = _stream->pos = 0; + stream->eof = FALSE; + } + return uchar_empty_ptr; + } + + *size_r = _stream->pos - _stream->skip; + return _stream->buffer + _stream->skip; +} + +size_t i_stream_get_data_size(struct istream *stream) +{ + size_t size; + + (void)i_stream_get_data(stream, &size); + return size; +} + +unsigned char *i_stream_get_modifiable_data(struct istream *stream, + size_t *size_r) +{ + struct istream_private *_stream = stream->real_stream; + + if (_stream->skip >= _stream->pos || _stream->w_buffer == NULL) { + *size_r = 0; + return NULL; + } + + *size_r = _stream->pos - _stream->skip; + return _stream->w_buffer + _stream->skip; +} + +int i_stream_read_data(struct istream *stream, const unsigned char **data_r, + size_t *size_r, size_t threshold) +{ + ssize_t ret = 0; + bool read_more = FALSE; + + do { + *data_r = i_stream_get_data(stream, size_r); + if (*size_r > threshold) + return 1; + + /* we need more data */ + ret = i_stream_read(stream); + if (ret > 0) + read_more = TRUE; + } while (ret > 0); + + *data_r = i_stream_get_data(stream, size_r); + if (ret == -2) + return -2; + + if (ret == 0) { + /* need to read more */ + i_assert(!stream->blocking); + return 0; + } + if (stream->eof) { + if (read_more) { + /* we read at least some new data */ + return 0; + } + } else { + i_assert(stream->stream_errno != 0); + } + return -1; +} + +int i_stream_read_limited(struct istream *stream, const unsigned char **data_r, + size_t *size_r, size_t limit) +{ + struct istream_private *_stream = stream->real_stream; + int ret; + + *data_r = i_stream_get_data(stream, size_r); + if (*size_r >= limit) { + *size_r = limit; + return 1; + } + + _stream->data_limit = limit; + ret = i_stream_read_more(stream, data_r, size_r); + _stream->data_limit = 0; + + if (*size_r >= limit) + *size_r = limit; + return ret; +} + +void i_stream_compress(struct istream_private *stream) +{ + i_assert(stream->memarea == NULL || + memarea_get_refcount(stream->memarea) == 1); + + if (stream->skip != stream->pos) { + memmove(stream->w_buffer, stream->w_buffer + stream->skip, + stream->pos - stream->skip); + } + stream->pos -= stream->skip; + + stream->skip = 0; +} + +static void i_stream_w_buffer_free(void *buf) +{ + i_free(buf); +} + +static void +i_stream_w_buffer_realloc(struct istream_private *stream, size_t old_size) +{ + void *new_buffer; + + if (stream->memarea != NULL && + memarea_get_refcount(stream->memarea) == 1) { + /* Nobody else is referencing the memarea. + We can just reallocate it. */ + memarea_free_without_callback(&stream->memarea); + new_buffer = i_realloc(stream->w_buffer, old_size, + stream->buffer_size); + } else { + new_buffer = i_malloc(stream->buffer_size); + if (old_size > 0) { + i_assert(stream->w_buffer != NULL); + memcpy(new_buffer, stream->w_buffer, old_size); + } + if (stream->memarea != NULL) + memarea_unref(&stream->memarea); + } + + stream->w_buffer = new_buffer; + stream->buffer = new_buffer; + + stream->memarea = memarea_init(stream->w_buffer, stream->buffer_size, + i_stream_w_buffer_free, new_buffer); +} + +void i_stream_grow_buffer(struct istream_private *stream, size_t bytes) +{ + size_t old_size, max_size; + + old_size = stream->buffer_size; + + stream->buffer_size = stream->pos + bytes; + if (stream->buffer_size <= stream->init_buffer_size) + stream->buffer_size = stream->init_buffer_size; + else + stream->buffer_size = nearest_power(stream->buffer_size); + + max_size = i_stream_get_max_buffer_size(&stream->istream); + i_assert(max_size > 0); + if (stream->buffer_size > max_size) + stream->buffer_size = max_size; + + if (stream->buffer_size <= old_size) + stream->buffer_size = old_size; + else + i_stream_w_buffer_realloc(stream, old_size); +} + +bool i_stream_try_alloc(struct istream_private *stream, + size_t wanted_size, size_t *size_r) +{ + i_assert(wanted_size > 0); + i_assert(stream->buffer_size >= stream->pos); + + if (wanted_size > stream->buffer_size - stream->pos) { + if (stream->skip > 0) { + /* remove the unused bytes from beginning of buffer */ + if (stream->memarea != NULL && + memarea_get_refcount(stream->memarea) > 1) { + /* The memarea is still referenced. We can't + overwrite data until extra references are + gone. */ + i_stream_w_buffer_realloc(stream, stream->buffer_size); + } + i_stream_compress(stream); + } else if (stream->buffer_size < i_stream_get_max_buffer_size(&stream->istream)) { + /* buffer is full - grow it */ + i_stream_grow_buffer(stream, I_STREAM_MIN_SIZE); + } + } + + if (stream->data_limit == 0 || + (stream->buffer_size - stream->skip) < stream->data_limit) + *size_r = stream->buffer_size - stream->pos; + else { + size_t buffered = (stream->pos - stream->skip); + + if (buffered >= stream->data_limit) + *size_r = 0; + else + *size_r = stream->data_limit - buffered; + } + i_assert(stream->w_buffer != NULL || *size_r == 0); + return *size_r > 0; +} + +bool ATTR_NOWARN_UNUSED_RESULT +i_stream_try_alloc_avoid_compress(struct istream_private *stream, + size_t wanted_size, size_t *size_r) +{ + size_t old_skip = stream->skip; + + /* try first with skip=0, so no compression is done */ + stream->skip = 0; + bool ret = i_stream_try_alloc(stream, wanted_size, size_r); + stream->skip = old_skip; + if (ret || old_skip == 0) + return ret; + /* it's full. try with compression. */ + return i_stream_try_alloc(stream, wanted_size, size_r); +} + +void *i_stream_alloc(struct istream_private *stream, size_t size) +{ + size_t old_size, avail_size; + + (void)i_stream_try_alloc(stream, size, &avail_size); + if (avail_size < size) { + old_size = stream->buffer_size; + stream->buffer_size = nearest_power(stream->pos + size); + i_stream_w_buffer_realloc(stream, old_size); + + (void)i_stream_try_alloc(stream, size, &avail_size); + i_assert(avail_size >= size); + } + return stream->w_buffer + stream->pos; +} + +void i_stream_memarea_detach(struct istream_private *stream) +{ + if (stream->memarea != NULL) { + /* Don't overwrite data in a snapshot. Allocate a new + buffer instead. */ + memarea_unref(&stream->memarea); + stream->buffer_size = 0; + stream->buffer = NULL; + stream->w_buffer = NULL; + } +} + +bool i_stream_add_data(struct istream *_stream, const unsigned char *data, + size_t size) +{ + struct istream_private *stream = _stream->real_stream; + size_t size2; + + (void)i_stream_try_alloc(stream, size, &size2); + if (size > size2) + return FALSE; + + memcpy(stream->w_buffer + stream->pos, data, size); + stream->pos += size; + return TRUE; +} + +struct istream *i_stream_get_root_io(struct istream *stream) +{ + while (stream->real_stream->parent != NULL) { + i_assert(stream->real_stream->io == NULL); + stream = stream->real_stream->parent; + } + return stream; +} + +void i_stream_set_input_pending(struct istream *stream, bool pending) +{ + if (!pending) + return; + + stream = i_stream_get_root_io(stream); + if (stream->real_stream->io != NULL) + io_set_pending(stream->real_stream->io); + else + stream->real_stream->io_pending = TRUE; +} + +void i_stream_switch_ioloop_to(struct istream *stream, struct ioloop *ioloop) +{ + io_stream_switch_ioloop_to(&stream->real_stream->iostream, ioloop); + + do { + if (stream->real_stream->switch_ioloop_to != NULL) { + stream->real_stream->switch_ioloop_to( + stream->real_stream, ioloop); + } + stream = stream->real_stream->parent; + } while (stream != NULL); +} + +void i_stream_switch_ioloop(struct istream *stream) +{ + i_stream_switch_ioloop_to(stream, current_ioloop); +} + +void i_stream_set_io(struct istream *stream, struct io *io) +{ + stream = i_stream_get_root_io(stream); + + i_assert(stream->real_stream->io == NULL); + stream->real_stream->io = io; + if (stream->real_stream->io_pending) { + io_set_pending(io); + stream->real_stream->io_pending = FALSE; + } +} + +void i_stream_unset_io(struct istream *stream, struct io *io) +{ + stream = i_stream_get_root_io(stream); + + i_assert(stream->real_stream->io == io); + if (io_is_pending(io)) + stream->real_stream->io_pending = TRUE; + stream->real_stream->io = NULL; +} + +static void +i_stream_default_set_max_buffer_size(struct iostream_private *stream, + size_t max_size) +{ + struct istream_private *_stream = + container_of(stream, struct istream_private, iostream); + + _stream->max_buffer_size = max_size; + if (_stream->parent != NULL) + i_stream_set_max_buffer_size(_stream->parent, max_size); +} + +static void i_stream_default_close(struct iostream_private *stream, + bool close_parent) +{ + struct istream_private *_stream = + container_of(stream, struct istream_private, iostream); + + if (close_parent) + i_stream_close(_stream->parent); +} + +static void i_stream_default_destroy(struct iostream_private *stream) +{ + struct istream_private *_stream = + container_of(stream, struct istream_private, iostream); + + i_stream_free_buffer(_stream); + i_stream_unref(&_stream->parent); +} + +static void +i_stream_default_seek_seekable(struct istream_private *stream, + uoff_t v_offset, bool mark ATTR_UNUSED) +{ + stream->istream.v_offset = v_offset; + stream->skip = stream->pos = 0; +} + +void i_stream_default_seek_nonseekable(struct istream_private *stream, + uoff_t v_offset, bool mark ATTR_UNUSED) +{ + size_t available; + + if (stream->istream.v_offset > v_offset) + i_panic("stream %s doesn't support seeking backwards", + i_stream_get_name(&stream->istream)); + + while (stream->istream.v_offset < v_offset) { + (void)i_stream_read(&stream->istream); + + available = stream->pos - stream->skip; + if (available == 0) { + if (stream->istream.stream_errno != 0) { + /* read failed */ + return; + } + io_stream_set_error(&stream->iostream, + "Can't seek to offset %"PRIuUOFF_T + ", because we have data only up to offset %" + PRIuUOFF_T" (eof=%d)", v_offset, + stream->istream.v_offset, stream->istream.eof ? 1 : 0); + stream->istream.stream_errno = ESPIPE; + return; + } + if (available <= v_offset - stream->istream.v_offset) + i_stream_skip(&stream->istream, available); + else { + i_stream_skip(&stream->istream, + v_offset - stream->istream.v_offset); + } + } +} + +bool i_stream_nonseekable_try_seek(struct istream_private *stream, + uoff_t v_offset) +{ + uoff_t start_offset = stream->istream.v_offset - stream->skip; + + if (v_offset < start_offset) { + /* have to seek backwards */ + i_stream_seek(stream->parent, stream->parent_start_offset); + stream->parent_expected_offset = stream->parent_start_offset; + stream->skip = stream->pos = 0; + stream->istream.v_offset = 0; + stream->high_pos = 0; + return FALSE; + } + + if (v_offset <= start_offset + stream->pos) { + /* seeking backwards within what's already cached */ + stream->skip = v_offset - start_offset; + stream->istream.v_offset = v_offset; + if (stream->high_pos == 0) + stream->high_pos = stream->pos; + stream->pos = stream->skip; + } else { + /* read forward */ + i_stream_default_seek_nonseekable(stream, v_offset, FALSE); + } + return TRUE; +} + +static int +seekable_i_stream_get_size(struct istream_private *stream) +{ + if (stream->cached_stream_size == UOFF_T_MAX) { + uoff_t old_offset = stream->istream.v_offset; + ssize_t ret; + + do { + i_stream_skip(&stream->istream, + i_stream_get_data_size(&stream->istream)); + } while ((ret = i_stream_read(&stream->istream)) > 0); + i_assert(ret == -1); + if (stream->istream.stream_errno != 0) + return -1; + + stream->cached_stream_size = stream->istream.v_offset; + i_stream_seek(&stream->istream, old_offset); + } + stream->statbuf.st_size = stream->cached_stream_size; + return 0; +} + +static int +i_stream_default_stat(struct istream_private *stream, bool exact) +{ + const struct stat *st; + + if (stream->parent == NULL) + return stream->istream.stream_errno == 0 ? 0 : -1; + + if (i_stream_stat(stream->parent, exact, &st) < 0) { + stream->istream.stream_errno = stream->parent->stream_errno; + return -1; + } + stream->statbuf = *st; + if (exact && !stream->stream_size_passthrough) { + /* exact size is not known, even if parent returned something */ + stream->statbuf.st_size = -1; + if (stream->istream.seekable) { + if (seekable_i_stream_get_size(stream) < 0) + return -1; + } + } else { + /* When exact=FALSE always return the parent stat's size, even + if we know the exact value. This is necessary because + otherwise e.g. mbox code can see two different values and + think that the mbox file keeps changing. */ + } + return 0; +} + +static int +i_stream_default_get_size(struct istream_private *stream, + bool exact, uoff_t *size_r) +{ + if (stream->stat(stream, exact) < 0) + return -1; + if (stream->statbuf.st_size == -1) + return 0; + + *size_r = stream->statbuf.st_size; + return 1; +} + +void i_stream_init_parent(struct istream_private *_stream, + struct istream *parent) +{ + _stream->access_counter = parent->real_stream->access_counter; + _stream->parent = parent; + _stream->parent_start_offset = parent->v_offset; + _stream->parent_expected_offset = parent->v_offset; + _stream->start_offset = parent->v_offset; + /* if parent stream is an istream-error, copy the error */ + _stream->istream.stream_errno = parent->stream_errno; + _stream->istream.eof = parent->eof; + i_stream_ref(parent); +} + +struct istream * +i_stream_create(struct istream_private *_stream, struct istream *parent, int fd, + enum istream_create_flag flags) +{ + bool noop_snapshot = (flags & ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT) != 0; + + _stream->fd = fd; + if (parent != NULL) + i_stream_init_parent(_stream, parent); + else if (_stream->memarea == NULL && !noop_snapshot) { + /* The stream has no parent and no memarea yet. We'll assume + that it wants to be using memareas for the reads. */ + _stream->memarea = memarea_init_empty(); + } + _stream->istream.real_stream = _stream; + + if (_stream->iostream.close == NULL) + _stream->iostream.close = i_stream_default_close; + if (_stream->iostream.destroy == NULL) + _stream->iostream.destroy = i_stream_default_destroy; + if (_stream->seek == NULL) { + _stream->seek = _stream->istream.seekable ? + i_stream_default_seek_seekable : + i_stream_default_seek_nonseekable; + } + if (_stream->stat == NULL) + _stream->stat = i_stream_default_stat; + if (_stream->get_size == NULL) + _stream->get_size = i_stream_default_get_size; + if (_stream->snapshot == NULL) { + _stream->snapshot = noop_snapshot ? + i_stream_noop_snapshot : + i_stream_default_snapshot; + } + if (_stream->iostream.set_max_buffer_size == NULL) { + _stream->iostream.set_max_buffer_size = + i_stream_default_set_max_buffer_size; + } + if (_stream->init_buffer_size == 0) + _stream->init_buffer_size = I_STREAM_MIN_SIZE; + + i_zero(&_stream->statbuf); + _stream->statbuf.st_size = -1; + _stream->statbuf.st_atime = + _stream->statbuf.st_mtime = + _stream->statbuf.st_ctime = ioloop_time; + _stream->cached_stream_size = UOFF_T_MAX; + + io_stream_init(&_stream->iostream); + + if (_stream->istream.stream_errno != 0) + _stream->istream.eof = TRUE; + + return &_stream->istream; +} + +struct istream *i_stream_create_error(int stream_errno) +{ + struct istream_private *stream; + + stream = i_new(struct istream_private, 1); + stream->istream.closed = TRUE; + stream->istream.readable_fd = FALSE; + stream->istream.blocking = TRUE; + stream->istream.seekable = TRUE; + stream->istream.eof = TRUE; + stream->istream.stream_errno = stream_errno; + /* Nothing can ever actually be read from this stream, but set a + reasonable max_buffer_size anyway since some filter istreams don't + behave properly otherwise. */ + stream->max_buffer_size = IO_BLOCK_SIZE; + i_stream_create(stream, NULL, -1, 0); + i_stream_set_name(&stream->istream, "(error)"); + return &stream->istream; +} + +struct istream * +i_stream_create_error_str(int stream_errno, const char *fmt, ...) +{ + struct istream *input; + va_list args; + + va_start(args, fmt); + input = i_stream_create_error(stream_errno); + io_stream_set_verror(&input->real_stream->iostream, fmt, args); + va_end(args); + return input; +} |