diff options
Diffstat (limited to '')
-rw-r--r-- | other-licenses/7zstub/src/C/MtCoder.c | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/other-licenses/7zstub/src/C/MtCoder.c b/other-licenses/7zstub/src/C/MtCoder.c new file mode 100644 index 0000000000..ddc7c02858 --- /dev/null +++ b/other-licenses/7zstub/src/C/MtCoder.c @@ -0,0 +1,601 @@ +/* MtCoder.c -- Multi-thread Coder
+2018-02-21 : Igor Pavlov : Public domain */
+
+#include "Precomp.h"
+
+#include "MtCoder.h"
+
+#ifndef _7ZIP_ST
+
+SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
+{
+ CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
+ UInt64 inSize2 = 0;
+ UInt64 outSize2 = 0;
+ if (inSize != (UInt64)(Int64)-1)
+ {
+ inSize2 = inSize - thunk->inSize;
+ thunk->inSize = inSize;
+ }
+ if (outSize != (UInt64)(Int64)-1)
+ {
+ outSize2 = outSize - thunk->outSize;
+ thunk->outSize = outSize;
+ }
+ return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
+}
+
+
+void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
+{
+ p->vt.Progress = MtProgressThunk_Progress;
+}
+
+
+
+#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
+
+
+static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
+{
+ if (Event_IsCreated(p))
+ return Event_Reset(p);
+ return AutoResetEvent_CreateNotSignaled(p);
+}
+
+
+static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
+
+
+static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
+{
+ WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
+ if (wres == 0)
+ {
+ t->stop = False;
+ if (!Thread_WasCreated(&t->thread))
+ wres = Thread_Create(&t->thread, ThreadFunc, t);
+ if (wres == 0)
+ wres = Event_Set(&t->startEvent);
+ }
+ if (wres == 0)
+ return SZ_OK;
+ return MY_SRes_HRESULT_FROM_WRes(wres);
+}
+
+
+static void MtCoderThread_Destruct(CMtCoderThread *t)
+{
+ if (Thread_WasCreated(&t->thread))
+ {
+ t->stop = 1;
+ Event_Set(&t->startEvent);
+ Thread_Wait(&t->thread);
+ Thread_Close(&t->thread);
+ }
+
+ Event_Close(&t->startEvent);
+
+ if (t->inBuf)
+ {
+ ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
+ t->inBuf = NULL;
+ }
+}
+
+
+
+static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
+{
+ size_t size = *processedSize;
+ *processedSize = 0;
+ while (size != 0)
+ {
+ size_t cur = size;
+ SRes res = ISeqInStream_Read(stream, data, &cur);
+ *processedSize += cur;
+ data += cur;
+ size -= cur;
+ RINOK(res);
+ if (cur == 0)
+ return SZ_OK;
+ }
+ return SZ_OK;
+}
+
+
+/*
+ ThreadFunc2() returns:
+ SZ_OK - in all normal cases (even for stream error or memory allocation error)
+ SZ_ERROR_THREAD - in case of failure in system synch function
+*/
+
+static SRes ThreadFunc2(CMtCoderThread *t)
+{
+ CMtCoder *mtc = t->mtCoder;
+
+ for (;;)
+ {
+ unsigned bi;
+ SRes res;
+ SRes res2;
+ Bool finished;
+ unsigned bufIndex;
+ size_t size;
+ const Byte *inData;
+ UInt64 readProcessed = 0;
+
+ RINOK_THREAD(Event_Wait(&mtc->readEvent))
+
+ /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
+
+ if (mtc->stopReading)
+ {
+ return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
+ }
+
+ res = MtProgress_GetError(&mtc->mtProgress);
+
+ size = 0;
+ inData = NULL;
+ finished = True;
+
+ if (res == SZ_OK)
+ {
+ size = mtc->blockSize;
+ if (mtc->inStream)
+ {
+ if (!t->inBuf)
+ {
+ t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
+ if (!t->inBuf)
+ res = SZ_ERROR_MEM;
+ }
+ if (res == SZ_OK)
+ {
+ res = FullRead(mtc->inStream, t->inBuf, &size);
+ readProcessed = mtc->readProcessed + size;
+ mtc->readProcessed = readProcessed;
+ }
+ if (res != SZ_OK)
+ {
+ mtc->readRes = res;
+ /* after reading error - we can stop encoding of previous blocks */
+ MtProgress_SetError(&mtc->mtProgress, res);
+ }
+ else
+ finished = (size != mtc->blockSize);
+ }
+ else
+ {
+ size_t rem;
+ readProcessed = mtc->readProcessed;
+ rem = mtc->inDataSize - (size_t)readProcessed;
+ if (size > rem)
+ size = rem;
+ inData = mtc->inData + (size_t)readProcessed;
+ readProcessed += size;
+ mtc->readProcessed = readProcessed;
+ finished = (mtc->inDataSize == (size_t)readProcessed);
+ }
+ }
+
+ /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
+
+ res2 = SZ_OK;
+
+ if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
+ {
+ res2 = SZ_ERROR_THREAD;
+ if (res == SZ_OK)
+ {
+ res = res2;
+ // MtProgress_SetError(&mtc->mtProgress, res);
+ }
+ }
+
+ bi = mtc->blockIndex;
+
+ if (++mtc->blockIndex >= mtc->numBlocksMax)
+ mtc->blockIndex = 0;
+
+ bufIndex = (unsigned)(int)-1;
+
+ if (res == SZ_OK)
+ res = MtProgress_GetError(&mtc->mtProgress);
+
+ if (res != SZ_OK)
+ finished = True;
+
+ if (!finished)
+ {
+ if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
+ && mtc->expectedDataSize != readProcessed)
+ {
+ res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
+ if (res == SZ_OK)
+ mtc->numStartedThreads++;
+ else
+ {
+ MtProgress_SetError(&mtc->mtProgress, res);
+ finished = True;
+ }
+ }
+ }
+
+ if (finished)
+ mtc->stopReading = True;
+
+ RINOK_THREAD(Event_Set(&mtc->readEvent))
+
+ if (res2 != SZ_OK)
+ return res2;
+
+ if (res == SZ_OK)
+ {
+ CriticalSection_Enter(&mtc->cs);
+ bufIndex = mtc->freeBlockHead;
+ mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
+ CriticalSection_Leave(&mtc->cs);
+
+ res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
+ mtc->inStream ? t->inBuf : inData, size, finished);
+
+ // MtProgress_Reinit(&mtc->mtProgress, t->index);
+
+ if (res != SZ_OK)
+ MtProgress_SetError(&mtc->mtProgress, res);
+ }
+
+ {
+ CMtCoderBlock *block = &mtc->blocks[bi];
+ block->res = res;
+ block->bufIndex = bufIndex;
+ block->finished = finished;
+ }
+
+ #ifdef MTCODER__USE_WRITE_THREAD
+ RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
+ #else
+ {
+ unsigned wi;
+ {
+ CriticalSection_Enter(&mtc->cs);
+ wi = mtc->writeIndex;
+ if (wi == bi)
+ mtc->writeIndex = (unsigned)(int)-1;
+ else
+ mtc->ReadyBlocks[bi] = True;
+ CriticalSection_Leave(&mtc->cs);
+ }
+
+ if (wi != bi)
+ {
+ if (res != SZ_OK || finished)
+ return 0;
+ continue;
+ }
+
+ if (mtc->writeRes != SZ_OK)
+ res = mtc->writeRes;
+
+ for (;;)
+ {
+ if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
+ {
+ res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
+ if (res != SZ_OK)
+ {
+ mtc->writeRes = res;
+ MtProgress_SetError(&mtc->mtProgress, res);
+ }
+ }
+
+ if (++wi >= mtc->numBlocksMax)
+ wi = 0;
+ {
+ Bool isReady;
+
+ CriticalSection_Enter(&mtc->cs);
+
+ if (bufIndex != (unsigned)(int)-1)
+ {
+ mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
+ mtc->freeBlockHead = bufIndex;
+ }
+
+ isReady = mtc->ReadyBlocks[wi];
+
+ if (isReady)
+ mtc->ReadyBlocks[wi] = False;
+ else
+ mtc->writeIndex = wi;
+
+ CriticalSection_Leave(&mtc->cs);
+
+ RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
+
+ if (!isReady)
+ break;
+ }
+
+ {
+ CMtCoderBlock *block = &mtc->blocks[wi];
+ if (res == SZ_OK && block->res != SZ_OK)
+ res = block->res;
+ bufIndex = block->bufIndex;
+ finished = block->finished;
+ }
+ }
+ }
+ #endif
+
+ if (finished || res != SZ_OK)
+ return 0;
+ }
+}
+
+
+static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
+{
+ CMtCoderThread *t = (CMtCoderThread *)pp;
+ for (;;)
+ {
+ if (Event_Wait(&t->startEvent) != 0)
+ return SZ_ERROR_THREAD;
+ if (t->stop)
+ return 0;
+ {
+ SRes res = ThreadFunc2(t);
+ CMtCoder *mtc = t->mtCoder;
+ if (res != SZ_OK)
+ {
+ MtProgress_SetError(&mtc->mtProgress, res);
+ }
+
+ #ifndef MTCODER__USE_WRITE_THREAD
+ {
+ unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
+ if (numFinished == mtc->numStartedThreads)
+ if (Event_Set(&mtc->finishedEvent) != 0)
+ return SZ_ERROR_THREAD;
+ }
+ #endif
+ }
+ }
+}
+
+
+
+void MtCoder_Construct(CMtCoder *p)
+{
+ unsigned i;
+
+ p->blockSize = 0;
+ p->numThreadsMax = 0;
+ p->expectedDataSize = (UInt64)(Int64)-1;
+
+ p->inStream = NULL;
+ p->inData = NULL;
+ p->inDataSize = 0;
+
+ p->progress = NULL;
+ p->allocBig = NULL;
+
+ p->mtCallback = NULL;
+ p->mtCallbackObject = NULL;
+
+ p->allocatedBufsSize = 0;
+
+ Event_Construct(&p->readEvent);
+ Semaphore_Construct(&p->blocksSemaphore);
+
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)
+ {
+ CMtCoderThread *t = &p->threads[i];
+ t->mtCoder = p;
+ t->index = i;
+ t->inBuf = NULL;
+ t->stop = False;
+ Event_Construct(&t->startEvent);
+ Thread_Construct(&t->thread);
+ }
+
+ #ifdef MTCODER__USE_WRITE_THREAD
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
+ Event_Construct(&p->writeEvents[i]);
+ #else
+ Event_Construct(&p->finishedEvent);
+ #endif
+
+ CriticalSection_Init(&p->cs);
+ CriticalSection_Init(&p->mtProgress.cs);
+}
+
+
+
+
+static void MtCoder_Free(CMtCoder *p)
+{
+ unsigned i;
+
+ /*
+ p->stopReading = True;
+ if (Event_IsCreated(&p->readEvent))
+ Event_Set(&p->readEvent);
+ */
+
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)
+ MtCoderThread_Destruct(&p->threads[i]);
+
+ Event_Close(&p->readEvent);
+ Semaphore_Close(&p->blocksSemaphore);
+
+ #ifdef MTCODER__USE_WRITE_THREAD
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
+ Event_Close(&p->writeEvents[i]);
+ #else
+ Event_Close(&p->finishedEvent);
+ #endif
+}
+
+
+void MtCoder_Destruct(CMtCoder *p)
+{
+ MtCoder_Free(p);
+
+ CriticalSection_Delete(&p->cs);
+ CriticalSection_Delete(&p->mtProgress.cs);
+}
+
+
+SRes MtCoder_Code(CMtCoder *p)
+{
+ unsigned numThreads = p->numThreadsMax;
+ unsigned numBlocksMax;
+ unsigned i;
+ SRes res = SZ_OK;
+
+ if (numThreads > MTCODER__THREADS_MAX)
+ numThreads = MTCODER__THREADS_MAX;
+ numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
+
+ if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
+ if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
+ if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
+
+ if (numBlocksMax > MTCODER__BLOCKS_MAX)
+ numBlocksMax = MTCODER__BLOCKS_MAX;
+
+ if (p->blockSize != p->allocatedBufsSize)
+ {
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)
+ {
+ CMtCoderThread *t = &p->threads[i];
+ if (t->inBuf)
+ {
+ ISzAlloc_Free(p->allocBig, t->inBuf);
+ t->inBuf = NULL;
+ }
+ }
+ p->allocatedBufsSize = p->blockSize;
+ }
+
+ p->readRes = SZ_OK;
+
+ MtProgress_Init(&p->mtProgress, p->progress);
+
+ #ifdef MTCODER__USE_WRITE_THREAD
+ for (i = 0; i < numBlocksMax; i++)
+ {
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
+ }
+ #else
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
+ #endif
+
+ {
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
+
+ if (Semaphore_IsCreated(&p->blocksSemaphore))
+ {
+ RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
+ }
+ RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
+ }
+
+ for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
+ p->freeBlockList[i] = i + 1;
+ p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
+ p->freeBlockHead = 0;
+
+ p->readProcessed = 0;
+ p->blockIndex = 0;
+ p->numBlocksMax = numBlocksMax;
+ p->stopReading = False;
+
+ #ifndef MTCODER__USE_WRITE_THREAD
+ p->writeIndex = 0;
+ p->writeRes = SZ_OK;
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
+ p->ReadyBlocks[i] = False;
+ p->numFinishedThreads = 0;
+ #endif
+
+ p->numStartedThreadsLimit = numThreads;
+ p->numStartedThreads = 0;
+
+ // for (i = 0; i < numThreads; i++)
+ {
+ CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
+ RINOK(MtCoderThread_CreateAndStart(nextThread));
+ }
+
+ RINOK_THREAD(Event_Set(&p->readEvent))
+
+ #ifdef MTCODER__USE_WRITE_THREAD
+ {
+ unsigned bi = 0;
+
+ for (;; bi++)
+ {
+ if (bi >= numBlocksMax)
+ bi = 0;
+
+ RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
+
+ {
+ const CMtCoderBlock *block = &p->blocks[bi];
+ unsigned bufIndex = block->bufIndex;
+ Bool finished = block->finished;
+ if (res == SZ_OK && block->res != SZ_OK)
+ res = block->res;
+
+ if (bufIndex != (unsigned)(int)-1)
+ {
+ if (res == SZ_OK)
+ {
+ res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
+ if (res != SZ_OK)
+ MtProgress_SetError(&p->mtProgress, res);
+ }
+
+ CriticalSection_Enter(&p->cs);
+ {
+ p->freeBlockList[bufIndex] = p->freeBlockHead;
+ p->freeBlockHead = bufIndex;
+ }
+ CriticalSection_Leave(&p->cs);
+ }
+
+ RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
+
+ if (finished)
+ break;
+ }
+ }
+ }
+ #else
+ {
+ WRes wres = Event_Wait(&p->finishedEvent);
+ res = MY_SRes_HRESULT_FROM_WRes(wres);
+ }
+ #endif
+
+ if (res == SZ_OK)
+ res = p->readRes;
+
+ if (res == SZ_OK)
+ res = p->mtProgress.res;
+
+ #ifndef MTCODER__USE_WRITE_THREAD
+ if (res == SZ_OK)
+ res = p->writeRes;
+ #endif
+
+ if (res != SZ_OK)
+ MtCoder_Free(p);
+ return res;
+}
+
+#endif
|