summaryrefslogtreecommitdiffstats
path: root/src/lib/ostream-file.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/ostream-file.c')
-rw-r--r--src/lib/ostream-file.c1154
1 files changed, 1154 insertions, 0 deletions
diff --git a/src/lib/ostream-file.c b/src/lib/ostream-file.c
new file mode 100644
index 0000000..2be00d2
--- /dev/null
+++ b/src/lib/ostream-file.c
@@ -0,0 +1,1154 @@
+/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
+
+/* @UNSAFE: whole file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "write-full.h"
+#include "net.h"
+#include "sendfile-util.h"
+#include "istream.h"
+#include "istream-private.h"
+#include "ostream-file-private.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+#ifdef HAVE_SYS_UIO_H
+# include <sys/uio.h>
+#endif
+#include <fcntl.h>
+
+/* try to keep the buffer size within 4k..128k. ReiserFS may actually return
+ 128k as optimal size.
+
+ DEFAULT_OPTIMAL_BLOCK_SIZE is the optimal_block_size every stream starts
+ with; MAX_OPTIMAL_BLOCK_SIZE is the upper limit applied when a larger
+ st_blksize is picked up from fstat().
+
+ IS_STREAM_EMPTY(): head == tail alone is ambiguous in the ring buffer, so
+ the "full" flag is what tells a completely full buffer from an empty one.
+
+ MAX_SSIZE_T(): clamps a size to SSIZE_T_MAX. */
+#define DEFAULT_OPTIMAL_BLOCK_SIZE IO_BLOCK_SIZE
+#define MAX_OPTIMAL_BLOCK_SIZE (128*1024)
+
+#define IS_STREAM_EMPTY(fstream) \
+ ((fstream)->head == (fstream)->tail && !(fstream)->full)
+
+#define MAX_SSIZE_T(size) \
+ ((size) < SSIZE_T_MAX ? (size_t)(size) : SSIZE_T_MAX)
+
+static void stream_send_io(struct file_ostream *fstream); /* forward declaration: used as the io_add_to() callback before its definition */
+static struct ostream * o_stream_create_fd_common(int fd,
+ size_t max_buffer_size, bool autoclose_fd); /* forward declaration */
+
+/* Shut the stream down: drop the write watcher, close the fd when this
+   stream owns it (autoclose_fd), forget the fd and mark the ostream closed. */
+static void stream_closed(struct file_ostream *fstream)
+{
+	bool owns_fd;
+
+	io_remove(&fstream->io);
+
+	owns_fd = fstream->autoclose_fd && fstream->fd != -1;
+	/* ECONNRESET from close() is not worth logging: the socket is going
+	   away anyway and nothing can be done about data that was never
+	   sent. */
+	if (owns_fd &&
+	    unlikely(close(fstream->fd) < 0 && errno != ECONNRESET)) {
+		i_error("file_ostream.close(%s) failed: %m",
+			o_stream_get_name(&fstream->ostream.ostream));
+	}
+
+	fstream->fd = -1;
+	fstream->ostream.ostream.closed = TRUE;
+}
+
+/* iostream "close" handler: maps the generic iostream back to its
+   file_ostream and closes it. close_parent is not used here. */
+void o_stream_file_close(struct iostream_private *stream,
+			 bool close_parent ATTR_UNUSED)
+{
+	struct file_ostream *fstream;
+
+	fstream = container_of(stream, struct file_ostream, ostream.iostream);
+	stream_closed(fstream);
+}
+
+/* iostream "destroy" handler: releases the stream's output buffer. */
+static void o_stream_file_destroy(struct iostream_private *stream)
+{
+	struct file_ostream *fstream;
+
+	fstream = container_of(stream, struct file_ostream, ostream.iostream);
+	i_free(fstream->buffer);
+}
+
+/* Returns how many bytes are currently held in the ring buffer. */
+static size_t file_buffer_get_used_size(struct file_ostream *fstream)
+{
+	size_t head = fstream->head;
+	size_t tail = fstream->tail;
+
+	if (head < tail) {
+		/* ...HXXXT... */
+		return tail - head;
+	}
+	if (head > tail) {
+		/* XXXT...HXXX */
+		return tail + (fstream->buffer_size - head);
+	}
+	/* head == tail: either everything or nothing is in use */
+	return fstream->full ? fstream->buffer_size : 0;
+}
+
+/* Consume `size` bytes from the front of the ring buffer after they have
+   been written out, advancing head (with wrap-around) and clearing the
+   "full" flag where applicable. */
+static void update_buffer(struct file_ostream *fstream, size_t size)
+{
+	size_t contiguous;
+
+	if (size == 0 || IS_STREAM_EMPTY(fstream))
+		return;
+
+	if (fstream->head >= fstream->tail) {
+		/* XXXT...HXXX */
+		contiguous = fstream->buffer_size - fstream->head;
+		if (size <= contiguous) {
+			fstream->head += size;
+		} else {
+			/* consumed past the end: continue from the start */
+			size -= contiguous;
+			i_assert(size <= fstream->tail);
+			fstream->head = size;
+		}
+
+		fstream->full = FALSE;
+	} else {
+		/* ...HXXXT... */
+		contiguous = fstream->tail - fstream->head;
+		i_assert(size <= contiguous);
+		fstream->head += size;
+	}
+
+	/* buffer drained: restart from the beginning */
+	if (fstream->head == fstream->tail)
+		fstream->head = fstream->tail = 0;
+
+	/* head landed exactly on the end: wrap it */
+	if (fstream->head == fstream->buffer_size)
+		fstream->head = 0;
+}
+
+/* Apply the socket-level cork if the stream is corked and the cork hasn't
+   been set yet. A failing net_set_cork() disables further attempts. */
+static void o_stream_socket_cork(struct file_ostream *fstream)
+{
+	if (!fstream->ostream.corked || fstream->socket_cork_set)
+		return;
+	if (fstream->no_socket_cork)
+		return;
+
+	if (net_set_cork(fstream->fd, TRUE) < 0)
+		fstream->no_socket_cork = TRUE;
+	else
+		fstream->socket_cork_set = TRUE;
+}
+
+/* Make the fd's file position match buffer_offset, i.e. the position where
+   the next buffered byte belongs. Does nothing if real_offset already
+   matches. Returns 0 on success; on failure sets the stream error and
+   stream_errno and returns -1. */
+static int o_stream_lseek(struct file_ostream *fstream)
+{
+	off_t ret;
+
+	if (fstream->real_offset == fstream->buffer_offset)
+		return 0;
+
+	ret = lseek(fstream->fd, (off_t)fstream->buffer_offset, SEEK_SET);
+	if (ret < 0) {
+		/* Save errno before io_stream_set_error(): building the error
+		   string may allocate/free memory and clobber errno. The
+		   assignment itself leaves errno intact for %m. */
+		fstream->ostream.ostream.stream_errno = errno;
+		io_stream_set_error(&fstream->ostream.iostream,
+				    "lseek() failed: %m");
+		return -1;
+	}
+
+	if (ret != (off_t)fstream->buffer_offset) {
+		io_stream_set_error(&fstream->ostream.iostream,
+				    "lseek() returned wrong value");
+		fstream->ostream.ostream.stream_errno = EINVAL;
+		return -1;
+	}
+	fstream->real_offset = fstream->buffer_offset;
+	return 0;
+}
+
+/* Low-level writer installed as fstream->writev by
+   o_stream_create_file_common().
+
+   A single iovec is written with write(), or with pwrite() at buffer_offset
+   when the stream is a file whose fd position differs from buffer_offset.
+   Multiple iovecs are written with writev() after seeking to buffer_offset,
+   in chunks of at most IOV_MAX entries.
+
+   Returns the syscall result: bytes written (possibly short), or <= 0 with
+   errno left as set by the syscall. For non-file streams, bytes already sent
+   in earlier full chunks are returned instead of a later failure. */
+ssize_t o_stream_file_writev(struct file_ostream *fstream,
+			     const struct const_iovec *iov,
+			     unsigned int iov_count)
+{
+	ssize_t ret;
+	size_t size, sent;
+	unsigned int i;
+
+	if (iov_count == 1) {
+		i_assert(iov->iov_len > 0);
+
+		if (!fstream->file ||
+		    fstream->real_offset == fstream->buffer_offset) {
+			ret = write(fstream->fd, iov->iov_base, iov->iov_len);
+			if (ret > 0)
+				fstream->real_offset += ret;
+		} else {
+			/* pwrite() doesn't move the fd position, so
+			   real_offset is left alone */
+			ret = pwrite(fstream->fd, iov->iov_base, iov->iov_len,
+				     fstream->buffer_offset);
+		}
+	} else {
+		if (o_stream_lseek(fstream) < 0)
+			return -1;
+
+		sent = 0;
+		while (iov_count > IOV_MAX) {
+			/* expected size of one full IOV_MAX chunk */
+			size = 0;
+			for (i = 0; i < IOV_MAX; i++)
+				size += iov[i].iov_len;
+
+			ret = writev(fstream->fd, (const struct iovec *)iov,
+				     IOV_MAX);
+			if (ret != (ssize_t)size) {
+				/* error or short write: stop here and let
+				   the code below report it */
+				break;
+			}
+
+			fstream->real_offset += ret;
+			/* NOTE(review): the caller
+			   o_stream_file_writev_full() also adds the returned
+			   total to buffer_offset - confirm this isn't
+			   counted twice for > IOV_MAX iovecs. */
+			fstream->buffer_offset += ret;
+			sent += ret;
+			iov += IOV_MAX;
+			iov_count -= IOV_MAX;
+		}
+
+		if (iov_count <= IOV_MAX) {
+			/* final chunk (the loop didn't break early) */
+			ret = writev(fstream->fd, (const struct iovec *)iov,
+				     iov_count);
+		}
+		if (ret > 0) {
+			fstream->real_offset += ret;
+			ret += sent;
+		} else if (!fstream->file && sent > 0) {
+			/* return what we managed to get sent */
+			ret = sent;
+		}
+	}
+	return ret;
+}
+/* Write iov[] through fstream->writev(), applying the socket cork first if
+   the stream is corked. Returns the number of bytes written, or -1 after
+   setting stream_errno and closing the stream. For non-file streams the
+   result may be short, and EAGAIN/EINTR are reported as 0. For files, EINTR
+   and short writes are retried by recursing on the unwritten remainder, so
+   a non-negative result equals the total length of iov[] (asserted before
+   returning); a file write returning 0 is treated as ENOSPC. */
+static ssize_t
+o_stream_file_writev_full(struct file_ostream *fstream,
+ const struct const_iovec *iov,
+ unsigned int iov_count)
+{
+ ssize_t ret, ret2;
+ size_t size, total_size;
+ bool partial;
+ unsigned int i;
+
+ for (i = 0, total_size = 0; i < iov_count; i++)
+ total_size += iov[i].iov_len;
+
+ o_stream_socket_cork(fstream);
+ ret = fstream->writev(fstream, iov, iov_count);
+ partial = ret != (ssize_t)total_size; /* also TRUE when ret < 0; errors return before it is used */
+
+ if (ret < 0) {
+ if (fstream->file) {
+ if (errno == EINTR) {
+ /* automatically retry */
+ return o_stream_file_writev_full(fstream, iov, iov_count);
+ }
+ } else if (errno == EAGAIN || errno == EINTR) {
+ /* try again later */
+ return 0;
+ }
+ fstream->ostream.ostream.stream_errno = errno;
+ stream_closed(fstream);
+ return -1;
+ }
+ if (unlikely(ret == 0 && fstream->file)) {
+ /* assume out of disk space */
+ fstream->ostream.ostream.stream_errno = ENOSPC;
+ stream_closed(fstream);
+ return -1;
+ }
+ fstream->buffer_offset += ret;
+ if (partial && fstream->file) {
+ /* we failed to write everything to a file. either we ran out
+ of disk space or we're writing to NFS. try to write the
+ rest to resolve this. */
+ size = ret; /* bytes already written; used to skip the iovecs that were fully sent */
+ while (iov_count > 0 && size >= iov->iov_len) {
+ size -= iov->iov_len;
+ iov++;
+ iov_count--;
+ }
+ i_assert(iov_count > 0);
+ if (size == 0)
+ ret2 = o_stream_file_writev_full(fstream, iov, iov_count);
+ else {
+ /* write the first iov separately */
+ struct const_iovec new_iov;
+
+ new_iov.iov_base =
+ CONST_PTR_OFFSET(iov->iov_base, size);
+ new_iov.iov_len = iov->iov_len - size;
+ ret2 = o_stream_file_writev_full(fstream, &new_iov, 1);
+ if (ret2 > 0) {
+ i_assert((size_t)ret2 == new_iov.iov_len);
+ /* write the rest */
+ if (iov_count > 1) {
+ ret += ret2; /* account for the first iov's tail before ret2 is reused */
+ ret2 = o_stream_file_writev_full(fstream, iov + 1,
+ iov_count - 1);
+ }
+ }
+ }
+ i_assert(ret2 != 0);
+ if (ret2 < 0)
+ ret = ret2;
+ else
+ ret += ret2;
+ }
+ i_assert(ret < 0 || !fstream->file ||
+ (size_t)ret == total_size);
+ return ret;
+}
+
+/* Describe the buffered bytes with up to two iovecs: one when the data is
+   contiguous, two when it wraps around the end of the ring buffer.
+   Returns how many entries of iov[] were filled (0 if the buffer is
+   empty). */
+static int o_stream_fill_iovec(struct file_ostream *fstream,
+			       struct const_iovec iov[2])
+{
+	if (IS_STREAM_EMPTY(fstream))
+		return 0;
+
+	/* the first piece always starts at head */
+	iov[0].iov_base = fstream->buffer + fstream->head;
+	if (fstream->head < fstream->tail) {
+		iov[0].iov_len = fstream->tail - fstream->head;
+		return 1;
+	}
+
+	/* data runs to the end of the buffer and possibly wraps */
+	iov[0].iov_len = fstream->buffer_size - fstream->head;
+	if (fstream->tail == 0)
+		return 1;
+
+	iov[1].iov_base = fstream->buffer;
+	iov[1].iov_len = fstream->tail;
+	return 2;
+}
+
+/* Try to write out everything that is buffered. Returns 1 if the buffer is
+   empty afterwards, 0 if data is still pending, -1 if writing failed. */
+static int buffer_flush(struct file_ostream *fstream)
+{
+	struct const_iovec vec[2];
+	ssize_t written;
+	int vec_count;
+
+	vec_count = o_stream_fill_iovec(fstream, vec);
+	if (vec_count > 0) {
+		written = o_stream_file_writev_full(fstream, vec, vec_count);
+		if (written < 0)
+			return -1;
+
+		update_buffer(fstream, written);
+	}
+
+	if (IS_STREAM_EMPTY(fstream))
+		return 1;
+	return 0;
+}
+
+/* Turn TCP_NODELAY on and immediately off again. If either call fails,
+   remember it in no_socket_nodelay so it isn't attempted again. */
+static void o_stream_tcp_flush_via_nodelay(struct file_ostream *fstream)
+{
+	/* Errors are intentionally not logged.
+
+	   Enabling: there are quite a lot of different errors that need to
+	   be ignored, and they differ between OSes. At least:
+	   Linux: ENOTSUP, ENOTSOCK, ENOPROTOOPT
+	   FreeBSD: EINVAL, ECONNRESET
+
+	   Disabling: enabling already succeeded, so there shouldn't really
+	   be errors, except ECONNRESET can possibly still happen between the
+	   two calls. */
+	if (net_set_tcp_nodelay(fstream->fd, TRUE) < 0 ||
+	    net_set_tcp_nodelay(fstream->fd, FALSE) < 0)
+		fstream->no_socket_nodelay = TRUE;
+}
+
+/* ostream "cork" handler. Corking removes the write watcher; uncorking
+   flushes the buffer, re-adds the watcher if output is still pending,
+   removes the socket cork and applies the nodelay/quickack tweaks. Nothing
+   happens if the cork state doesn't change or the stream is closed. */
+static void o_stream_file_cork(struct ostream_private *stream, bool set)
+{
+	struct file_ostream *fstream =
+		container_of(stream, struct file_ostream, ostream);
+	int flush_ret;
+
+	if (stream->corked == set || stream->ostream.closed)
+		return;
+
+	if (set) {
+		if (fstream->io != NULL)
+			io_remove(&fstream->io);
+	} else {
+		/* buffer flushing might close the stream */
+		flush_ret = buffer_flush(fstream);
+		stream->last_errors_not_checked = TRUE;
+		if (fstream->io == NULL &&
+		    (flush_ret == 0 || fstream->flush_pending) &&
+		    !stream->ostream.closed) {
+			fstream->io = io_add_to(
+				io_stream_get_ioloop(&fstream->ostream.iostream),
+				fstream->fd, IO_WRITE,
+				stream_send_io, fstream);
+		}
+	}
+	if (stream->ostream.closed) {
+		/* flushing may have closed the stream already */
+		return;
+	}
+
+	if (fstream->socket_cork_set) {
+		i_assert(!set);
+		if (net_set_cork(fstream->fd, FALSE) < 0)
+			fstream->no_socket_cork = TRUE;
+		fstream->socket_cork_set = FALSE;
+	}
+	if (!set) {
+		if (!fstream->no_socket_nodelay) {
+			/* Uncorking - send all the pending data immediately.
+			   Remove nodelay immediately afterwards, so if any
+			   output is sent outside corking it may get
+			   delayed. */
+			o_stream_tcp_flush_via_nodelay(fstream);
+		}
+		if (!fstream->no_socket_quickack) {
+			/* Uncorking - disable delayed ACKs to reduce latency.
+			   Note that this needs to be set repeatedly. */
+			if (net_set_tcp_quickack(fstream->fd, TRUE) < 0)
+				fstream->no_socket_quickack = TRUE;
+		}
+	}
+	stream->corked = set;
+}
+
+/* ostream "flush" handler: flushes the file_ostream's buffer and passes on
+   buffer_flush()'s result. */
+static int o_stream_file_flush(struct ostream_private *stream)
+{
+	struct file_ostream *fstream;
+
+	fstream = container_of(stream, struct file_ostream, ostream);
+	return buffer_flush(fstream);
+}
+
+/* ostream "flush_pending" handler: records the flag and, when it is being
+   set on an uncorked stream without a write watcher, adds the watcher. */
+static void
+o_stream_file_flush_pending(struct ostream_private *stream, bool set)
+{
+	struct file_ostream *fstream =
+		container_of(stream, struct file_ostream, ostream);
+
+	fstream->flush_pending = set;
+	if (!set || stream->corked || fstream->io != NULL)
+		return;
+
+	fstream->io = io_add_to(
+		io_stream_get_ioloop(&fstream->ostream.iostream),
+		fstream->fd, IO_WRITE, stream_send_io, fstream);
+}
+
+/* Returns how many bytes can still be added to the ring buffer at its
+   current size. */
+static size_t get_unused_space(const struct file_ostream *fstream)
+{
+	if (fstream->head == fstream->tail) {
+		/* either fully unused or fully used */
+		return fstream->full ? 0 : fstream->buffer_size;
+	}
+	if (fstream->head < fstream->tail) {
+		/* ...HXXXT... */
+		return (fstream->buffer_size - fstream->tail) + fstream->head;
+	}
+	/* XXXT...HXXX */
+	return fstream->head - fstream->tail;
+}
+
+/* ostream "get_buffer_used_size" handler: buffer size minus the space that
+   is still unused. */
+static size_t
+o_stream_file_get_buffer_used_size(const struct ostream_private *stream)
+{
+	const struct file_ostream *fstream =
+		container_of(stream, const struct file_ostream, ostream);
+	size_t unused_bytes = get_unused_space(fstream);
+
+	return fstream->buffer_size - unused_bytes;
+}
+
+/* ostream "seek" handler. Fails with EINVAL if offset exceeds OFF_T_MAX and
+   with ESPIPE if the stream isn't a file. Otherwise flushes the buffer and
+   records the new offset; the fd itself is not repositioned here.
+   Returns 1 on success, -1 on failure. */
+static int o_stream_file_seek(struct ostream_private *stream, uoff_t offset)
+{
+	struct file_ostream *fstream =
+		container_of(stream, struct file_ostream, ostream);
+	int seek_errno = 0;
+
+	if (offset > OFF_T_MAX)
+		seek_errno = EINVAL;
+	else if (!fstream->file)
+		seek_errno = ESPIPE;
+
+	if (seek_errno != 0) {
+		stream->ostream.stream_errno = seek_errno;
+		return -1;
+	}
+
+	if (buffer_flush(fstream) < 0)
+		return -1;
+
+	fstream->buffer_offset = offset;
+	stream->ostream.offset = offset;
+	return 1;
+}
+
+/* Enlarge the ring buffer so that about `bytes` more bytes fit, limited by
+   max_buffer_size. When corked, grows at least to the optimal block size
+   (also limited by max_buffer_size). Does nothing if the resulting size
+   wouldn't be larger than the current one. */
+static void o_stream_grow_buffer(struct file_ostream *fstream, size_t bytes)
+{
+	size_t max_size = fstream->ostream.max_buffer_size;
+	size_t wanted, optimal, tail_part;
+
+	wanted = nearest_power(fstream->buffer_size + bytes);
+	if (wanted > max_size) {
+		/* limit the size */
+		wanted = max_size;
+	} else if (fstream->ostream.corked) {
+		/* try to use optimal buffer size with corking */
+		optimal = I_MIN(fstream->optimal_block_size, max_size);
+		if (optimal > wanted)
+			wanted = optimal;
+	}
+
+	if (wanted <= fstream->buffer_size)
+		return;
+
+	fstream->buffer = i_realloc(fstream->buffer,
+				    fstream->buffer_size, wanted);
+
+	if (!IS_STREAM_EMPTY(fstream) && fstream->tail <= fstream->head) {
+		/* the data wraps: move the part between head and the old
+		   end so that it ends at the new end of the buffer */
+		tail_part = fstream->buffer_size - fstream->head;
+		memmove(fstream->buffer + wanted - tail_part,
+			fstream->buffer + fstream->head, tail_part);
+		fstream->head = wanted - tail_part;
+	}
+
+	fstream->buffer_size = wanted;
+	fstream->full = FALSE;
+}
+
+/* IO_WRITE callback: runs the flush callback (or a plain flush when none is
+   set), corking the stream around it if it wasn't corked already, then
+   removes or re-adds the write watcher depending on whether output is
+   still pending. */
+static void stream_send_io(struct file_ostream *fstream)
+{
+	struct ostream *ostream = &fstream->ostream.ostream;
+	bool cork_here = !fstream->ostream.corked;
+	int ret;
+
+	/* Set flush_pending = FALSE first before calling the flush callback,
+	   and change it to TRUE only if callback returns 0. That way the
+	   callback can call o_stream_set_flush_pending() again and we don't
+	   forget it even if flush callback returns 1. */
+	fstream->flush_pending = FALSE;
+
+	/* keep the stream referenced while the callback runs */
+	o_stream_ref(ostream);
+	if (cork_here)
+		o_stream_cork(ostream);
+	ret = fstream->ostream.callback != NULL ?
+		fstream->ostream.callback(fstream->ostream.context) :
+		o_stream_file_flush(&fstream->ostream);
+	if (cork_here)
+		o_stream_uncork(ostream);
+
+	if (ret == 0)
+		fstream->flush_pending = TRUE;
+
+	if (!fstream->flush_pending && IS_STREAM_EMPTY(fstream)) {
+		io_remove(&fstream->io);
+	} else if (!fstream->ostream.ostream.closed && fstream->io == NULL) {
+		/* Add the IO handler if it's not there already. Callback
+		   might have just returned 0 without there being any data
+		   to be sent. */
+		fstream->io = io_add_to(
+			io_stream_get_ioloop(&fstream->ostream.iostream),
+			fstream->fd, IO_WRITE, stream_send_io, fstream);
+	}
+
+	o_stream_unref(&ostream);
+}
+
+/* Append up to `size` bytes of `data` to the ring buffer, growing it first
+   if there isn't enough room. Returns how many bytes were buffered, which
+   is less than `size` if the buffer filled up. For uncorked non-file
+   streams a write watcher is added when something was buffered. */
+static size_t o_stream_add(struct file_ostream *fstream,
+			   const void *data, size_t size)
+{
+	size_t space, chunk, copied = 0;
+	int pass;
+
+	space = get_unused_space(fstream);
+	if (size > space)
+		o_stream_grow_buffer(fstream, size - space);
+
+	/* at most two copies: up to the end of the buffer, then wrapped */
+	for (pass = 0; pass < 2; pass++) {
+		if (copied >= size || fstream->full)
+			break;
+
+		if (fstream->tail >= fstream->head)
+			chunk = fstream->buffer_size - fstream->tail;
+		else
+			chunk = fstream->head - fstream->tail;
+		if (chunk > size - copied)
+			chunk = size - copied;
+
+		memcpy(fstream->buffer + fstream->tail,
+		       CONST_PTR_OFFSET(data, copied), chunk);
+		copied += chunk;
+
+		fstream->tail += chunk;
+		if (fstream->tail == fstream->buffer_size)
+			fstream->tail = 0;
+
+		if (fstream->head == fstream->tail &&
+		    fstream->buffer_size > 0)
+			fstream->full = TRUE;
+	}
+
+	if (copied != 0 && fstream->io == NULL &&
+	    !fstream->ostream.corked && !fstream->file) {
+		fstream->io = io_add_to(
+			io_stream_get_ioloop(&fstream->ostream.iostream),
+			fstream->fd, IO_WRITE, stream_send_io, fstream);
+	}
+
+	return copied;
+}
+
+/* ostream "sendv" handler. Flushes first if the new data wouldn't fit in
+   the unused buffer space. If the buffer is then empty and the stream is
+   either uncorked or the data is at least the optimal block size, the data
+   is written directly; whatever wasn't written is buffered. Returns the
+   number of bytes written + buffered (also added to the stream offset), or
+   -1 on failure. */
+ssize_t o_stream_file_sendv(struct ostream_private *stream,
+			    const struct const_iovec *iov,
+			    unsigned int iov_count)
+{
+	struct file_ostream *fstream =
+		container_of(stream, struct file_ostream, ostream);
+	size_t total_size, optimal_size, skip, left, added;
+	unsigned int i;
+	ssize_t ret = 0;
+
+	total_size = 0;
+	for (i = 0; i < iov_count; i++)
+		total_size += iov[i].iov_len;
+
+	if (total_size > get_unused_space(fstream) &&
+	    !IS_STREAM_EMPTY(fstream)) {
+		if (o_stream_file_flush(stream) < 0)
+			return -1;
+	}
+
+	optimal_size = I_MIN(fstream->optimal_block_size,
+			     fstream->ostream.max_buffer_size);
+	if (IS_STREAM_EMPTY(fstream) &&
+	    (!stream->corked || total_size >= optimal_size)) {
+		/* send immediately */
+		ret = o_stream_file_writev_full(fstream, iov, iov_count);
+		if (ret < 0)
+			return -1;
+
+		/* skip over the iovecs that were written completely */
+		skip = ret;
+		while (skip > 0 && iov_count > 0 && skip >= iov[0].iov_len) {
+			skip -= iov[0].iov_len;
+			iov++;
+			iov_count--;
+		}
+
+		if (iov_count == 0) {
+			i_assert(skip == 0);
+		} else {
+			/* buffer the unwritten rest of the first
+			   remaining iovec */
+			left = iov[0].iov_len - skip;
+			added = o_stream_add(fstream,
+				CONST_PTR_OFFSET(iov[0].iov_base, skip),
+				left);
+			ret += added;
+
+			if (added != left) {
+				/* buffer full */
+				stream->ostream.offset += ret;
+				return ret;
+			}
+
+			iov++;
+			iov_count--;
+		}
+	}
+
+	/* buffer it, at least partly */
+	for (i = 0; i < iov_count; i++) {
+		added = o_stream_add(fstream, iov[i].iov_base, iov[i].iov_len);
+		ret += added;
+		if (added != iov[i].iov_len)
+			break;
+	}
+	stream->ostream.offset += ret;
+	i_assert((size_t)ret <= total_size);
+	i_assert((size_t)ret == total_size || !fstream->file);
+	return ret;
+}
+
+/* Overwrite already-buffered bytes, starting at buffer index `pos`, with
+   up to `size` bytes from `data`, following the ring buffer's wrap-around.
+   Returns how many bytes of `data` were left over because they reach past
+   the buffered data. */
+static size_t
+o_stream_file_update_buffer(struct file_ostream *fstream,
+			    const void *data, size_t size, size_t pos)
+{
+	bool wraps = fstream->head >= fstream->tail;
+	size_t room, chunk;
+
+	if (wraps) {
+		/* XXXT...HXXX */
+		room = fstream->buffer_size - pos;
+	} else {
+		/* ...HXXXT... */
+		i_assert(pos < fstream->tail);
+		room = fstream->tail - pos;
+	}
+	chunk = I_MIN(size, room);
+	memcpy(fstream->buffer + pos, data, chunk);
+	data = CONST_PTR_OFFSET(data, chunk);
+	size -= chunk;
+
+	if (size > 0 && wraps) {
+		/* wraps to beginning of the buffer */
+		chunk = I_MIN(size, fstream->tail);
+		memcpy(fstream->buffer, data, chunk);
+		size -= chunk;
+	}
+	return size;
+}
+/* ostream "write_at" handler: write `size` bytes of `data` at absolute
+   `offset`. The part of the range that overlaps bytes still held in the
+   buffer is patched in the buffer via o_stream_file_update_buffer(); if the
+   whole range lies inside the buffer nothing else is done. Otherwise the
+   buffer is flushed and the uncovered part is written with pwrite_full().
+   Returns 0 on success, -1 on failure (a pwrite_full() failure also sets
+   stream_errno and closes the stream). */
+static int
+o_stream_file_write_at(struct ostream_private *stream,
+ const void *data, size_t size, uoff_t offset)
+{
+ struct file_ostream *fstream =
+ container_of(stream, struct file_ostream, ostream);
+ size_t used, pos, skip, left;
+
+ /* update buffer if the write overlaps it */
+ used = file_buffer_get_used_size(fstream);
+ if (used > 0 &&
+ fstream->buffer_offset < offset + size &&
+ fstream->buffer_offset + used > offset) {
+ if (fstream->buffer_offset <= offset) {
+ /* updating from the beginning */
+ skip = 0;
+ } else {
+ skip = fstream->buffer_offset - offset; /* bytes of data that precede the buffered range */
+ }
+ pos = (fstream->head + offset + skip - fstream->buffer_offset) %
+ fstream->buffer_size; /* buffer index of the first overlapping byte, with wrap-around */
+ left = o_stream_file_update_buffer(fstream,
+ CONST_PTR_OFFSET(data, skip), size - skip, pos);
+ if (left > 0) {
+ /* didn't write all of it */
+ if (skip > 0) {
+ /* we also have to write a prefix. don't
+ bother with two syscalls, just write all
+ of it in one pwrite(). */
+ } else {
+ /* write only the suffix */
+ size_t update_count = size - left;
+
+ data = CONST_PTR_OFFSET(data, update_count);
+ size -= update_count;
+ offset += update_count;
+ }
+ } else if (skip == 0) {
+ /* everything done */
+ return 0;
+ } else {
+ /* still have to write prefix */
+ size = skip; /* data/offset still point at the start of the range */
+ }
+ }
+
+ /* we couldn't write everything to the buffer. flush the buffer
+ and pwrite() the rest. */
+ if (o_stream_file_flush(stream) < 0)
+ return -1;
+
+ if (pwrite_full(fstream->fd, data, size, offset) < 0) {
+ stream->ostream.stream_errno = errno;
+ stream_closed(fstream);
+ return -1;
+ }
+ return 0;
+}
+
+/* Try to send the rest of instream (from its current v_offset up to its
+   size) to the output fd with safe_sendfile().
+
+   Returns TRUE when the attempt produced a final answer, stored in *res_r.
+   Returns FALSE when sendfile can't be used (input size unknown, or
+   sendfile() failed with EINVAL) and the caller should fall back to regular
+   copying; *res_r is not set in that case.
+
+   Buffered output is flushed first; if it can't be fully flushed the result
+   is WAIT_OUTPUT. */
+static bool
+io_stream_sendfile(struct ostream_private *outstream,
+		   struct istream *instream, int in_fd,
+		   enum ostream_send_istream_result *res_r)
+{
+	struct file_ostream *foutstream =
+		container_of(outstream, struct file_ostream, ostream);
+	uoff_t in_size, offset, send_size, v_offset, abs_start_offset;
+	ssize_t ret;
+	bool sendfile_not_supported = FALSE;
+
+	if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) {
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
+		return TRUE;
+	}
+	if (ret == 0) {
+		/* size unknown. we can't use sendfile(). */
+		return FALSE;
+	}
+
+	o_stream_socket_cork(foutstream);
+
+	/* flush out any data in buffer */
+	if ((ret = buffer_flush(foutstream)) < 0) {
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+		return TRUE;
+	} else if (ret == 0) {
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
+		return TRUE;
+	}
+
+	if (o_stream_lseek(foutstream) < 0) {
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+		return TRUE;
+	}
+
+	v_offset = instream->v_offset;
+	/* absolute offset corresponding to v_offset 0 */
+	abs_start_offset = i_stream_get_absolute_offset(instream) - v_offset;
+	while (v_offset < in_size) {
+		offset = abs_start_offset + v_offset;
+		send_size = in_size - v_offset;
+
+		ret = safe_sendfile(foutstream->fd, in_fd, &offset,
+				    MAX_SSIZE_T(send_size));
+		if (ret <= 0) {
+			if (ret == 0) {
+				/* Unexpectedly early EOF at input */
+				i_stream_seek(instream, v_offset);
+				instream->eof = TRUE;
+				*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+				return TRUE;
+			}
+			if (foutstream->file) {
+				if (errno == EINTR) {
+					/* automatically retry */
+					continue;
+				}
+			} else {
+				if (errno == EINTR || errno == EAGAIN) {
+					/* report as "wait for output" */
+					ret = 0;
+					break;
+				}
+			}
+			if (errno == EINVAL)
+				sendfile_not_supported = TRUE;
+			else {
+				/* Save errno before io_stream_set_error():
+				   building the error string may
+				   allocate/free memory and clobber errno.
+				   The assignment leaves errno intact for
+				   %m. */
+				outstream->ostream.stream_errno = errno;
+				io_stream_set_error(&outstream->iostream,
+						    "sendfile() failed: %m");
+				/* close only if error wasn't because
+				   sendfile() isn't supported */
+				stream_closed(foutstream);
+			}
+			break;
+		}
+
+		v_offset += ret;
+		foutstream->real_offset += ret;
+		foutstream->buffer_offset += ret;
+		outstream->ostream.offset += ret;
+	}
+
+	i_stream_seek(instream, v_offset);
+	if (v_offset == in_size) {
+		instream->eof = TRUE;
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+		return TRUE;
+	}
+	/* the loop was left via break: ret is 0 (retry later) or negative */
+	i_assert(ret <= 0);
+	if (sendfile_not_supported)
+		return FALSE;
+	if (ret < 0)
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+	else
+		*res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
+	return TRUE;
+}
+
+/* Copy instream's data between its current v_offset and in_size to the
+   output, working from the end of the range towards its start. Used by
+   io_stream_copy_same_stream() when input and output overlap in the same
+   fd. The output buffer must be empty on entry.
+
+   Returns FINISHED after everything is copied (instream is left at in_size
+   with eof set and the output offset is advanced), or ERROR_OUTPUT with
+   stream_errno set if a write fails. */
+static enum ostream_send_istream_result
+io_stream_copy_backwards(struct ostream_private *outstream,
+			 struct istream *instream, uoff_t in_size)
+{
+	struct file_ostream *foutstream =
+		container_of(outstream, struct file_ostream, ostream);
+	uoff_t in_start_offset, in_offset, in_limit, out_offset;
+	const unsigned char *data;
+	size_t buffer_size, size, read_size;
+	ssize_t ret;
+
+	i_assert(IS_STREAM_EMPTY(foutstream));
+
+	/* figure out optimal buffer size */
+	buffer_size = instream->real_stream->buffer_size;
+	if (buffer_size == 0 || buffer_size > foutstream->buffer_size) {
+		if (foutstream->optimal_block_size > foutstream->buffer_size) {
+			o_stream_grow_buffer(foutstream,
+					     foutstream->optimal_block_size -
+					     foutstream->buffer_size);
+		}
+
+		buffer_size = foutstream->buffer_size;
+	}
+
+	in_start_offset = instream->v_offset;
+	in_offset = in_limit = in_size;
+	out_offset = outstream->ostream.offset + (in_offset - in_start_offset);
+
+	while (in_offset > in_start_offset) {
+		/* step one chunk backwards in both input and output */
+		if (in_offset - in_start_offset <= buffer_size)
+			read_size = in_offset - in_start_offset;
+		else
+			read_size = buffer_size;
+		in_offset -= read_size;
+		out_offset -= read_size;
+
+		for (;;) {
+			i_assert(in_offset <= in_limit);
+
+			i_stream_seek(instream, in_offset);
+			read_size = in_limit - in_offset;
+
+			/* FIXME: something's wrong here */
+			if (i_stream_read_bytes(instream, &data, &size,
+						read_size) == 0)
+				i_unreached();
+			if (size >= read_size) {
+				size = read_size;
+				if (instream->mmaped) {
+					/* we'll have to write it through
+					   buffer or the file gets corrupted */
+					i_assert(size <=
+						 foutstream->buffer_size);
+					memcpy(foutstream->buffer, data, size);
+					data = foutstream->buffer;
+				}
+				break;
+			}
+
+			/* buffer too large probably, try with smaller */
+			read_size -= size;
+			in_offset += read_size;
+			out_offset += read_size;
+			buffer_size -= read_size;
+		}
+		in_limit -= size;
+
+		ret = pwrite_full(foutstream->fd, data, size, out_offset);
+		if (ret < 0) {
+			/* A failed write is an output error, not a "try
+			   again later" condition: report it as such so the
+			   caller doesn't wait for output on a stream whose
+			   stream_errno is already set. */
+			outstream->ostream.stream_errno = errno;
+			return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+		}
+		i_stream_skip(instream, size);
+	}
+	/* make it visible that we're at instream's EOF */
+	i_stream_seek(instream, in_size);
+	instream->eof = TRUE;
+
+	outstream->ostream.offset += in_size - in_start_offset;
+	return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+}
+
+/* Copy instream to outstream when both use the same fd. Chooses between
+   doing nothing (source and destination offsets are equal), copying
+   backwards (destination starts inside the remaining source range) and a
+   regular forward copy. */
+static enum ostream_send_istream_result
+io_stream_copy_same_stream(struct ostream_private *outstream,
+			   struct istream *instream)
+{
+	uoff_t in_size;
+	off_t in_abs_offset, distance = 0;
+
+	/* copying data within same fd. we'll have to be careful with
+	   seeks and overlapping writes. */
+	distance = i_stream_get_size(instream, TRUE, &in_size);
+	if (distance < 0)
+		return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
+	if (distance == 0) {
+		/* if we couldn't find out the size, it means that instream
+		   isn't a regular file_istream. we can be reasonably sure that
+		   we can copy it safely the regular way. (there's really no
+		   other possibility, other than failing completely.) */
+		return io_stream_copy(&outstream->ostream, instream);
+	}
+	i_assert(instream->v_offset <= in_size);
+
+	/* how far ahead of the input position the output position is */
+	in_abs_offset = i_stream_get_absolute_offset(instream);
+	distance = (off_t)outstream->ostream.offset - in_abs_offset;
+	if (distance == 0) {
+		/* copying data over itself. we don't really
+		   need to do that, just fake it. */
+		return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+	}
+	if (distance > 0 && in_size > (uoff_t)distance) {
+		/* overlapping */
+		i_assert(instream->seekable);
+		return io_stream_copy_backwards(outstream, instream, in_size);
+	}
+	/* non-overlapping */
+	return io_stream_copy(&outstream->ostream, instream);
+}
+
+/* ostream "send_istream" handler. Tries sendfile() first when the input has
+   a readable fd different from the output fd and is seekable; if that
+   reports sendfile as unusable, it is disabled for this stream. Otherwise
+   falls back to a regular copy, or to the same-fd aware copy when input and
+   output share the fd. */
+static enum ostream_send_istream_result
+o_stream_file_send_istream(struct ostream_private *outstream,
+			   struct istream *instream)
+{
+	struct file_ostream *foutstream =
+		container_of(outstream, struct file_ostream, ostream);
+	enum ostream_send_istream_result res;
+	bool same_stream;
+	int in_fd = -1;
+
+	if (instream->readable_fd)
+		in_fd = i_stream_get_fd(instream);
+
+	if (!foutstream->no_sendfile && in_fd != -1 &&
+	    in_fd != foutstream->fd && instream->seekable) {
+		if (io_stream_sendfile(outstream, instream, in_fd, &res))
+			return res;
+
+		/* sendfile() not supported (with this fd), fallback to
+		   regular sending. */
+		foutstream->no_sendfile = TRUE;
+	}
+
+	same_stream = i_stream_get_fd(instream) == foutstream->fd &&
+		foutstream->fd != -1;
+	if (same_stream)
+		return io_stream_copy_same_stream(outstream, instream);
+	return io_stream_copy(&outstream->ostream, instream);
+}
+
+/* ostream "switch_ioloop_to" handler: moves the write watcher, if there is
+   one, to the given ioloop. */
+static void o_stream_file_switch_ioloop_to(struct ostream_private *stream,
+					   struct ioloop *ioloop)
+{
+	struct file_ostream *fstream =
+		container_of(stream, struct file_ostream, ostream);
+
+	if (fstream->io == NULL)
+		return;
+	fstream->io = io_loop_move_io_to(ioloop, &fstream->io);
+}
+
+/* Initialize a caller-allocated file_ostream for fd: stores the fd and the
+   autoclose setting, installs this file's handlers and creates the
+   ostream. A max_buffer_size of 0 means "use the optimal block size". */
+struct ostream *
+o_stream_create_file_common(struct file_ostream *fstream,
+			    int fd, size_t max_buffer_size, bool autoclose_fd)
+{
+	struct ostream_private *priv = &fstream->ostream;
+	struct ostream *ostream;
+
+	fstream->fd = fd;
+	fstream->autoclose_fd = autoclose_fd;
+	fstream->optimal_block_size = DEFAULT_OPTIMAL_BLOCK_SIZE;
+	fstream->writev = o_stream_file_writev;
+
+	/* iostream handlers */
+	priv->iostream.close = o_stream_file_close;
+	priv->iostream.destroy = o_stream_file_destroy;
+
+	/* ostream handlers */
+	priv->cork = o_stream_file_cork;
+	priv->flush = o_stream_file_flush;
+	priv->flush_pending = o_stream_file_flush_pending;
+	priv->get_buffer_used_size = o_stream_file_get_buffer_used_size;
+	priv->seek = o_stream_file_seek;
+	priv->sendv = o_stream_file_sendv;
+	priv->write_at = o_stream_file_write_at;
+	priv->send_istream = o_stream_file_send_istream;
+	priv->switch_ioloop_to = o_stream_file_switch_ioloop_to;
+
+	priv->max_buffer_size = max_buffer_size;
+	ostream = o_stream_create(priv, NULL, fd);
+
+	/* applied only after o_stream_create(), as in the original order */
+	if (max_buffer_size == 0)
+		priv->max_buffer_size = fstream->optimal_block_size;
+
+	return ostream;
+}
+
+/* Set up a stream whose fd is seekable: disables sendfile, raises the
+   optimal block size to the fd's st_blksize (with an upper limit), and for
+   regular files marks the stream as a file and turns off the socket-only
+   tweaks. If fstat() fails only sendfile is disabled. */
+static void fstream_init_file(struct file_ostream *fstream)
+{
+	struct stat st;
+
+	fstream->no_sendfile = TRUE;
+	if (fstat(fstream->fd, &st) < 0)
+		return;
+
+	if ((uoff_t)st.st_blksize > fstream->optimal_block_size) {
+		/* use the optimal block size, but with a reasonable limit */
+		fstream->optimal_block_size =
+			I_MIN(st.st_blksize, MAX_OPTIMAL_BLOCK_SIZE);
+	}
+
+	if (!S_ISREG(st.st_mode))
+		return;
+
+	fstream->file = TRUE;
+	fstream->no_socket_cork = TRUE;
+	fstream->no_socket_nodelay = TRUE;
+	fstream->no_socket_quickack = TRUE;
+}
+
+/* Create a file_ostream for fd. If the fd is seekable, the stream starts at
+   the fd's current offset and gets the file setup. Otherwise the socket
+   tweaks are disabled for non-sockets and UNIX domain sockets, and sendfile
+   is additionally disabled for non-sockets. */
+static
+struct ostream * o_stream_create_fd_common(int fd, size_t max_buffer_size,
+					    bool autoclose_fd)
+{
+	struct file_ostream *fstream;
+	struct ostream *ostream;
+	struct ip_addr local_ip;
+	off_t offset;
+	bool is_socket;
+
+	fstream = i_new(struct file_ostream, 1);
+	ostream = o_stream_create_file_common
+		(fstream, fd, max_buffer_size, autoclose_fd);
+
+	offset = lseek(fd, 0, SEEK_CUR);
+	if (offset >= 0) {
+		ostream->offset = offset;
+		fstream->real_offset = offset;
+		fstream->buffer_offset = offset;
+		fstream_init_file(fstream);
+		return ostream;
+	}
+
+	is_socket = net_getsockname(fd, &local_ip, NULL) >= 0;
+	if (!is_socket) {
+		/* not a socket */
+		fstream->no_sendfile = TRUE;
+	}
+	if (!is_socket || local_ip.family == 0) {
+		/* not a socket, or a UNIX domain socket */
+		fstream->no_socket_cork = TRUE;
+		fstream->no_socket_nodelay = TRUE;
+		fstream->no_socket_quickack = TRUE;
+	}
+
+	return ostream;
+}
+
+/* Create an output stream for fd. The fd is not closed with the stream. */
+struct ostream *
+o_stream_create_fd(int fd, size_t max_buffer_size)
+{
+	struct ostream *ostream;
+
+	ostream = o_stream_create_fd_common(fd, max_buffer_size, FALSE);
+	return ostream;
+}
+
+/* Create an output stream that owns *fd and closes it with the stream.
+   *fd is set to -1 so the caller doesn't keep using it. */
+struct ostream *
+o_stream_create_fd_autoclose(int *fd, size_t max_buffer_size)
+{
+	struct ostream *ostream;
+
+	ostream = o_stream_create_fd_common(*fd, max_buffer_size, TRUE);
+	*fd = -1;
+	return ostream;
+}
+
+/* Create an output stream for a file fd starting at `offset`. An offset of
+   UOFF_T_MAX means "use the fd's current position". The stream uses the
+   default maximum buffer size and is marked blocking when the fd turns out
+   to be a regular file. */
+struct ostream *
+o_stream_create_fd_file(int fd, uoff_t offset, bool autoclose_fd)
+{
+	struct file_ostream *fstream;
+	struct ostream *ostream;
+
+	if (offset == UOFF_T_MAX)
+		offset = lseek(fd, 0, SEEK_CUR);
+
+	fstream = i_new(struct file_ostream, 1);
+	ostream = o_stream_create_file_common(fstream, fd, 0, autoclose_fd);
+	fstream_init_file(fstream);
+
+	fstream->buffer_offset = offset;
+	fstream->real_offset = offset;
+	ostream->offset = offset;
+	ostream->blocking = fstream->file;
+	return ostream;
+}
+
+/* Like o_stream_create_fd_file(), but the stream owns *fd and closes it.
+   *fd is set to -1 so the caller doesn't keep using it. */
+struct ostream *o_stream_create_fd_file_autoclose(int *fd, uoff_t offset)
+{
+	struct ostream *ostream = o_stream_create_fd_file(*fd, offset, TRUE);
+
+	*fd = -1;
+	return ostream;
+}
+
+/* Open `path` for writing (creating it with `mode` if needed) and return a
+   file output stream that owns the fd. With
+   OSTREAM_CREATE_FILE_FLAG_APPEND the file is opened in append mode,
+   otherwise it is truncated. If open() fails, an error stream carrying
+   errno is returned instead. */
+struct ostream *o_stream_create_file(const char *path, uoff_t offset, mode_t mode,
+				     enum ostream_create_file_flags flags)
+{
+	int open_flags = O_WRONLY | O_CREAT;
+	int fd;
+
+	open_flags |= HAS_ANY_BITS(flags, OSTREAM_CREATE_FILE_FLAG_APPEND) ?
+		O_APPEND : O_TRUNC;
+
+	fd = open(path, open_flags, mode);
+	if (fd < 0)
+		return o_stream_create_error(errno);
+	return o_stream_create_fd_file_autoclose(&fd, offset);
+}