summary refs log tree commit diff stats
path: root/src/lib/istream-concat.c
blob: 4c58b8653f9fb3434d379b490d3e358f3345783b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
/* Copyright (c) 2007-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "buffer.h"
#include "memarea.h"
#include "istream-private.h"
#include "istream-concat.h"

/* State of an input stream that presents several child input streams as
   one continuous stream. */
struct concat_istream {
	/* embedded base stream; callbacks recover the concat_istream from it
	   with container_of() */
	struct istream_private istream;

	/* the child streams, input_count entries */
	struct istream **input;
	/* the child currently being read; kept equal to input[cur_idx] */
	struct istream *cur_input;
	/* per-child sizes, filled in lazily by find_v_offset(); entries at
	   indexes below unknown_size_idx have been filled in */
	uoff_t *input_size;
	unsigned int input_count;

	/* index of cur_input in input[] */
	unsigned int cur_idx;
	/* first index whose input_size[] entry hasn't been looked up yet */
	unsigned int unknown_size_idx;
	/* number of unconsumed bytes at the start of our buffer that were
	   carried over from earlier child stream(s) */
	size_t prev_stream_left;
	/* number of bytes to skip in input[cur_idx-1] once the carried over
	   data has been consumed */
	size_t prev_stream_skip;
	/* value of istream.skip when the child streams were last synced */
	size_t prev_skip;
};

/* Forward declaration: i_stream_concat_close() calls this before the
   definition further down in the file. */
static void i_stream_concat_skip(struct concat_istream *cstream);

/* iostream close callback. Unless the stream has already failed, first
   propagates the consumed offset to the child streams. The children are
   closed only when close_parent is TRUE. */
static void i_stream_concat_close(struct iostream_private *stream,
				  bool close_parent)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream.iostream);
	unsigned int idx;

	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);

	/* get the parent streams to the wanted offset */
	if (cstream->istream.istream.stream_errno == 0)
		i_stream_concat_skip(cstream);

	if (!close_parent)
		return;

	for (idx = 0; idx < cstream->input_count; idx++)
		i_stream_close(cstream->input[idx]);
}

/* iostream destroy callback: drops the reference held on every child
   stream, then frees the arrays and the stream buffer owned by the concat
   stream. */
static void i_stream_concat_destroy(struct iostream_private *stream)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream.iostream);
	unsigned int idx = 0;

	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);

	while (idx < cstream->input_count) {
		i_stream_unref(&cstream->input[idx]);
		idx++;
	}

	i_free(cstream->input);
	i_free(cstream->input_size);
	i_stream_free_buffer(&cstream->istream);
}

/* iostream set_max_buffer_size callback: stores the new limit for the
   concat stream itself and forwards it to every child stream. */
static void
i_stream_concat_set_max_buffer_size(struct iostream_private *stream,
				    size_t max_size)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream.iostream);
	unsigned int idx = 0;

	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);

	cstream->istream.max_buffer_size = max_size;
	while (idx < cstream->input_count) {
		i_stream_set_max_buffer_size(cstream->input[idx], max_size);
		idx++;
	}
}

/* Switch from the current child stream, which must be at EOF, to the next
   one. Any data still buffered in the old child is kept available through
   our own buffer and accounted for in prev_stream_left.

   There is no bounds check on the incremented cur_idx, so the caller must
   make sure that a next child stream exists. */
static void i_stream_concat_read_next(struct concat_istream *cstream)
{
	struct istream *prev_input = cstream->cur_input;
	const unsigned char *data;
	size_t data_size, size;

	i_assert(cstream->cur_input->eof);

	/* flush a skip that is still pending for the stream before the
	   current one, because cur_idx-1 is about to refer to a different
	   stream */
	if (cstream->prev_stream_skip != 0) {
		i_stream_skip(cstream->input[cstream->cur_idx-1], cstream->prev_stream_skip);
		cstream->prev_stream_skip = 0;
	}

	/* remember what the old stream still has buffered, then move on to
	   the next stream and start it from the beginning */
	data = i_stream_get_data(cstream->cur_input, &data_size);
	cstream->cur_idx++;
	cstream->cur_input = cstream->input[cstream->cur_idx];
	i_stream_seek(cstream->cur_input, 0);

	if (cstream->prev_stream_left > 0 || cstream->istream.pos == 0) {
		/* all the pending data is already in w_buffer */
		/* the old stream's bytes are skipped later, once the caller
		   has consumed them from our buffer */
		cstream->prev_stream_skip = data_size;
		cstream->prev_stream_left += data_size;
		i_assert(cstream->prev_stream_left ==
			 cstream->istream.pos - cstream->istream.skip);
		return;
	}
	i_assert(cstream->prev_stream_skip == 0);

	/* we already verified that the data size is less than the
	   maximum buffer size */
	cstream->istream.skip = 0;
	cstream->istream.pos = 0;
	if (data_size > 0) {
		/* don't write into a memarea that is referenced elsewhere */
		if (cstream->istream.memarea != NULL &&
		    memarea_get_refcount(cstream->istream.memarea) > 1)
			i_stream_memarea_detach(&cstream->istream);
		if (!i_stream_try_alloc(&cstream->istream, data_size, &size))
			i_unreached();
		i_assert(size >= data_size);
	}

	/* copy the old stream's remaining data into our own buffer and mark
	   it as consumed in the old stream right away */
	cstream->prev_stream_left = data_size;
	memcpy(cstream->istream.w_buffer, data, data_size);
	i_stream_skip(prev_input, data_size);
	cstream->istream.skip = 0;
	cstream->istream.pos = data_size;
}

/* Propagate the bytes the caller has consumed from our buffer since the
   last sync (istream.skip - prev_skip) to the child streams.

   Bytes are first charged against data carried over from earlier streams
   (prev_stream_left). Whatever remains is skipped in cur_input, and our
   buffer window is moved forward by that amount. */
static void i_stream_concat_skip(struct concat_istream *cstream)
{
	struct istream_private *stream = &cstream->istream;
	size_t consumed;

	i_assert(stream->skip >= cstream->prev_skip);
	consumed = stream->skip - cstream->prev_skip;

	/* with nothing carried over there are no buffers to worry about and
	   everything is skipped in cur_input */
	if (cstream->prev_stream_left != 0) {
		if (consumed < cstream->prev_stream_left) {
			/* still inside the carried over data */
			cstream->prev_stream_left -= consumed;
			consumed = 0;
		} else {
			/* the carried over data is fully consumed */
			i_stream_skip(cstream->input[cstream->cur_idx-1], cstream->prev_stream_skip);
			cstream->prev_stream_skip = 0;

			consumed -= cstream->prev_stream_left;
			cstream->prev_stream_left = 0;
		}
	}

	if (consumed != 0) {
		i_assert(stream->buffer != NULL);
		stream->buffer += consumed;
		stream->pos -= consumed;
		stream->skip -= consumed;
	}
	cstream->prev_skip = stream->skip;
	i_stream_skip(cstream->cur_input, consumed);
}

/* istream read callback.

   Returns the number of bytes added to the buffer when new data became
   visible. Returns -2 or 0 when the current child stream's read returned
   that value. Also returns -2 when the buffered data has already reached
   the maximum buffer size at a stream switch, or when no buffer space
   could be allocated for merging. Returns -1 on a child stream error
   (stream_errno and the error string are copied from the child) or when
   the last child stream is at EOF.

   When the current child reaches EOF and it isn't the last one, this
   switches to the next child and calls itself again. */
static ssize_t i_stream_concat_read(struct istream_private *stream)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream);
	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);
	const unsigned char *data;
	size_t size, data_size, cur_data_pos, new_pos;
	size_t new_bytes_count;
	ssize_t ret;
	bool last_stream;

	i_assert(cstream->cur_input != NULL);
	/* sync the child streams with what the caller has consumed so far */
	i_stream_concat_skip(cstream);

	/* cur_data_pos = how many bytes of cur_input's data are already
	   visible through our buffer */
	i_assert(stream->pos >= stream->skip + cstream->prev_stream_left);
	cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left);

	data = i_stream_get_data(cstream->cur_input, &data_size);
	if (data_size > cur_data_pos)
		/* cur_input already has buffered data that we haven't
		   exposed yet - no need to read from it */
		ret = 0;
	else {
		/* need to read more - NOTE: Can't use i_stream_read_memarea()
		   here, because our stream->buffer may point to the parent
		   istream. Implementing explicit snapshot function to avoid
		   this isn't easy for seekable concat-istreams, because due to
		   seeking it's not necessarily the cur_input that needs to be
		   snapshotted. */
		i_assert(cur_data_pos == data_size);
		ret = i_stream_read(cstream->cur_input);
		if (ret == -2 || ret == 0)
			return ret;

		if (ret == -1 && cstream->cur_input->stream_errno != 0) {
			io_stream_set_error(&cstream->istream.iostream,
				"read(%s) failed: %s",
				i_stream_get_name(cstream->cur_input),
				i_stream_get_error(cstream->cur_input));
			stream->istream.stream_errno =
				cstream->cur_input->stream_errno;
			return -1;
		}

		/* we either read something or we're at EOF */
		last_stream = cstream->cur_idx+1 >= cstream->input_count;
		if (ret == -1 && !last_stream) {
			/* the current stream is finished, but more streams
			   follow. don't switch while the buffer is already
			   at its maximum size. */
			if (stream->pos - stream->skip >= i_stream_get_max_buffer_size(&stream->istream))
				return -2;

			i_stream_concat_read_next(cstream);
			cstream->prev_skip = stream->skip;
			return i_stream_concat_read(stream);
		}

		/* we're at EOF only when the last child stream is at EOF */
		stream->istream.eof = cstream->cur_input->eof && last_stream;
		i_assert(ret != -1 || stream->istream.eof);
		data = i_stream_get_data(cstream->cur_input, &data_size);
	}

	if (data_size == cur_data_pos) {
		/* nothing new read - preserve the buffer as it was */
		i_assert(ret == 0 || ret == -1);
		return ret;
	}
	if (cstream->prev_stream_left == 0) {
		/* we can point directly to the current stream's buffers */
		stream->buffer = data;
		stream->pos -= stream->skip;
		stream->skip = 0;
		new_pos = data_size;
	} else {
		/* we still have some of the previous stream left. merge the
		   new data with it. */
		i_assert(data_size > cur_data_pos);
		new_bytes_count = data_size - cur_data_pos;
		if (!i_stream_try_alloc(stream, new_bytes_count, &size)) {
			stream->buffer = stream->w_buffer;
			return -2;
		}
		stream->buffer = stream->w_buffer;

		/* we'll copy all the new input to w_buffer. if we skip over
		   prev_stream_left bytes, the next read will switch to
		   pointing to cur_input's data directly. */
		if (new_bytes_count > size)
			new_bytes_count = size;
		memcpy(stream->w_buffer + stream->pos,
		       data + cur_data_pos, new_bytes_count);
		new_pos = stream->pos + new_bytes_count;
	}

	/* report how much the visible data grew and remember the skip value
	   for the next sync */
	i_assert(new_pos > stream->pos);
	ret = (ssize_t)(new_pos - stream->pos);
	stream->pos = new_pos;
	cstream->prev_skip = stream->skip;
	return ret;
}

/* Translate an offset within the concatenated stream into a child stream
   index and an offset within that child.

   On entry *v_offset is the offset in the concat stream. On success 0 is
   returned, *idx_r holds the child index and *v_offset the remaining offset
   inside that child. *idx_r is set to input_count when the offset lies at or
   beyond the end of all children; *v_offset then holds how far past the end
   it is. Child sizes that aren't known yet are looked up with
   i_stream_stat() and cached in input_size[].

   Returns -1 if a child's stat fails. The error is logged and copied to the
   concat stream, and *idx_r is left untouched. */
static int
find_v_offset(struct concat_istream *cstream, uoff_t *v_offset,
	      unsigned int *idx_r)
{
	const struct stat *st;
	unsigned int idx = 0;

	/* a remaining offset of 0 means the beginning of input[idx] */
	while (idx < cstream->input_count && *v_offset != 0) {
		struct istream *child = cstream->input[idx];

		if (idx == cstream->unknown_size_idx) {
			/* we'll need to figure out this stream's size */
			if (i_stream_stat(child, TRUE, &st) < 0) {
				io_stream_set_error(&cstream->istream.iostream,
					"stat(%s) failed: %s",
					i_stream_get_name(child),
					i_stream_get_error(child));
				i_error("istream-concat: stat(%s) failed: %s",
					i_stream_get_name(child),
					i_stream_get_error(child));
				cstream->istream.istream.stream_errno =
					child->stream_errno;
				return -1;
			}

			/* @UNSAFE */
			cstream->input_size[idx] = st->st_size;
			cstream->unknown_size_idx = idx + 1;
		}
		if (*v_offset < cstream->input_size[idx]) {
			/* the offset is inside this stream */
			break;
		}
		*v_offset -= cstream->input_size[idx];
		idx++;
	}

	*idx_r = idx;
	return 0;
}

/* istream seek callback: position the concat stream at v_offset.

   The buffer state is reset, the child stream containing v_offset is
   looked up and made current, and that child is seeked to the matching
   offset inside it. Seeking exactly to the end of the last child is
   allowed. Seeking past it sets stream_errno to EINVAL.

   The target index is resolved into a local variable and committed to
   cur_idx/cur_input only once it is known to be a valid index. This way
   cur_input == input[cur_idx] keeps holding even when the seek fails, which
   the other callbacks assert. */
static void i_stream_concat_seek(struct istream_private *stream,
				 uoff_t v_offset, bool mark ATTR_UNUSED)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream);
	unsigned int idx;

	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);

	stream->istream.v_offset = v_offset;
	stream->skip = stream->pos = 0;
	cstream->prev_stream_left = 0;
	cstream->prev_stream_skip = 0;
	cstream->prev_skip = 0;

	if (find_v_offset(cstream, &v_offset, &idx) < 0) {
		/* failed - this replaces the stream_errno that
		   find_v_offset() copied from the child stream */
		stream->istream.stream_errno = EINVAL;
		return;
	}
	if (idx >= cstream->input_count) {
		/* we allow seeking to EOF, but not past it. */
		if (v_offset != 0) {
			io_stream_set_error(&cstream->istream.iostream,
				"Seeking past EOF by %"PRIuUOFF_T" bytes", v_offset);
			cstream->istream.istream.stream_errno = EINVAL;
			/* cur_idx and cur_input are intentionally left
			   as they were */
			return;
		}
		i_assert(idx > 0);
		/* Position ourselves at the EOF of the last actual stream. */
		idx--;
		v_offset = cstream->input_size[idx];
	}
	cstream->cur_idx = idx;
	cstream->cur_input = cstream->input[idx];
	i_stream_seek(cstream->cur_input, v_offset);
}

/* istream stat callback: sets statbuf.st_size to the sum of the child
   stream sizes. Returns 0 on success, or -1 if looking up a child's size
   failed. */
static int
i_stream_concat_stat(struct istream_private *stream, bool exact ATTR_UNUSED)
{
	struct concat_istream *cstream =
		container_of(stream, struct concat_istream, istream);
	/* looking up the largest possible offset makes find_v_offset() walk
	   through the child streams and fill in their sizes */
	uoff_t max_offset = UOFF_T_MAX;
	unsigned int found_idx;
	unsigned int idx = 0;

	i_assert(cstream->cur_input == cstream->input[cstream->cur_idx]);

	/* make sure we have all sizes */
	if (find_v_offset(cstream, &max_offset, &found_idx) < 0)
		return -1;

	stream->statbuf.st_size = 0;
	while (idx < cstream->unknown_size_idx) {
		stream->statbuf.st_size += cstream->input_size[idx];
		idx++;
	}
	return 0;
}

/* Create an input stream that reads the given child streams one after
   another as a single stream.

   input[] is a NULL-terminated array with at least one stream. The array
   itself is copied and a reference is taken on every child. The new stream
   is blocking/seekable only if all of the children are. Its maximum buffer
   size is the largest of the children's limits that isn't SIZE_MAX, raised
   to at least I_STREAM_MIN_SIZE, or SIZE_MAX if every child reports
   SIZE_MAX. */
struct istream *i_stream_create_concat(struct istream *input[])
{
	struct concat_istream *cstream;
	unsigned int count = 0;
	size_t max_buffer_size = 0;
	bool blocking = TRUE, seekable = TRUE;

	/* if any of the streams isn't blocking or seekable, set ourself also
	   nonblocking/nonseekable */
	while (input[count] != NULL) {
		struct istream *child = input[count];
		size_t child_max = i_stream_get_max_buffer_size(child);

		i_assert(child_max != 0);
		if (child_max != SIZE_MAX && max_buffer_size < child_max)
			max_buffer_size = child_max;
		if (!child->blocking)
			blocking = FALSE;
		if (!child->seekable)
			seekable = FALSE;
		i_stream_ref(child);
		count++;
	}
	i_assert(count != 0);

	if (max_buffer_size == 0) {
		/* none of the children had a limit below SIZE_MAX */
		max_buffer_size = SIZE_MAX;
	} else if (max_buffer_size < I_STREAM_MIN_SIZE) {
		max_buffer_size = I_STREAM_MIN_SIZE;
	}

	cstream = i_new(struct concat_istream, 1);
	cstream->input_count = count;
	cstream->input = p_memdup(default_pool, input, sizeof(*input) * count);
	cstream->input_size = i_new(uoff_t, count);

	/* start reading from the beginning of the first stream */
	cstream->cur_input = cstream->input[0];
	i_stream_seek(cstream->cur_input, 0);

	/* iostream level callbacks */
	cstream->istream.iostream.close = i_stream_concat_close;
	cstream->istream.iostream.destroy = i_stream_concat_destroy;
	cstream->istream.iostream.set_max_buffer_size =
		i_stream_concat_set_max_buffer_size;

	/* istream level callbacks and settings */
	cstream->istream.max_buffer_size = max_buffer_size;
	cstream->istream.read = i_stream_concat_read;
	cstream->istream.seek = i_stream_concat_seek;
	cstream->istream.stat = i_stream_concat_stat;

	cstream->istream.istream.readable_fd = FALSE;
	cstream->istream.istream.blocking = blocking;
	cstream->istream.istream.seekable = seekable;
	return i_stream_create(&cstream->istream, NULL, -1, 0);
}