diff options
Diffstat (limited to 'src/liblzma/common/outqueue.c')
-rw-r--r-- | src/liblzma/common/outqueue.c | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c new file mode 100644 index 0000000..71e8648 --- /dev/null +++ b/src/liblzma/common/outqueue.c @@ -0,0 +1,287 @@ +/////////////////////////////////////////////////////////////////////////////// +// +/// \file outqueue.c +/// \brief Output queue handling in multithreaded coding +// +// Author: Lasse Collin +// +// This file has been put into the public domain. +// You can do whatever you want with this file. +// +/////////////////////////////////////////////////////////////////////////////// + +#include "outqueue.h" + + +/// Get the maximum number of buffers that may be allocated based +/// on the number of threads. For now this is twice the number of threads. +/// It's a compromise between RAM usage and keeping the worker threads busy +/// when buffers finish out of order. +#define GET_BUFS_LIMIT(threads) (2 * (threads)) + + +extern uint64_t +lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) +{ + // This is to ease integer overflow checking: We may allocate up to + // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra + // memory for other data structures too (that's the /2). + // + // lzma_outq_prealloc_buf() will still accept bigger buffers than this. + const uint64_t limit + = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; + + if (threads > LZMA_THREADS_MAX || buf_size_max > limit) + return UINT64_MAX; + + return GET_BUFS_LIMIT(threads) + * lzma_outq_outbuf_memusage(buf_size_max); +} + + +static void +move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) +{ + assert(outq->head != NULL); + assert(outq->tail != NULL); + assert(outq->bufs_in_use > 0); + + lzma_outbuf *buf = outq->head; + outq->head = buf->next; + if (outq->head == NULL) + outq->tail = NULL; + + if (outq->cache != NULL && outq->cache->allocated != buf->allocated) + lzma_outq_clear_cache(outq, allocator); + + buf->next = outq->cache; + outq->cache = buf; + + --outq->bufs_in_use; + outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated); + + return; +} + + +static void +free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) +{ + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + + --outq->bufs_allocated; + outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated); + + lzma_free(buf, allocator); + return; +} + + +extern void +lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) +{ + while (outq->cache != NULL) + free_one_cached_buffer(outq, allocator); + + return; +} + + +extern void +lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator, + size_t keep_size) +{ + if (outq->cache == NULL) + return; + + // Free all but one. + while (outq->cache->next != NULL) + free_one_cached_buffer(outq, allocator); + + // Free the last one only if its size doesn't equal to keep_size. + if (outq->cache->allocated != keep_size) + free_one_cached_buffer(outq, allocator); + + return; +} + + +extern lzma_ret +lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, + uint32_t threads) +{ + if (threads > LZMA_THREADS_MAX) + return LZMA_OPTIONS_ERROR; + + const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); + + // Clear head/tail. + while (outq->head != NULL) + move_head_to_cache(outq, allocator); + + // If new buf_limit is lower than the old one, we may need to free + // a few cached buffers. + while (bufs_limit < outq->bufs_allocated) + free_one_cached_buffer(outq, allocator); + + outq->bufs_limit = bufs_limit; + outq->read_pos = 0; + + return LZMA_OK; +} + + +extern void +lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) +{ + while (outq->head != NULL) + move_head_to_cache(outq, allocator); + + lzma_outq_clear_cache(outq, allocator); + return; +} + + +extern lzma_ret +lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, + size_t size) +{ + // Caller must have checked it with lzma_outq_has_buf(). + assert(outq->bufs_in_use < outq->bufs_limit); + + // If there already is appropriately-sized buffer in the cache, + // we need to do nothing. + if (outq->cache != NULL && outq->cache->allocated == size) + return LZMA_OK; + + if (size > SIZE_MAX - sizeof(lzma_outbuf)) + return LZMA_MEM_ERROR; + + const size_t alloc_size = lzma_outq_outbuf_memusage(size); + + // The cache may have buffers but their size is wrong. + lzma_outq_clear_cache(outq, allocator); + + outq->cache = lzma_alloc(alloc_size, allocator); + if (outq->cache == NULL) + return LZMA_MEM_ERROR; + + outq->cache->next = NULL; + outq->cache->allocated = size; + + ++outq->bufs_allocated; + outq->mem_allocated += alloc_size; + + return LZMA_OK; +} + + +extern lzma_outbuf * +lzma_outq_get_buf(lzma_outq *outq, void *worker) +{ + // Caller must have used lzma_outq_prealloc_buf() to ensure these. + assert(outq->bufs_in_use < outq->bufs_limit); + assert(outq->bufs_in_use < outq->bufs_allocated); + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + buf->next = NULL; + + if (outq->tail != NULL) { + assert(outq->head != NULL); + outq->tail->next = buf; + } else { + assert(outq->head == NULL); + outq->head = buf; + } + + outq->tail = buf; + + buf->worker = worker; + buf->finished = false; + buf->finish_ret = LZMA_STREAM_END; + buf->pos = 0; + buf->decoder_in_pos = 0; + + buf->unpadded_size = 0; + buf->uncompressed_size = 0; + + ++outq->bufs_in_use; + outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated); + + return buf; +} + + +extern bool +lzma_outq_is_readable(const lzma_outq *outq) +{ + if (outq->head == NULL) + return false; + + return outq->read_pos < outq->head->pos || outq->head->finished; +} + + +extern lzma_ret +lzma_outq_read(lzma_outq *restrict outq, + const lzma_allocator *restrict allocator, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size, + lzma_vli *restrict unpadded_size, + lzma_vli *restrict uncompressed_size) +{ + // There must be at least one buffer from which to read. + if (outq->bufs_in_use == 0) + return LZMA_OK; + + // Get the buffer. + lzma_outbuf *buf = outq->head; + + // Copy from the buffer to output. + // + // FIXME? In threaded decoder it may be bad to do this copy while + // the mutex is being held. + lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, + out, out_pos, out_size); + + // Return if we didn't get all the data from the buffer. + if (!buf->finished || outq->read_pos < buf->pos) + return LZMA_OK; + + // The buffer was finished. Tell the caller its size information. + if (unpadded_size != NULL) + *unpadded_size = buf->unpadded_size; + + if (uncompressed_size != NULL) + *uncompressed_size = buf->uncompressed_size; + + // Remember the return value. + const lzma_ret finish_ret = buf->finish_ret; + + // Free this buffer for further use. + move_head_to_cache(outq, allocator); + outq->read_pos = 0; + + return finish_ret; +} + + +extern void +lzma_outq_enable_partial_output(lzma_outq *outq, + void (*enable_partial_output)(void *worker)) +{ + if (outq->head != NULL && !outq->head->finished + && outq->head->worker != NULL) { + enable_partial_output(outq->head->worker); + + // Set it to NULL since calling it twice is pointless. + outq->head->worker = NULL; + } + + return; +} |