summaryrefslogtreecommitdiffstats
path: root/src/lib/istream-callback.c
blob: 6f07d5059517b6dd720733afd773189825cf12fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "buffer.h"
#include "istream-private.h"
#include "istream-callback.h"

struct callback_istream {
	struct istream_private istream;
	istream_callback_read_t *callback;
	void *context;

	buffer_t *buf;
	size_t prev_pos;
};

static void i_stream_callback_destroy(struct iostream_private *stream)
{
	struct callback_istream *cstream =
		container_of(stream, struct callback_istream, istream.iostream);

	buffer_free(&cstream->buf);
}

static ssize_t i_stream_callback_read(struct istream_private *stream)
{
	struct callback_istream *cstream =
		container_of(stream, struct callback_istream, istream);
	size_t pos;

	if (cstream->callback == NULL) {
		/* already returned EOF / error */
		stream->istream.eof = TRUE;
		return -1;
	}

	if (stream->skip > 0) {
		buffer_delete(cstream->buf, 0, stream->skip);
		stream->pos -= stream->skip;
		cstream->prev_pos -= stream->skip;
		stream->skip = 0;
	}
	i_assert(cstream->buf->used >= cstream->prev_pos);
	pos = cstream->prev_pos;
	if (cstream->buf->used > pos) {
		/* data was added outside the callback */
	} else if (!cstream->callback(cstream->buf, cstream->context)) {
		/* EOF / error */
		stream->istream.eof = TRUE;
		cstream->callback = NULL;
		if (cstream->buf->used == pos ||
		    stream->istream.stream_errno != 0)
			return -1;
		/* EOF was returned with some data still added to the buffer.
		   return the buffer first and EOF only on the next call. */
	} else if (cstream->buf->used == pos) {
		/* buffer full */
		i_assert(cstream->buf->used > 0);
		return -2;
	}
	i_assert(cstream->buf->used > pos);
	stream->buffer = cstream->buf->data;
	cstream->prev_pos = stream->pos = cstream->buf->used;
	return cstream->buf->used - pos;
}

#undef i_stream_create_callback
struct istream *
i_stream_create_callback(istream_callback_read_t *callback, void *context)
{
	struct callback_istream *cstream;
	struct istream *istream;

	i_assert(callback != NULL);

	cstream = i_new(struct callback_istream, 1);
	cstream->callback = callback;
	cstream->context = context;
	cstream->buf = buffer_create_dynamic(default_pool, 1024);

	cstream->istream.iostream.destroy = i_stream_callback_destroy;
	cstream->istream.read = i_stream_callback_read;

	istream = i_stream_create(&cstream->istream, NULL, -1, 0);
	istream->blocking = TRUE;
	return istream;
}

void i_stream_callback_append(struct istream *input,
			      const void *data, size_t size)
{
	struct callback_istream *cstream =
		container_of(input->real_stream,
			     struct callback_istream, istream);

	buffer_append(cstream->buf, data, size);
}

void i_stream_callback_append_str(struct istream *input, const char *str)
{
	i_stream_callback_append(input, str, strlen(str));
}

buffer_t *i_stream_callback_get_buffer(struct istream *input)
{
	struct callback_istream *cstream =
		container_of(input->real_stream,
			     struct callback_istream, istream);

	return cstream->buf;
}

void i_stream_callback_set_error(struct istream *input, int stream_errno,
				 const char *error)
{
	input->stream_errno = stream_errno;
	io_stream_set_error(&input->real_stream->iostream, "%s", error);
}