/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "buffer.h" #include "istream-private.h" #include "istream-callback.h" struct callback_istream { struct istream_private istream; istream_callback_read_t *callback; void *context; buffer_t *buf; size_t prev_pos; }; static void i_stream_callback_destroy(struct iostream_private *stream) { struct callback_istream *cstream = container_of(stream, struct callback_istream, istream.iostream); buffer_free(&cstream->buf); } static ssize_t i_stream_callback_read(struct istream_private *stream) { struct callback_istream *cstream = container_of(stream, struct callback_istream, istream); size_t pos; if (cstream->callback == NULL) { /* already returned EOF / error */ stream->istream.eof = TRUE; return -1; } if (stream->skip > 0) { buffer_delete(cstream->buf, 0, stream->skip); stream->pos -= stream->skip; cstream->prev_pos -= stream->skip; stream->skip = 0; } i_assert(cstream->buf->used >= cstream->prev_pos); pos = cstream->prev_pos; if (cstream->buf->used > pos) { /* data was added outside the callback */ } else if (!cstream->callback(cstream->buf, cstream->context)) { /* EOF / error */ stream->istream.eof = TRUE; cstream->callback = NULL; if (cstream->buf->used == pos || stream->istream.stream_errno != 0) return -1; /* EOF was returned with some data still added to the buffer. return the buffer first and EOF only on the next call. */ } else if (cstream->buf->used == pos) { /* buffer full */ i_assert(cstream->buf->used > 0); return -2; } i_assert(cstream->buf->used > pos); stream->buffer = cstream->buf->data; cstream->prev_pos = stream->pos = cstream->buf->used; return cstream->buf->used - pos; } #undef i_stream_create_callback struct istream * i_stream_create_callback(istream_callback_read_t *callback, void *context) { struct callback_istream *cstream; struct istream *istream; i_assert(callback != NULL); cstream = i_new(struct callback_istream, 1); cstream->callback = callback; cstream->context = context; cstream->buf = buffer_create_dynamic(default_pool, 1024); cstream->istream.iostream.destroy = i_stream_callback_destroy; cstream->istream.read = i_stream_callback_read; istream = i_stream_create(&cstream->istream, NULL, -1, 0); istream->blocking = TRUE; return istream; } void i_stream_callback_append(struct istream *input, const void *data, size_t size) { struct callback_istream *cstream = container_of(input->real_stream, struct callback_istream, istream); buffer_append(cstream->buf, data, size); } void i_stream_callback_append_str(struct istream *input, const char *str) { i_stream_callback_append(input, str, strlen(str)); } buffer_t *i_stream_callback_get_buffer(struct istream *input) { struct callback_istream *cstream = container_of(input->real_stream, struct callback_istream, istream); return cstream->buf; } void i_stream_callback_set_error(struct istream *input, int stream_errno, const char *error) { input->stream_errno = stream_errno; io_stream_set_error(&input->real_stream->iostream, "%s", error); }