summaryrefslogtreecommitdiffstats
path: root/src/lib/istream-callback.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/istream-callback.c')
-rw-r--r--src/lib/istream-callback.c118
1 files changed, 118 insertions, 0 deletions
diff --git a/src/lib/istream-callback.c b/src/lib/istream-callback.c
new file mode 100644
index 0000000..6f07d50
--- /dev/null
+++ b/src/lib/istream-callback.c
@@ -0,0 +1,118 @@
+/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "istream-private.h"
+#include "istream-callback.h"
+
+struct callback_istream {
+ struct istream_private istream;
+ istream_callback_read_t *callback;
+ void *context;
+
+ buffer_t *buf;
+ size_t prev_pos;
+};
+
+static void i_stream_callback_destroy(struct iostream_private *stream)
+{
+ struct callback_istream *cstream =
+ container_of(stream, struct callback_istream, istream.iostream);
+
+ buffer_free(&cstream->buf);
+}
+
+static ssize_t i_stream_callback_read(struct istream_private *stream)
+{
+ struct callback_istream *cstream =
+ container_of(stream, struct callback_istream, istream);
+ size_t pos;
+
+ if (cstream->callback == NULL) {
+ /* already returned EOF / error */
+ stream->istream.eof = TRUE;
+ return -1;
+ }
+
+ if (stream->skip > 0) {
+ buffer_delete(cstream->buf, 0, stream->skip);
+ stream->pos -= stream->skip;
+ cstream->prev_pos -= stream->skip;
+ stream->skip = 0;
+ }
+ i_assert(cstream->buf->used >= cstream->prev_pos);
+ pos = cstream->prev_pos;
+ if (cstream->buf->used > pos) {
+ /* data was added outside the callback */
+ } else if (!cstream->callback(cstream->buf, cstream->context)) {
+ /* EOF / error */
+ stream->istream.eof = TRUE;
+ cstream->callback = NULL;
+ if (cstream->buf->used == pos ||
+ stream->istream.stream_errno != 0)
+ return -1;
+ /* EOF was returned with some data still added to the buffer.
+ return the buffer first and EOF only on the next call. */
+ } else if (cstream->buf->used == pos) {
+ /* buffer full */
+ i_assert(cstream->buf->used > 0);
+ return -2;
+ }
+ i_assert(cstream->buf->used > pos);
+ stream->buffer = cstream->buf->data;
+ cstream->prev_pos = stream->pos = cstream->buf->used;
+ return cstream->buf->used - pos;
+}
+
+#undef i_stream_create_callback
+struct istream *
+i_stream_create_callback(istream_callback_read_t *callback, void *context)
+{
+ struct callback_istream *cstream;
+ struct istream *istream;
+
+ i_assert(callback != NULL);
+
+ cstream = i_new(struct callback_istream, 1);
+ cstream->callback = callback;
+ cstream->context = context;
+ cstream->buf = buffer_create_dynamic(default_pool, 1024);
+
+ cstream->istream.iostream.destroy = i_stream_callback_destroy;
+ cstream->istream.read = i_stream_callback_read;
+
+ istream = i_stream_create(&cstream->istream, NULL, -1, 0);
+ istream->blocking = TRUE;
+ return istream;
+}
+
+void i_stream_callback_append(struct istream *input,
+ const void *data, size_t size)
+{
+ struct callback_istream *cstream =
+ container_of(input->real_stream,
+ struct callback_istream, istream);
+
+ buffer_append(cstream->buf, data, size);
+}
+
+void i_stream_callback_append_str(struct istream *input, const char *str)
+{
+ i_stream_callback_append(input, str, strlen(str));
+}
+
+buffer_t *i_stream_callback_get_buffer(struct istream *input)
+{
+ struct callback_istream *cstream =
+ container_of(input->real_stream,
+ struct callback_istream, istream);
+
+ return cstream->buf;
+}
+
+void i_stream_callback_set_error(struct istream *input, int stream_errno,
+ const char *error)
+{
+ input->stream_errno = stream_errno;
+ io_stream_set_error(&input->real_stream->iostream, "%s", error);
+}