diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:36:47 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:36:47 +0000 |
commit | 0441d265f2bb9da249c7abf333f0f771fadb4ab5 (patch) | |
tree | 3f3789daa2f6db22da6e55e92bee0062a7d613fe /src/lib/istream-seekable.c | |
parent | Initial commit. (diff) | |
download | dovecot-0441d265f2bb9da249c7abf333f0f771fadb4ab5.tar.xz dovecot-0441d265f2bb9da249c7abf333f0f771fadb4ab5.zip |
Adding upstream version 1:2.3.21+dfsg1.upstream/1%2.3.21+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib/istream-seekable.c')
-rw-r--r-- | src/lib/istream-seekable.c | 558 |
1 files changed, 558 insertions, 0 deletions
diff --git a/src/lib/istream-seekable.c b/src/lib/istream-seekable.c new file mode 100644 index 0000000..5bc19ad --- /dev/null +++ b/src/lib/istream-seekable.c @@ -0,0 +1,558 @@ +/* Copyright (c) 2005-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "str.h" +#include "memarea.h" +#include "read-full.h" +#include "write-full.h" +#include "safe-mkstemp.h" +#include "istream-private.h" +#include "istream-concat.h" +#include "istream-seekable.h" + +#include <unistd.h> + +#define BUF_INITIAL_SIZE (1024*32) + +struct seekable_istream { + struct istream_private istream; + + char *temp_path; + uoff_t write_peak; + uoff_t size; + size_t buffer_peak; + + int (*fd_callback)(const char **path_r, void *context); + void *context; + + struct istream **input, *cur_input; + struct istream *fd_input; + unsigned int cur_idx; + int fd; + bool free_context; +}; + +static void i_stream_seekable_close(struct iostream_private *stream, + bool close_parent ATTR_UNUSED) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream.iostream); + + sstream->fd = -1; + i_stream_close(sstream->fd_input); +} + +static void unref_streams(struct seekable_istream *sstream) +{ + unsigned int i; + + for (i = 0; sstream->input[i] != NULL; i++) + i_stream_unref(&sstream->input[i]); +} + +static void i_stream_seekable_destroy(struct iostream_private *stream) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream.iostream); + + i_stream_free_buffer(&sstream->istream); + i_stream_unref(&sstream->fd_input); + unref_streams(sstream); + + if (sstream->free_context) + i_free(sstream->context); + i_free(sstream->temp_path); + i_free(sstream->input); +} + +static void +i_stream_seekable_set_max_buffer_size(struct iostream_private *stream, + size_t max_size) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream.iostream); + unsigned int i; + + sstream->istream.max_buffer_size = max_size; + if (sstream->fd_input != NULL) + i_stream_set_max_buffer_size(sstream->fd_input, max_size); + for (i = 0; sstream->input[i] != NULL; i++) + i_stream_set_max_buffer_size(sstream->input[i], max_size); +} + +static int copy_to_temp_file(struct seekable_istream *sstream) +{ + struct istream_private *stream = &sstream->istream; + const char *path; + const unsigned char *buffer; + size_t size; + int fd; + + fd = sstream->fd_callback(&path, sstream->context); + if (fd == -1) + return -1; + + /* copy our currently read buffer to it */ + i_assert(stream->pos <= sstream->buffer_peak); + if (write_full(fd, stream->buffer, sstream->buffer_peak) < 0) { + if (!ENOSPACE(errno)) + i_error("istream-seekable: write_full(%s) failed: %m", path); + i_close_fd(&fd); + return -1; + } + sstream->temp_path = i_strdup(path); + sstream->write_peak = sstream->buffer_peak; + + sstream->fd = fd; + sstream->fd_input = i_stream_create_fd_autoclose(&fd, + I_MAX(stream->pos, sstream->istream.max_buffer_size)); + i_stream_set_name(sstream->fd_input, t_strdup_printf( + "(seekable temp-istream for: %s)", i_stream_get_name(&stream->istream))); + + /* read back the data we just had in our buffer */ + for (;;) { + buffer = i_stream_get_data(sstream->fd_input, &size); + if (size >= stream->pos) + break; + + ssize_t ret; + if ((ret = i_stream_read_memarea(sstream->fd_input)) <= 0) { + i_assert(ret != 0); + i_assert(ret != -2); + i_error("istream-seekable: Couldn't read back " + "in-memory input %s: %s", + i_stream_get_name(&stream->istream), + i_stream_get_error(sstream->fd_input)); + i_stream_destroy(&sstream->fd_input); + sstream->fd = -1; /* autoclosed by fd_input */ + return -1; + } + } + /* Set the max buffer size only after we've already read everything + into memory. For example with istream-data it's possible that + more data exists in buffer than max_buffer_size. */ + i_stream_set_max_buffer_size(sstream->fd_input, + sstream->istream.max_buffer_size); + stream->buffer = buffer; + i_stream_free_buffer(&sstream->istream); + return 0; +} + +static ssize_t read_more(struct seekable_istream *sstream) +{ + size_t size; + ssize_t ret; + + if (sstream->cur_input == NULL) { + sstream->istream.istream.eof = TRUE; + return -1; + } + + while ((ret = i_stream_read_memarea(sstream->cur_input)) == -1) { + if (sstream->cur_input->stream_errno != 0) { + io_stream_set_error(&sstream->istream.iostream, + "read(%s) failed: %s", + i_stream_get_name(sstream->cur_input), + i_stream_get_error(sstream->cur_input)); + sstream->istream.istream.eof = TRUE; + sstream->istream.istream.stream_errno = + sstream->cur_input->stream_errno; + return -1; + } + + /* go to next stream */ + sstream->cur_input = sstream->input[sstream->cur_idx++]; + if (sstream->cur_input == NULL) { + /* last one, EOF */ + sstream->size = sstream->istream.istream.v_offset + + (sstream->istream.pos - sstream->istream.skip); + sstream->istream.istream.eof = TRUE; + /* Now that EOF is reached, the stream can't return 0 + anymore. Callers can now use this stream in places + that assert that blocking==TRUE. */ + sstream->istream.istream.blocking = TRUE; + unref_streams(sstream); + return -1; + } + + /* see if stream has pending data */ + size = i_stream_get_data_size(sstream->cur_input); + if (size != 0) + return size; + } + return ret; +} + +static bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r) +{ + struct istream_private *stream = &sstream->istream; + const unsigned char *data; + size_t size, avail_size; + + if (stream->pos < sstream->buffer_peak) { + /* This could be the first read() or we could have already + seeked backwards. */ + i_assert(stream->pos == 0 && stream->skip == 0); + stream->skip = stream->istream.v_offset; + stream->pos = sstream->buffer_peak; + size = stream->pos - stream->skip; + if (stream->istream.v_offset == sstream->buffer_peak) { + /* this could happen after write to temp file failed */ + return read_from_buffer(sstream, ret_r); + } + } else { + /* need to read more */ + i_assert(stream->pos == sstream->buffer_peak); + size = sstream->cur_input == NULL ? 0 : + i_stream_get_data_size(sstream->cur_input); + if (size == 0) { + /* read more to buffer */ + *ret_r = read_more(sstream); + if (*ret_r == 0 || *ret_r == -1) + return TRUE; + } + + /* we should have more now. */ + data = i_stream_get_data(sstream->cur_input, &size); + i_assert(size > 0); + + /* change skip to 0 temporarily so i_stream_try_alloc() won't try to + compress the buffer. */ + size_t old_skip = stream->skip; + stream->skip = 0; + bool have_space = i_stream_try_alloc(stream, size, &avail_size); + stream->skip = old_skip; + if (!have_space) + return FALSE; + + if (size > avail_size) + size = avail_size; + memcpy(stream->w_buffer + stream->pos, data, size); + stream->pos += size; + sstream->buffer_peak += size; + i_stream_skip(sstream->cur_input, size); + } + + *ret_r = size; + i_assert(*ret_r > 0); + return TRUE; +} + +static int i_stream_seekable_write_failed(struct seekable_istream *sstream) +{ + struct istream_private *stream = &sstream->istream; + void *data; + size_t old_pos = stream->pos; + + i_assert(sstream->fd != -1); + i_assert(stream->skip == 0); + + stream->max_buffer_size = SIZE_MAX; + stream->pos = 0; + data = i_stream_alloc(stream, sstream->write_peak); + stream->pos = old_pos; + + if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) { + sstream->istream.istream.stream_errno = errno; + sstream->istream.istream.eof = TRUE; + io_stream_set_error(&sstream->istream.iostream, + "istream-seekable: read(%s) failed: %m", + sstream->temp_path); + return -1; + } + i_stream_destroy(&sstream->fd_input); + sstream->fd = -1; /* autoclosed by fd_input */ + + i_free_and_null(sstream->temp_path); + return 0; +} + +static ssize_t i_stream_seekable_read(struct istream_private *stream) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream); + const unsigned char *data; + size_t size, pos; + ssize_t ret; + + if (sstream->fd == -1) { + if (read_from_buffer(sstream, &ret)) + return ret; + + /* copy everything to temp file and use it as the stream */ + if (copy_to_temp_file(sstream) < 0) { + stream->max_buffer_size = SIZE_MAX; + if (!read_from_buffer(sstream, &ret)) + i_unreached(); + return ret; + } + i_assert(sstream->fd != -1); + } + + stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip); + stream->pos -= stream->skip; + stream->skip = 0; + + i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak); + if (stream->istream.v_offset + stream->pos == sstream->write_peak) { + /* need to read more */ + if (sstream->cur_input == NULL || + i_stream_get_data_size(sstream->cur_input) == 0) { + ret = read_more(sstream); + if (ret == -1 || ret == 0) + return ret; + } + + /* save to our file */ + data = i_stream_get_data(sstream->cur_input, &size); + ret = write(sstream->fd, data, size); + if (ret <= 0) { + if (ret < 0 && !ENOSPACE(errno)) { + i_error("istream-seekable: write_full(%s) failed: %m", + sstream->temp_path); + } + if (i_stream_seekable_write_failed(sstream) < 0) + return -1; + if (!read_from_buffer(sstream, &ret)) + i_unreached(); + return ret; + } + i_stream_sync(sstream->fd_input); + i_stream_skip(sstream->cur_input, ret); + sstream->write_peak += ret; + } + + i_stream_seek(sstream->fd_input, stream->istream.v_offset); + ret = i_stream_read_memarea(sstream->fd_input); + if (ret <= 0) { + stream->istream.eof = sstream->fd_input->eof; + stream->istream.stream_errno = + sstream->fd_input->stream_errno; + } else { + ret = -2; + } + + stream->buffer = i_stream_get_data(sstream->fd_input, &pos); + stream->pos -= stream->skip; + stream->skip = 0; + + ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret; + stream->pos = pos; + return ret; +} + +static int +i_stream_seekable_stat(struct istream_private *stream, bool exact) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream); + const struct stat *st; + uoff_t old_offset, len; + ssize_t ret; + + if (sstream->size != UOFF_T_MAX) { + /* we've already reached EOF and know the size */ + stream->statbuf.st_size = sstream->size; + return 0; + } + + /* we want to know the full size of the file, so read until + we're finished */ + old_offset = stream->istream.v_offset; + do { + i_stream_skip(&stream->istream, + stream->pos - stream->skip); + } while ((ret = i_stream_seekable_read(stream)) > 0); + + if (ret == 0) { + i_panic("i_stream_stat() used for non-blocking " + "seekable stream %s offset %"PRIuUOFF_T, + i_stream_get_name(sstream->cur_input), + sstream->cur_input->v_offset); + } + i_stream_skip(&stream->istream, stream->pos - stream->skip); + len = stream->pos; + i_stream_seek(&stream->istream, old_offset); + unref_streams(sstream); + + if (stream->istream.stream_errno != 0) + return -1; + + if (sstream->fd_input != NULL) { + /* using a file backed buffer, we can use real fstat() */ + if (i_stream_stat(sstream->fd_input, exact, &st) < 0) + return -1; + stream->statbuf = *st; + } else { + /* buffer is completely in memory */ + i_assert(sstream->fd == -1); + + stream->statbuf.st_size = len; + } + return 0; +} + +static void i_stream_seekable_seek(struct istream_private *stream, + uoff_t v_offset, bool mark) +{ + if (v_offset <= stream->istream.v_offset) { + /* seeking backwards */ + stream->istream.v_offset = v_offset; + stream->skip = stream->pos = 0; + } else { + /* we can't skip over data we haven't yet read and written to + our buffer/temp file */ + i_stream_default_seek_nonseekable(stream, v_offset, mark); + } +} + +static struct istream_snapshot * +i_stream_seekable_snapshot(struct istream_private *stream, + struct istream_snapshot *prev_snapshot) +{ + struct seekable_istream *sstream = + container_of(stream, struct seekable_istream, istream); + + if (sstream->fd == -1) { + /* still in memory */ + if (stream->memarea == NULL) + return prev_snapshot; + return i_stream_default_snapshot(stream, prev_snapshot); + } else { + /* using the fd_input stream */ + return sstream->fd_input->real_stream-> + snapshot(sstream->fd_input->real_stream, prev_snapshot); + } +} + +struct istream * +i_streams_merge(struct istream *input[], size_t max_buffer_size, + int (*fd_callback)(const char **path_r, void *context), + void *context) ATTR_NULL(4) +{ + struct seekable_istream *sstream; + const unsigned char *data; + unsigned int count; + size_t size; + bool blocking = TRUE; + + i_assert(max_buffer_size > 0); + + /* if any of the streams isn't blocking, set ourself also nonblocking */ + for (count = 0; input[count] != NULL; count++) { + if (!input[count]->blocking) + blocking = FALSE; + i_stream_ref(input[count]); + } + i_assert(count != 0); + + sstream = i_new(struct seekable_istream, 1); + sstream->fd_callback = fd_callback; + sstream->context = context; + sstream->istream.max_buffer_size = max_buffer_size; + sstream->fd = -1; + sstream->size = UOFF_T_MAX; + + sstream->input = i_new(struct istream *, count + 1); + memcpy(sstream->input, input, sizeof(*input) * count); + sstream->cur_input = sstream->input[0]; + + sstream->istream.iostream.close = i_stream_seekable_close; + sstream->istream.iostream.destroy = i_stream_seekable_destroy; + sstream->istream.iostream.set_max_buffer_size = + i_stream_seekable_set_max_buffer_size; + + sstream->istream.read = i_stream_seekable_read; + sstream->istream.stat = i_stream_seekable_stat; + sstream->istream.seek = i_stream_seekable_seek; + sstream->istream.snapshot = i_stream_seekable_snapshot; + + sstream->istream.istream.readable_fd = FALSE; + sstream->istream.istream.blocking = blocking; + sstream->istream.istream.seekable = TRUE; + (void)i_stream_create(&sstream->istream, NULL, -1, 0); + + /* initialize our buffer from first stream's pending data */ + data = i_stream_get_data(sstream->cur_input, &size); + if (size > 0) { + memcpy(i_stream_alloc(&sstream->istream, size), data, size); + sstream->buffer_peak = size; + i_stream_skip(sstream->cur_input, size); + } + return &sstream->istream.istream; +} + +static bool inputs_are_seekable(struct istream *input[]) +{ + unsigned int count; + + for (count = 0; input[count] != NULL; count++) { + if (!input[count]->seekable) + return FALSE; + } + return TRUE; +} + +struct istream * +i_stream_create_seekable(struct istream *input[], + size_t max_buffer_size, + int (*fd_callback)(const char **path_r, void *context), + void *context) +{ + i_assert(max_buffer_size > 0); + + /* If all input streams are seekable, use concat istream instead */ + if (inputs_are_seekable(input)) + return i_stream_create_concat(input); + + return i_streams_merge(input, max_buffer_size, fd_callback, context); +} + +static int seekable_fd_callback(const char **path_r, void *context) +{ + char *temp_path_prefix = context; + string_t *path; + int fd; + + path = t_str_new(128); + str_append(path, temp_path_prefix); + fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); + if (fd == -1) { + i_error("istream-seekable: safe_mkstemp(%s) failed: %m", str_c(path)); + return -1; + } + + /* we just want the fd, unlink it */ + if (i_unlink(str_c(path)) < 0) { + /* shouldn't happen.. */ + i_close_fd(&fd); + return -1; + } + + *path_r = str_c(path); + return fd; +} + +struct istream * +i_stream_create_seekable_path(struct istream *input[], + size_t max_buffer_size, + const char *temp_path_prefix) +{ + struct seekable_istream *sstream; + struct istream *stream; + + i_assert(temp_path_prefix != NULL); + i_assert(max_buffer_size > 0); + + if (inputs_are_seekable(input)) + return i_stream_create_concat(input); + + stream = i_stream_create_seekable(input, max_buffer_size, + seekable_fd_callback, + i_strdup(temp_path_prefix)); + sstream = container_of(stream->real_stream, + struct seekable_istream, istream); + sstream->free_context = TRUE; + return stream; +} |