summaryrefslogtreecommitdiffstats
path: root/src/lib/iostream-temp.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/lib/iostream-temp.c404
1 files changed, 404 insertions, 0 deletions
diff --git a/src/lib/iostream-temp.c b/src/lib/iostream-temp.c
new file mode 100644
index 0000000..6e14ff5
--- /dev/null
+++ b/src/lib/iostream-temp.c
@@ -0,0 +1,404 @@
+/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "str.h"
+#include "safe-mkstemp.h"
+#include "write-full.h"
+#include "istream-private.h"
+#include "ostream-private.h"
+#include "iostream-temp.h"
+
+#include <unistd.h>
+
+#define IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT (1024*128)
+
+struct temp_ostream {
+ struct ostream_private ostream;
+
+ char *temp_path_prefix;
+ enum iostream_temp_flags flags;
+ size_t max_mem_size;
+
+ struct istream *dupstream;
+ uoff_t dupstream_offset, dupstream_start_offset;
+ char *name;
+
+ buffer_t *buf;
+ int fd;
+ bool fd_tried;
+ uoff_t fd_size;
+};
+
+static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
+ enum ostream_send_istream_result *res_r);
+
+static void
+o_stream_temp_close(struct iostream_private *stream,
+ bool close_parent ATTR_UNUSED)
+{
+ struct temp_ostream *tstream =
+ container_of(stream, struct temp_ostream, ostream.iostream);
+
+ i_close_fd(&tstream->fd);
+ buffer_free(&tstream->buf);
+ i_free(tstream->temp_path_prefix);
+ i_free(tstream->name);
+}
+
+static int o_stream_temp_move_to_fd(struct temp_ostream *tstream)
+{
+ string_t *path;
+
+ if (tstream->fd_tried)
+ return -1;
+ tstream->fd_tried = TRUE;
+
+ path = t_str_new(128);
+ str_append(path, tstream->temp_path_prefix);
+ tstream->fd = safe_mkstemp_hostpid(path, 0600, (uid_t)-1, (gid_t)-1);
+ if (tstream->fd == -1) {
+ i_error("safe_mkstemp(%s) failed: %m", str_c(path));
+ return -1;
+ }
+ if (i_unlink(str_c(path)) < 0) {
+ i_close_fd(&tstream->fd);
+ return -1;
+ }
+ if (write_full(tstream->fd, tstream->buf->data, tstream->buf->used) < 0) {
+ i_error("write(%s) failed: %m", str_c(path));
+ i_close_fd(&tstream->fd);
+ return -1;
+ }
+ /* make the fd available also to o_stream_get_fd(),
+ e.g. for unit tests */
+ tstream->ostream.fd = tstream->fd;
+ tstream->fd_size = tstream->buf->used;
+ buffer_free(&tstream->buf);
+ return 0;
+}
+
+int o_stream_temp_move_to_memory(struct ostream *output)
+{
+ struct temp_ostream *tstream =
+ container_of(output->real_stream, struct temp_ostream, ostream);
+ unsigned char buf[IO_BLOCK_SIZE];
+ uoff_t offset = 0;
+ ssize_t ret = 0;
+
+ i_assert(tstream->buf == NULL);
+ tstream->buf = buffer_create_dynamic(default_pool, 8192);
+ while (offset < tstream->ostream.ostream.offset &&
+ (ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
+ if ((size_t)ret > tstream->ostream.ostream.offset - offset)
+ ret = tstream->ostream.ostream.offset - offset;
+ buffer_append(tstream->buf, buf, ret);
+ offset += ret;
+ }
+ if (ret < 0) {
+ /* not really expecting this to happen */
+ i_error("iostream-temp %s: read(%s*) failed: %m",
+ o_stream_get_name(&tstream->ostream.ostream),
+ tstream->temp_path_prefix);
+ tstream->ostream.ostream.stream_errno = EIO;
+ return -1;
+ }
+ i_close_fd(&tstream->fd);
+ tstream->ostream.fd = -1;
+ return 0;
+}
+
+static ssize_t
+o_stream_temp_fd_sendv(struct temp_ostream *tstream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ size_t bytes = 0;
+ unsigned int i;
+
+ for (i = 0; i < iov_count; i++) {
+ if (write_full(tstream->fd, iov[i].iov_base, iov[i].iov_len) < 0) {
+ i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
+ o_stream_get_name(&tstream->ostream.ostream),
+ tstream->temp_path_prefix);
+ if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
+ return -1;
+ for (; i < iov_count; i++) {
+ buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
+ bytes += iov[i].iov_len;
+ tstream->ostream.ostream.offset += iov[i].iov_len;
+ }
+ i_assert(tstream->fd_tried);
+ return bytes;
+ }
+ bytes += iov[i].iov_len;
+ tstream->ostream.ostream.offset += iov[i].iov_len;
+ }
+ tstream->fd_size += bytes;
+ return bytes;
+}
+
+static ssize_t
+o_stream_temp_sendv(struct ostream_private *stream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ struct temp_ostream *tstream =
+ container_of(stream, struct temp_ostream, ostream);
+ ssize_t ret = 0;
+ unsigned int i;
+ enum ostream_send_istream_result res;
+
+
+ tstream->flags &= ENUM_NEGATE(IOSTREAM_TEMP_FLAG_TRY_FD_DUP);
+ if (tstream->dupstream != NULL) {
+ if (o_stream_temp_dup_cancel(tstream, &res))
+ return -1;
+ }
+
+ if (tstream->fd != -1)
+ return o_stream_temp_fd_sendv(tstream, iov, iov_count);
+
+ for (i = 0; i < iov_count; i++) {
+ if (tstream->buf->used + iov[i].iov_len > tstream->max_mem_size) {
+ if (o_stream_temp_move_to_fd(tstream) == 0) {
+ i_assert(tstream->fd != -1);
+ return o_stream_temp_fd_sendv(tstream, iov+i,
+ iov_count-i);
+ }
+ /* failed to move to temp fd, just keep it in memory */
+ }
+ buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
+ ret += iov[i].iov_len;
+ stream->ostream.offset += iov[i].iov_len;
+ }
+ return ret;
+}
+
+static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
+ enum ostream_send_istream_result *res_r)
+{
+ struct istream *input;
+ uoff_t size = tstream->dupstream_offset -
+ tstream->dupstream_start_offset;
+ bool ret = TRUE; /* use res_r to return error */
+
+ i_stream_seek(tstream->dupstream, tstream->dupstream_start_offset);
+ tstream->ostream.ostream.offset = 0;
+
+ input = i_stream_create_limit(tstream->dupstream, size);
+ i_stream_unref(&tstream->dupstream);
+
+ *res_r = io_stream_copy(&tstream->ostream.ostream, input);
+ switch (*res_r) {
+ case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+ /* everything copied */
+ ret = FALSE;
+ break;
+ case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+ case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+ i_unreached();
+ case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+ tstream->ostream.ostream.stream_errno = input->stream_errno;
+ io_stream_set_error(&tstream->ostream.iostream,
+ "iostream-temp: read(%s) failed: %s",
+ i_stream_get_name(input),
+ i_stream_get_error(input));
+ break;
+ case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+ break;
+ }
+ i_stream_destroy(&input);
+ return ret;
+}
+
+static bool
+o_stream_temp_dup_istream(struct temp_ostream *outstream,
+ struct istream *instream,
+ enum ostream_send_istream_result *res_r)
+{
+ uoff_t in_size;
+
+ if (!instream->readable_fd || i_stream_get_fd(instream) == -1)
+ return FALSE;
+
+ if (i_stream_get_size(instream, TRUE, &in_size) <= 0) {
+ if (outstream->dupstream != NULL)
+ return o_stream_temp_dup_cancel(outstream, res_r);
+ return FALSE;
+ }
+ i_assert(instream->v_offset <= in_size);
+
+ if (outstream->dupstream == NULL) {
+ outstream->dupstream = instream;
+ outstream->dupstream_start_offset = instream->v_offset;
+ i_stream_ref(outstream->dupstream);
+ } else {
+ if (outstream->dupstream != instream ||
+ outstream->dupstream_offset != instream->v_offset ||
+ outstream->dupstream_offset > in_size)
+ return o_stream_temp_dup_cancel(outstream, res_r);
+ }
+ i_stream_seek(instream, in_size);
+ /* we should be at EOF now. o_stream_send_istream() asserts if
+ eof isn't set. */
+ instream->eof = TRUE;
+ outstream->dupstream_offset = instream->v_offset;
+ outstream->ostream.ostream.offset =
+ outstream->dupstream_offset - outstream->dupstream_start_offset;
+ *res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+ return TRUE;
+}
+
+static enum ostream_send_istream_result
+o_stream_temp_send_istream(struct ostream_private *_outstream,
+ struct istream *instream)
+{
+ struct temp_ostream *outstream =
+ container_of(_outstream, struct temp_ostream, ostream);
+ enum ostream_send_istream_result res;
+
+ if ((outstream->flags & IOSTREAM_TEMP_FLAG_TRY_FD_DUP) != 0) {
+ if (o_stream_temp_dup_istream(outstream, instream, &res))
+ return res;
+ outstream->flags &= ENUM_NEGATE(IOSTREAM_TEMP_FLAG_TRY_FD_DUP);
+ }
+ return io_stream_copy(&outstream->ostream.ostream, instream);
+}
+
+static int
+o_stream_temp_write_at(struct ostream_private *stream,
+ const void *data, size_t size, uoff_t offset)
+{
+ struct temp_ostream *tstream =
+ container_of(stream, struct temp_ostream, ostream);
+
+ if (tstream->fd == -1) {
+ i_assert(stream->ostream.offset == tstream->buf->used);
+ buffer_write(tstream->buf, offset, data, size);
+ stream->ostream.offset = tstream->buf->used;
+ } else {
+ if (pwrite_full(tstream->fd, data, size, offset) < 0) {
+ stream->ostream.stream_errno = errno;
+ i_close_fd(&tstream->fd);
+ return -1;
+ }
+ if (tstream->fd_size < offset + size)
+ tstream->fd_size = offset + size;
+ }
+ return 0;
+}
+
+static int o_stream_temp_seek(struct ostream_private *_stream, uoff_t offset)
+{
+ _stream->ostream.offset = offset;
+ return 0;
+}
+
+struct ostream *iostream_temp_create(const char *temp_path_prefix,
+ enum iostream_temp_flags flags)
+{
+ return iostream_temp_create_named(temp_path_prefix, flags, "");
+}
+
+struct ostream *iostream_temp_create_named(const char *temp_path_prefix,
+ enum iostream_temp_flags flags,
+ const char *name)
+{
+ return iostream_temp_create_sized(temp_path_prefix, flags, name,
+ IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT);
+}
+
+struct ostream *iostream_temp_create_sized(const char *temp_path_prefix,
+ enum iostream_temp_flags flags,
+ const char *name,
+ size_t max_mem_size)
+{
+ struct temp_ostream *tstream;
+ struct ostream *output;
+
+ tstream = i_new(struct temp_ostream, 1);
+ tstream->ostream.ostream.blocking = TRUE;
+ tstream->ostream.sendv = o_stream_temp_sendv;
+ tstream->ostream.send_istream = o_stream_temp_send_istream;
+ tstream->ostream.write_at = o_stream_temp_write_at;
+ tstream->ostream.seek = o_stream_temp_seek;
+ tstream->ostream.iostream.close = o_stream_temp_close;
+ tstream->temp_path_prefix = i_strdup(temp_path_prefix);
+ tstream->flags = flags;
+ tstream->max_mem_size = max_mem_size;
+ tstream->buf = buffer_create_dynamic(default_pool, 8192);
+ tstream->fd = -1;
+
+ output = o_stream_create(&tstream->ostream, NULL, -1);
+ tstream->name = i_strdup(name);
+ if (name[0] == '\0') {
+ o_stream_set_name(output, t_strdup_printf(
+ "(temp iostream in %s)", temp_path_prefix));
+ } else {
+ o_stream_set_name(output, t_strdup_printf(
+ "(temp iostream in %s for %s)", temp_path_prefix, name));
+ }
+ return output;
+}
+
+static void iostream_temp_buf_destroyed(buffer_t *buf)
+{
+ buffer_free(&buf);
+}
+
+struct istream *iostream_temp_finish(struct ostream **output,
+ size_t max_buffer_size)
+{
+ struct temp_ostream *tstream =
+ container_of((*output)->real_stream, struct temp_ostream,
+ ostream);
+ struct istream *input, *input2;
+ uoff_t abs_offset, size;
+ const char *for_path;
+ int fd;
+
+ if (tstream->name[0] == '\0')
+ for_path = "";
+ else
+ for_path = t_strdup_printf(" for %s", tstream->name);
+
+ if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
+ abs_offset = i_stream_get_absolute_offset(tstream->dupstream) -
+ tstream->dupstream->v_offset +
+ tstream->dupstream_start_offset;
+ size = tstream->dupstream_offset -
+ tstream->dupstream_start_offset;
+ fd = dup(i_stream_get_fd(tstream->dupstream));
+ if (fd == -1)
+ input = i_stream_create_error_str(errno, "dup() failed: %m");
+ else {
+ input2 = i_stream_create_fd_autoclose(&fd, max_buffer_size);
+ i_stream_seek(input2, abs_offset);
+ input = i_stream_create_limit(input2, size);
+ i_stream_unref(&input2);
+ }
+ i_stream_set_name(input, t_strdup_printf(
+ "(Temp file in %s%s, from %s)", tstream->temp_path_prefix,
+ for_path, i_stream_get_name(tstream->dupstream)));
+ i_stream_unref(&tstream->dupstream);
+ } else if (tstream->dupstream != NULL) {
+ /* return the original failed stream. */
+ input = tstream->dupstream;
+ } else if (tstream->fd != -1) {
+ int fd = tstream->fd;
+ input = i_stream_create_fd_autoclose(&tstream->fd, max_buffer_size);
+ i_stream_set_name(input, t_strdup_printf(
+ "(Temp file fd %d in %s%s, %"PRIuUOFF_T" bytes)",
+ fd, tstream->temp_path_prefix, for_path, tstream->fd_size));
+ } else {
+ input = i_stream_create_from_data(tstream->buf->data,
+ tstream->buf->used);
+ i_stream_set_name(input, t_strdup_printf(
+ "(Temp buffer in %s%s, %zu bytes)",
+ tstream->temp_path_prefix, for_path, tstream->buf->used));
+ i_stream_add_destroy_callback(input, iostream_temp_buf_destroyed,
+ tstream->buf);
+ tstream->buf = NULL;
+ }
+ o_stream_destroy(output);
+ return input;
+}