summaryrefslogtreecommitdiffstats
path: root/src/lib/iostream-pump.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/iostream-pump.c')
-rw-r--r--src/lib/iostream-pump.c251
1 files changed, 251 insertions, 0 deletions
diff --git a/src/lib/iostream-pump.c b/src/lib/iostream-pump.c
new file mode 100644
index 0000000..cebae3d
--- /dev/null
+++ b/src/lib/iostream-pump.c
@@ -0,0 +1,251 @@
+/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "str.h"
+#include "iostream-pump.h"
+#include "istream.h"
+#include "ostream.h"
+#include <unistd.h>
+
+#undef iostream_pump_set_completion_callback
+
+struct iostream_pump {
+ int refcount;
+
+ struct istream *input;
+ struct ostream *output;
+
+ struct io *io;
+
+ iostream_pump_callback_t *callback;
+ void *context;
+
+ bool waiting_output;
+ bool completed;
+};
+
+static void iostream_pump_copy(struct iostream_pump *pump)
+{
+ enum ostream_send_istream_result res;
+ size_t old_size;
+
+ o_stream_cork(pump->output);
+ old_size = o_stream_get_max_buffer_size(pump->output);
+ o_stream_set_max_buffer_size(pump->output,
+ I_MIN(IO_BLOCK_SIZE,
+ o_stream_get_max_buffer_size(pump->output)));
+ res = o_stream_send_istream(pump->output, pump->input);
+ o_stream_set_max_buffer_size(pump->output, old_size);
+ o_stream_uncork(pump->output);
+
+ switch(res) {
+ case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+ io_remove(&pump->io);
+ pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR,
+ pump->context);
+ return;
+ case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+ io_remove(&pump->io);
+ pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
+ pump->context);
+ return;
+ case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+ i_assert(!pump->output->blocking);
+ pump->waiting_output = TRUE;
+ io_remove(&pump->io);
+ return;
+ case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+ pump->waiting_output = FALSE;
+ io_remove(&pump->io);
+ /* flush it */
+ switch (o_stream_flush(pump->output)) {
+ case -1:
+ pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
+ pump->context);
+ break;
+ case 0:
+ pump->waiting_output = TRUE;
+ pump->completed = TRUE;
+ break;
+ default:
+ pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF,
+ pump->context);
+ break;
+ }
+ return;
+ case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+ i_assert(!pump->input->blocking);
+ pump->waiting_output = FALSE;
+ return;
+ }
+ i_unreached();
+}
+
+static int iostream_pump_flush(struct iostream_pump *pump)
+{
+ int ret;
+
+ if ((ret = o_stream_flush(pump->output)) <= 0) {
+ if (ret < 0) {
+ pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
+ pump->context);
+ }
+ return ret;
+ }
+ pump->waiting_output = FALSE;
+ if (pump->completed) {
+ pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
+ return 1;
+ }
+
+ if (pump->input->blocking)
+ iostream_pump_copy(pump);
+ else if (pump->io == NULL) {
+ pump->io = io_add_istream(pump->input,
+ iostream_pump_copy, pump);
+ io_set_pending(pump->io);
+ }
+ return ret;
+}
+
+struct iostream_pump *
+iostream_pump_create(struct istream *input, struct ostream *output)
+{
+ struct iostream_pump *pump;
+
+ i_assert(input != NULL &&
+ output != NULL);
+ i_assert(!input->blocking || !output->blocking);
+
+ /* ref streams */
+ i_stream_ref(input);
+ o_stream_ref(output);
+
+ /* create pump */
+ pump = i_new(struct iostream_pump, 1);
+ pump->refcount = 1;
+ pump->input = input;
+ pump->output = output;
+
+ return pump;
+}
+
+void iostream_pump_start(struct iostream_pump *pump)
+{
+ i_assert(pump != NULL);
+ i_assert(pump->callback != NULL);
+
+ /* add flush handler */
+ if (!pump->output->blocking) {
+ o_stream_set_flush_callback(pump->output,
+ iostream_pump_flush, pump);
+ }
+
+ /* make IO objects */
+ if (pump->input->blocking) {
+ i_assert(!pump->output->blocking);
+ o_stream_set_flush_pending(pump->output, TRUE);
+ } else {
+ pump->io = io_add_istream(pump->input,
+ iostream_pump_copy, pump);
+ io_set_pending(pump->io);
+ }
+}
+
+struct istream *iostream_pump_get_input(struct iostream_pump *pump)
+{
+ i_assert(pump != NULL);
+ return pump->input;
+}
+
+struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
+{
+ i_assert(pump != NULL);
+ return pump->output;
+}
+
+void iostream_pump_set_completion_callback(struct iostream_pump *pump,
+ iostream_pump_callback_t *callback,
+ void *context)
+{
+ i_assert(pump != NULL);
+ pump->callback = callback;
+ pump->context = context;
+}
+
+void iostream_pump_ref(struct iostream_pump *pump)
+{
+ i_assert(pump != NULL);
+ i_assert(pump->refcount > 0);
+ pump->refcount++;
+}
+
+void iostream_pump_unref(struct iostream_pump **_pump)
+{
+ i_assert(_pump != NULL);
+ struct iostream_pump *pump = *_pump;
+
+ if (pump == NULL)
+ return;
+
+ i_assert(pump->refcount > 0);
+
+ *_pump = NULL;
+
+ if (--pump->refcount > 0)
+ return;
+
+ iostream_pump_stop(pump);
+
+ o_stream_unref(&pump->output);
+ i_stream_unref(&pump->input);
+ i_free(pump);
+}
+
+void iostream_pump_destroy(struct iostream_pump **_pump)
+{
+ i_assert(_pump != NULL);
+ struct iostream_pump *pump = *_pump;
+
+ if (pump == NULL)
+ return;
+
+ *_pump = NULL;
+
+ iostream_pump_stop(pump);
+ o_stream_unref(&pump->output);
+ i_stream_unref(&pump->input);
+
+ iostream_pump_unref(&pump);
+}
+
+void iostream_pump_stop(struct iostream_pump *pump)
+{
+ i_assert(pump != NULL);
+
+ if (pump->output != NULL)
+ o_stream_unset_flush_callback(pump->output);
+
+ io_remove(&pump->io);
+}
+
+bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
+{
+ return pump->waiting_output;
+}
+
+void iostream_pump_switch_ioloop_to(struct iostream_pump *pump,
+ struct ioloop *ioloop)
+{
+ i_assert(pump != NULL);
+ if (pump->io != NULL)
+ pump->io = io_loop_move_io_to(ioloop, &pump->io);
+ o_stream_switch_ioloop_to(pump->output, ioloop);
+ i_stream_switch_ioloop_to(pump->input, ioloop);
+}
+
+void iostream_pump_switch_ioloop(struct iostream_pump *pump)
+{
+ iostream_pump_switch_ioloop_to(pump, current_ioloop);
+}