From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 21:33:14 +0200 Subject: Adding upstream version 115.7.0esr. Signed-off-by: Daniel Baumann --- other-licenses/7zstub/src/C/MtCoder.c | 601 ++++++++++++++++++++++++++++++++++ 1 file changed, 601 insertions(+) create mode 100644 other-licenses/7zstub/src/C/MtCoder.c (limited to 'other-licenses/7zstub/src/C/MtCoder.c') diff --git a/other-licenses/7zstub/src/C/MtCoder.c b/other-licenses/7zstub/src/C/MtCoder.c new file mode 100644 index 0000000000..ddc7c02858 --- /dev/null +++ b/other-licenses/7zstub/src/C/MtCoder.c @@ -0,0 +1,601 @@ +/* MtCoder.c -- Multi-thread Coder +2018-02-21 : Igor Pavlov : Public domain */ + +#include "Precomp.h" + +#include "MtCoder.h" + +#ifndef _7ZIP_ST + +SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize) +{ + CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt); + UInt64 inSize2 = 0; + UInt64 outSize2 = 0; + if (inSize != (UInt64)(Int64)-1) + { + inSize2 = inSize - thunk->inSize; + thunk->inSize = inSize; + } + if (outSize != (UInt64)(Int64)-1) + { + outSize2 = outSize - thunk->outSize; + thunk->outSize = outSize; + } + return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2); +} + + +void MtProgressThunk_CreateVTable(CMtProgressThunk *p) +{ + p->vt.Progress = MtProgressThunk_Progress; +} + + + +#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } + + +static WRes ArEvent_OptCreate_And_Reset(CEvent *p) +{ + if (Event_IsCreated(p)) + return Event_Reset(p); + return AutoResetEvent_CreateNotSignaled(p); +} + + +static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp); + + +static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) +{ + WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent); + if (wres == 0) + { + t->stop = False; + if (!Thread_WasCreated(&t->thread)) + wres = Thread_Create(&t->thread, ThreadFunc, t); + if (wres == 0) + wres = Event_Set(&t->startEvent); + } + if (wres == 0) + return SZ_OK; + return MY_SRes_HRESULT_FROM_WRes(wres); +} + + +static void MtCoderThread_Destruct(CMtCoderThread *t) +{ + if (Thread_WasCreated(&t->thread)) + { + t->stop = 1; + Event_Set(&t->startEvent); + Thread_Wait(&t->thread); + Thread_Close(&t->thread); + } + + Event_Close(&t->startEvent); + + if (t->inBuf) + { + ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf); + t->inBuf = NULL; + } +} + + + +static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) +{ + size_t size = *processedSize; + *processedSize = 0; + while (size != 0) + { + size_t cur = size; + SRes res = ISeqInStream_Read(stream, data, &cur); + *processedSize += cur; + data += cur; + size -= cur; + RINOK(res); + if (cur == 0) + return SZ_OK; + } + return SZ_OK; +} + + +/* + ThreadFunc2() returns: + SZ_OK - in all normal cases (even for stream error or memory allocation error) + SZ_ERROR_THREAD - in case of failure in system synch function +*/ + +static SRes ThreadFunc2(CMtCoderThread *t) +{ + CMtCoder *mtc = t->mtCoder; + + for (;;) + { + unsigned bi; + SRes res; + SRes res2; + Bool finished; + unsigned bufIndex; + size_t size; + const Byte *inData; + UInt64 readProcessed = 0; + + RINOK_THREAD(Event_Wait(&mtc->readEvent)) + + /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */ + + if (mtc->stopReading) + { + return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD; + } + + res = MtProgress_GetError(&mtc->mtProgress); + + size = 0; + inData = NULL; + finished = True; + + if (res == SZ_OK) + { + size = mtc->blockSize; + if (mtc->inStream) + { + if (!t->inBuf) + { + t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize); + if (!t->inBuf) + res = SZ_ERROR_MEM; + } + if (res == SZ_OK) + { + res = FullRead(mtc->inStream, t->inBuf, &size); + readProcessed = mtc->readProcessed + size; + mtc->readProcessed = readProcessed; + } + if (res != SZ_OK) + { + mtc->readRes = res; + /* after reading error - we can stop encoding of previous blocks */ + MtProgress_SetError(&mtc->mtProgress, res); + } + else + finished = (size != mtc->blockSize); + } + else + { + size_t rem; + readProcessed = mtc->readProcessed; + rem = mtc->inDataSize - (size_t)readProcessed; + if (size > rem) + size = rem; + inData = mtc->inData + (size_t)readProcessed; + readProcessed += size; + mtc->readProcessed = readProcessed; + finished = (mtc->inDataSize == (size_t)readProcessed); + } + } + + /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */ + + res2 = SZ_OK; + + if (Semaphore_Wait(&mtc->blocksSemaphore) != 0) + { + res2 = SZ_ERROR_THREAD; + if (res == SZ_OK) + { + res = res2; + // MtProgress_SetError(&mtc->mtProgress, res); + } + } + + bi = mtc->blockIndex; + + if (++mtc->blockIndex >= mtc->numBlocksMax) + mtc->blockIndex = 0; + + bufIndex = (unsigned)(int)-1; + + if (res == SZ_OK) + res = MtProgress_GetError(&mtc->mtProgress); + + if (res != SZ_OK) + finished = True; + + if (!finished) + { + if (mtc->numStartedThreads < mtc->numStartedThreadsLimit + && mtc->expectedDataSize != readProcessed) + { + res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); + if (res == SZ_OK) + mtc->numStartedThreads++; + else + { + MtProgress_SetError(&mtc->mtProgress, res); + finished = True; + } + } + } + + if (finished) + mtc->stopReading = True; + + RINOK_THREAD(Event_Set(&mtc->readEvent)) + + if (res2 != SZ_OK) + return res2; + + if (res == SZ_OK) + { + CriticalSection_Enter(&mtc->cs); + bufIndex = mtc->freeBlockHead; + mtc->freeBlockHead = mtc->freeBlockList[bufIndex]; + CriticalSection_Leave(&mtc->cs); + + res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex, + mtc->inStream ? t->inBuf : inData, size, finished); + + // MtProgress_Reinit(&mtc->mtProgress, t->index); + + if (res != SZ_OK) + MtProgress_SetError(&mtc->mtProgress, res); + } + + { + CMtCoderBlock *block = &mtc->blocks[bi]; + block->res = res; + block->bufIndex = bufIndex; + block->finished = finished; + } + + #ifdef MTCODER__USE_WRITE_THREAD + RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) + #else + { + unsigned wi; + { + CriticalSection_Enter(&mtc->cs); + wi = mtc->writeIndex; + if (wi == bi) + mtc->writeIndex = (unsigned)(int)-1; + else + mtc->ReadyBlocks[bi] = True; + CriticalSection_Leave(&mtc->cs); + } + + if (wi != bi) + { + if (res != SZ_OK || finished) + return 0; + continue; + } + + if (mtc->writeRes != SZ_OK) + res = mtc->writeRes; + + for (;;) + { + if (res == SZ_OK && bufIndex != (unsigned)(int)-1) + { + res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex); + if (res != SZ_OK) + { + mtc->writeRes = res; + MtProgress_SetError(&mtc->mtProgress, res); + } + } + + if (++wi >= mtc->numBlocksMax) + wi = 0; + { + Bool isReady; + + CriticalSection_Enter(&mtc->cs); + + if (bufIndex != (unsigned)(int)-1) + { + mtc->freeBlockList[bufIndex] = mtc->freeBlockHead; + mtc->freeBlockHead = bufIndex; + } + + isReady = mtc->ReadyBlocks[wi]; + + if (isReady) + mtc->ReadyBlocks[wi] = False; + else + mtc->writeIndex = wi; + + CriticalSection_Leave(&mtc->cs); + + RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore)) + + if (!isReady) + break; + } + + { + CMtCoderBlock *block = &mtc->blocks[wi]; + if (res == SZ_OK && block->res != SZ_OK) + res = block->res; + bufIndex = block->bufIndex; + finished = block->finished; + } + } + } + #endif + + if (finished || res != SZ_OK) + return 0; + } +} + + +static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) +{ + CMtCoderThread *t = (CMtCoderThread *)pp; + for (;;) + { + if (Event_Wait(&t->startEvent) != 0) + return SZ_ERROR_THREAD; + if (t->stop) + return 0; + { + SRes res = ThreadFunc2(t); + CMtCoder *mtc = t->mtCoder; + if (res != SZ_OK) + { + MtProgress_SetError(&mtc->mtProgress, res); + } + + #ifndef MTCODER__USE_WRITE_THREAD + { + unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); + if (numFinished == mtc->numStartedThreads) + if (Event_Set(&mtc->finishedEvent) != 0) + return SZ_ERROR_THREAD; + } + #endif + } + } +} + + + +void MtCoder_Construct(CMtCoder *p) +{ + unsigned i; + + p->blockSize = 0; + p->numThreadsMax = 0; + p->expectedDataSize = (UInt64)(Int64)-1; + + p->inStream = NULL; + p->inData = NULL; + p->inDataSize = 0; + + p->progress = NULL; + p->allocBig = NULL; + + p->mtCallback = NULL; + p->mtCallbackObject = NULL; + + p->allocatedBufsSize = 0; + + Event_Construct(&p->readEvent); + Semaphore_Construct(&p->blocksSemaphore); + + for (i = 0; i < MTCODER__THREADS_MAX; i++) + { + CMtCoderThread *t = &p->threads[i]; + t->mtCoder = p; + t->index = i; + t->inBuf = NULL; + t->stop = False; + Event_Construct(&t->startEvent); + Thread_Construct(&t->thread); + } + + #ifdef MTCODER__USE_WRITE_THREAD + for (i = 0; i < MTCODER__BLOCKS_MAX; i++) + Event_Construct(&p->writeEvents[i]); + #else + Event_Construct(&p->finishedEvent); + #endif + + CriticalSection_Init(&p->cs); + CriticalSection_Init(&p->mtProgress.cs); +} + + + + +static void MtCoder_Free(CMtCoder *p) +{ + unsigned i; + + /* + p->stopReading = True; + if (Event_IsCreated(&p->readEvent)) + Event_Set(&p->readEvent); + */ + + for (i = 0; i < MTCODER__THREADS_MAX; i++) + MtCoderThread_Destruct(&p->threads[i]); + + Event_Close(&p->readEvent); + Semaphore_Close(&p->blocksSemaphore); + + #ifdef MTCODER__USE_WRITE_THREAD + for (i = 0; i < MTCODER__BLOCKS_MAX; i++) + Event_Close(&p->writeEvents[i]); + #else + Event_Close(&p->finishedEvent); + #endif +} + + +void MtCoder_Destruct(CMtCoder *p) +{ + MtCoder_Free(p); + + CriticalSection_Delete(&p->cs); + CriticalSection_Delete(&p->mtProgress.cs); +} + + +SRes MtCoder_Code(CMtCoder *p) +{ + unsigned numThreads = p->numThreadsMax; + unsigned numBlocksMax; + unsigned i; + SRes res = SZ_OK; + + if (numThreads > MTCODER__THREADS_MAX) + numThreads = MTCODER__THREADS_MAX; + numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads); + + if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; + if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; + if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; + + if (numBlocksMax > MTCODER__BLOCKS_MAX) + numBlocksMax = MTCODER__BLOCKS_MAX; + + if (p->blockSize != p->allocatedBufsSize) + { + for (i = 0; i < MTCODER__THREADS_MAX; i++) + { + CMtCoderThread *t = &p->threads[i]; + if (t->inBuf) + { + ISzAlloc_Free(p->allocBig, t->inBuf); + t->inBuf = NULL; + } + } + p->allocatedBufsSize = p->blockSize; + } + + p->readRes = SZ_OK; + + MtProgress_Init(&p->mtProgress, p->progress); + + #ifdef MTCODER__USE_WRITE_THREAD + for (i = 0; i < numBlocksMax; i++) + { + RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i])); + } + #else + RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); + #endif + + { + RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent)); + + if (Semaphore_IsCreated(&p->blocksSemaphore)) + { + RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore)); + } + RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax)); + } + + for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++) + p->freeBlockList[i] = i + 1; + p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1; + p->freeBlockHead = 0; + + p->readProcessed = 0; + p->blockIndex = 0; + p->numBlocksMax = numBlocksMax; + p->stopReading = False; + + #ifndef MTCODER__USE_WRITE_THREAD + p->writeIndex = 0; + p->writeRes = SZ_OK; + for (i = 0; i < MTCODER__BLOCKS_MAX; i++) + p->ReadyBlocks[i] = False; + p->numFinishedThreads = 0; + #endif + + p->numStartedThreadsLimit = numThreads; + p->numStartedThreads = 0; + + // for (i = 0; i < numThreads; i++) + { + CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; + RINOK(MtCoderThread_CreateAndStart(nextThread)); + } + + RINOK_THREAD(Event_Set(&p->readEvent)) + + #ifdef MTCODER__USE_WRITE_THREAD + { + unsigned bi = 0; + + for (;; bi++) + { + if (bi >= numBlocksMax) + bi = 0; + + RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) + + { + const CMtCoderBlock *block = &p->blocks[bi]; + unsigned bufIndex = block->bufIndex; + Bool finished = block->finished; + if (res == SZ_OK && block->res != SZ_OK) + res = block->res; + + if (bufIndex != (unsigned)(int)-1) + { + if (res == SZ_OK) + { + res = p->mtCallback->Write(p->mtCallbackObject, bufIndex); + if (res != SZ_OK) + MtProgress_SetError(&p->mtProgress, res); + } + + CriticalSection_Enter(&p->cs); + { + p->freeBlockList[bufIndex] = p->freeBlockHead; + p->freeBlockHead = bufIndex; + } + CriticalSection_Leave(&p->cs); + } + + RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore)) + + if (finished) + break; + } + } + } + #else + { + WRes wres = Event_Wait(&p->finishedEvent); + res = MY_SRes_HRESULT_FROM_WRes(wres); + } + #endif + + if (res == SZ_OK) + res = p->readRes; + + if (res == SZ_OK) + res = p->mtProgress.res; + + #ifndef MTCODER__USE_WRITE_THREAD + if (res == SZ_OK) + res = p->writeRes; + #endif + + if (res != SZ_OK) + MtCoder_Free(p); + return res; +} + +#endif -- cgit v1.2.3