1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */
#include "lib.h"
#include "buffer.h"
#include "istream-private.h"
#include "istream-callback.h"
struct callback_istream {
struct istream_private istream;
istream_callback_read_t *callback;
void *context;
buffer_t *buf;
size_t prev_pos;
};
static void i_stream_callback_destroy(struct iostream_private *stream)
{
struct callback_istream *cstream =
container_of(stream, struct callback_istream, istream.iostream);
buffer_free(&cstream->buf);
}
static ssize_t i_stream_callback_read(struct istream_private *stream)
{
struct callback_istream *cstream =
container_of(stream, struct callback_istream, istream);
size_t pos;
if (cstream->callback == NULL) {
/* already returned EOF / error */
stream->istream.eof = TRUE;
return -1;
}
if (stream->skip > 0) {
buffer_delete(cstream->buf, 0, stream->skip);
stream->pos -= stream->skip;
cstream->prev_pos -= stream->skip;
stream->skip = 0;
}
i_assert(cstream->buf->used >= cstream->prev_pos);
pos = cstream->prev_pos;
if (cstream->buf->used > pos) {
/* data was added outside the callback */
} else if (!cstream->callback(cstream->buf, cstream->context)) {
/* EOF / error */
stream->istream.eof = TRUE;
cstream->callback = NULL;
if (cstream->buf->used == pos ||
stream->istream.stream_errno != 0)
return -1;
/* EOF was returned with some data still added to the buffer.
return the buffer first and EOF only on the next call. */
} else if (cstream->buf->used == pos) {
/* buffer full */
i_assert(cstream->buf->used > 0);
return -2;
}
i_assert(cstream->buf->used > pos);
stream->buffer = cstream->buf->data;
cstream->prev_pos = stream->pos = cstream->buf->used;
return cstream->buf->used - pos;
}
#undef i_stream_create_callback
struct istream *
i_stream_create_callback(istream_callback_read_t *callback, void *context)
{
struct callback_istream *cstream;
struct istream *istream;
i_assert(callback != NULL);
cstream = i_new(struct callback_istream, 1);
cstream->callback = callback;
cstream->context = context;
cstream->buf = buffer_create_dynamic(default_pool, 1024);
cstream->istream.iostream.destroy = i_stream_callback_destroy;
cstream->istream.read = i_stream_callback_read;
istream = i_stream_create(&cstream->istream, NULL, -1, 0);
istream->blocking = TRUE;
return istream;
}
void i_stream_callback_append(struct istream *input,
const void *data, size_t size)
{
struct callback_istream *cstream =
container_of(input->real_stream,
struct callback_istream, istream);
buffer_append(cstream->buf, data, size);
}
void i_stream_callback_append_str(struct istream *input, const char *str)
{
i_stream_callback_append(input, str, strlen(str));
}
buffer_t *i_stream_callback_get_buffer(struct istream *input)
{
struct callback_istream *cstream =
container_of(input->real_stream,
struct callback_istream, istream);
return cstream->buf;
}
void i_stream_callback_set_error(struct istream *input, int stream_errno,
const char *error)
{
input->stream_errno = stream_errno;
io_stream_set_error(&input->real_stream->iostream, "%s", error);
}
|