summaryrefslogtreecommitdiffstats
path: root/src/liblzma/common/outqueue.c
blob: 71e8648a294d64386ecd6d49d48b5c7e3936bd79 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
///////////////////////////////////////////////////////////////////////////////
//
/// \file       outqueue.c
/// \brief      Output queue handling in multithreaded coding
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "outqueue.h"


/// Get the maximum number of buffers that may be allocated based
/// on the number of threads. For now this is twice the number of threads.
/// It's a compromise between RAM usage and keeping the worker threads busy
/// when buffers finish out of order.
#define GET_BUFS_LIMIT(threads) (2 * (threads))


extern uint64_t
lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
	// This is to ease integer overflow checking: We may allocate up to
	// GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
	// memory for other data structures too (that's the /2).
	//
	// lzma_outq_prealloc_buf() will still accept bigger buffers than this.
	const uint64_t limit
			= UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;

	if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
		return UINT64_MAX;

	return GET_BUFS_LIMIT(threads)
			* lzma_outq_outbuf_memusage(buf_size_max);
}


static void
move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
	assert(outq->head != NULL);
	assert(outq->tail != NULL);
	assert(outq->bufs_in_use > 0);

	lzma_outbuf *buf = outq->head;
	outq->head = buf->next;
	if (outq->head == NULL)
		outq->tail = NULL;

	if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
		lzma_outq_clear_cache(outq, allocator);

	buf->next = outq->cache;
	outq->cache = buf;

	--outq->bufs_in_use;
	outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated);

	return;
}


static void
free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
{
	assert(outq->cache != NULL);

	lzma_outbuf *buf = outq->cache;
	outq->cache = buf->next;

	--outq->bufs_allocated;
	outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated);

	lzma_free(buf, allocator);
	return;
}


extern void
lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
	while (outq->cache != NULL)
		free_one_cached_buffer(outq, allocator);

	return;
}


extern void
lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator,
		size_t keep_size)
{
	if (outq->cache == NULL)
		return;

	// Free all but one.
	while (outq->cache->next != NULL)
		free_one_cached_buffer(outq, allocator);

	// Free the last one only if its size doesn't equal to keep_size.
	if (outq->cache->allocated != keep_size)
		free_one_cached_buffer(outq, allocator);

	return;
}


extern lzma_ret
lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
		uint32_t threads)
{
	if (threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);

	// Clear head/tail.
	while (outq->head != NULL)
		move_head_to_cache(outq, allocator);

	// If new buf_limit is lower than the old one, we may need to free
	// a few cached buffers.
	while (bufs_limit < outq->bufs_allocated)
		free_one_cached_buffer(outq, allocator);

	outq->bufs_limit = bufs_limit;
	outq->read_pos = 0;

	return LZMA_OK;
}


extern void
lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
{
	while (outq->head != NULL)
		move_head_to_cache(outq, allocator);

	lzma_outq_clear_cache(outq, allocator);
	return;
}


extern lzma_ret
lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
		size_t size)
{
	// Caller must have checked it with lzma_outq_has_buf().
	assert(outq->bufs_in_use < outq->bufs_limit);

	// If there already is appropriately-sized buffer in the cache,
	// we need to do nothing.
	if (outq->cache != NULL && outq->cache->allocated == size)
		return LZMA_OK;

	if (size > SIZE_MAX - sizeof(lzma_outbuf))
		return LZMA_MEM_ERROR;

	const size_t alloc_size = lzma_outq_outbuf_memusage(size);

	// The cache may have buffers but their size is wrong.
	lzma_outq_clear_cache(outq, allocator);

	outq->cache = lzma_alloc(alloc_size, allocator);
	if (outq->cache == NULL)
		return LZMA_MEM_ERROR;

	outq->cache->next = NULL;
	outq->cache->allocated = size;

	++outq->bufs_allocated;
	outq->mem_allocated += alloc_size;

	return LZMA_OK;
}


extern lzma_outbuf *
lzma_outq_get_buf(lzma_outq *outq, void *worker)
{
	// Caller must have used lzma_outq_prealloc_buf() to ensure these.
	assert(outq->bufs_in_use < outq->bufs_limit);
	assert(outq->bufs_in_use < outq->bufs_allocated);
	assert(outq->cache != NULL);

	lzma_outbuf *buf = outq->cache;
	outq->cache = buf->next;
	buf->next = NULL;

	if (outq->tail != NULL) {
		assert(outq->head != NULL);
		outq->tail->next = buf;
	} else {
		assert(outq->head == NULL);
		outq->head = buf;
	}

	outq->tail = buf;

	buf->worker = worker;
	buf->finished = false;
	buf->finish_ret = LZMA_STREAM_END;
	buf->pos = 0;
	buf->decoder_in_pos = 0;

	buf->unpadded_size = 0;
	buf->uncompressed_size = 0;

	++outq->bufs_in_use;
	outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated);

	return buf;
}


extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
	if (outq->head == NULL)
		return false;

	return outq->read_pos < outq->head->pos || outq->head->finished;
}


extern lzma_ret
lzma_outq_read(lzma_outq *restrict outq,
		const lzma_allocator *restrict allocator,
		uint8_t *restrict out, size_t *restrict out_pos,
		size_t out_size,
		lzma_vli *restrict unpadded_size,
		lzma_vli *restrict uncompressed_size)
{
	// There must be at least one buffer from which to read.
	if (outq->bufs_in_use == 0)
		return LZMA_OK;

	// Get the buffer.
	lzma_outbuf *buf = outq->head;

	// Copy from the buffer to output.
	//
	// FIXME? In threaded decoder it may be bad to do this copy while
	// the mutex is being held.
	lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
			out, out_pos, out_size);

	// Return if we didn't get all the data from the buffer.
	if (!buf->finished || outq->read_pos < buf->pos)
		return LZMA_OK;

	// The buffer was finished. Tell the caller its size information.
	if (unpadded_size != NULL)
		*unpadded_size = buf->unpadded_size;

	if (uncompressed_size != NULL)
		*uncompressed_size = buf->uncompressed_size;

	// Remember the return value.
	const lzma_ret finish_ret = buf->finish_ret;

	// Free this buffer for further use.
	move_head_to_cache(outq, allocator);
	outq->read_pos = 0;

	return finish_ret;
}


extern void
lzma_outq_enable_partial_output(lzma_outq *outq,
		void (*enable_partial_output)(void *worker))
{
	if (outq->head != NULL && !outq->head->finished
			&& outq->head->worker != NULL) {
		enable_partial_output(outq->head->worker);

		// Set it to NULL since calling it twice is pointless.
		outq->head->worker = NULL;
	}

	return;
}