summaryrefslogtreecommitdiffstats
path: root/src/zstd/contrib/adaptive-compression/adapt.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/zstd/contrib/adaptive-compression/adapt.c1137
1 files changed, 1137 insertions, 0 deletions
diff --git a/src/zstd/contrib/adaptive-compression/adapt.c b/src/zstd/contrib/adaptive-compression/adapt.c
new file mode 100644
index 00000000..8f9c678c
--- /dev/null
+++ b/src/zstd/contrib/adaptive-compression/adapt.c
@@ -0,0 +1,1137 @@
+/*
+ * Copyright (c) 2017-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+
+#include <stdio.h> /* fprintf */
+#include <stdlib.h> /* malloc, free */
+#include <pthread.h> /* pthread functions */
+#include <string.h> /* memset */
+#include "zstd_internal.h"
+#include "util.h"
+
+#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
+#define PRINT(...) fprintf(stdout, __VA_ARGS__)
+#define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
+#define FILE_CHUNK_SIZE 4 << 20
+#define MAX_NUM_JOBS 2
+#define stdinmark "/*stdin*\\"
+#define stdoutmark "/*stdout*\\"
+#define MAX_PATH 256
+#define DEFAULT_DISPLAY_LEVEL 1
+#define DEFAULT_COMPRESSION_LEVEL 6
+#define MAX_COMPRESSION_LEVEL_CHANGE 2
+#define CONVERGENCE_LOWER_BOUND 5
+#define CLEVEL_DECREASE_COOLDOWN 5
+#define CHANGE_BY_TWO_THRESHOLD 0.1
+#define CHANGE_BY_ONE_THRESHOLD 0.65
+
+#ifndef DEBUG_MODE
+static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
+#else
+static int g_displayLevel = DEBUG_MODE;
+#endif
+
+static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
+static UTIL_time_t g_startTime;
+static size_t g_streamedSize = 0;
+static unsigned g_useProgressBar = 1;
+static UTIL_freq_t g_ticksPerSecond;
+static unsigned g_forceCompressionLevel = 0;
+static unsigned g_minCLevel = 1;
+static unsigned g_maxCLevel;
+
+typedef struct {
+ void* start;
+ size_t size;
+ size_t capacity;
+} buffer_t;
+
+typedef struct {
+ size_t filled;
+ buffer_t buffer;
+} inBuff_t;
+
+typedef struct {
+ buffer_t src;
+ buffer_t dst;
+ unsigned jobID;
+ unsigned lastJobPlusOne;
+ size_t compressedSize;
+ size_t dictSize;
+} jobDescription;
+
+typedef struct {
+ pthread_mutex_t pMutex;
+ int noError;
+} mutex_t;
+
+typedef struct {
+ pthread_cond_t pCond;
+ int noError;
+} cond_t;
+
+typedef struct {
+ unsigned compressionLevel;
+ unsigned numJobs;
+ unsigned nextJobID;
+ unsigned threadError;
+
+ /*
+ * JobIDs for the next jobs to be created, compressed, and written
+ */
+ unsigned jobReadyID;
+ unsigned jobCompressedID;
+ unsigned jobWriteID;
+ unsigned allJobsCompleted;
+
+ /*
+ * counter for how many jobs in a row the compression level has not changed
+ * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
+ * compression level tries to change (by non-zero amount) resets the counter
+ * to 1 and does not apply the change
+ */
+ unsigned convergenceCounter;
+
+ /*
+ * cooldown counter in order to prevent rapid successive decreases in compression level
+ * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
+ * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
+ * as long as cooldown != 0, the compression level cannot be decreased
+ */
+ unsigned cooldown;
+
+ /*
+ * XWaitYCompletion
+ * Range from 0.0 to 1.0
+ * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
+ * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5
+ * implies that the compression thread waited on the write thread and it was only 50% finished writing a job)
+ */
+ double createWaitCompressionCompletion;
+ double compressWaitCreateCompletion;
+ double compressWaitWriteCompletion;
+ double writeWaitCompressionCompletion;
+
+ /*
+ * Completion values
+ * Range from 0.0 to 1.0
+ * Jobs are divided into mini-chunks in order to measure completion
+ * these values are updated each time a thread finishes its operation on the
+ * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
+ */
+ double compressionCompletion;
+ double writeCompletion;
+ double createCompletion;
+
+ mutex_t jobCompressed_mutex;
+ cond_t jobCompressed_cond;
+ mutex_t jobReady_mutex;
+ cond_t jobReady_cond;
+ mutex_t allJobsCompleted_mutex;
+ cond_t allJobsCompleted_cond;
+ mutex_t jobWrite_mutex;
+ cond_t jobWrite_cond;
+ mutex_t compressionCompletion_mutex;
+ mutex_t createCompletion_mutex;
+ mutex_t writeCompletion_mutex;
+ mutex_t compressionLevel_mutex;
+ size_t lastDictSize;
+ inBuff_t input;
+ jobDescription* jobs;
+ ZSTD_CCtx* cctx;
+} adaptCCtx;
+
+typedef struct {
+ adaptCCtx* ctx;
+ FILE* dstFile;
+} outputThreadArg;
+
+typedef struct {
+ FILE* srcFile;
+ adaptCCtx* ctx;
+ outputThreadArg* otArg;
+} fcResources;
+
+static void freeCompressionJobs(adaptCCtx* ctx)
+{
+ unsigned u;
+ for (u=0; u<ctx->numJobs; u++) {
+ jobDescription job = ctx->jobs[u];
+ free(job.dst.start);
+ free(job.src.start);
+ }
+}
+
+static int destroyMutex(mutex_t* mutex)
+{
+ if (mutex->noError) {
+ int const ret = pthread_mutex_destroy(&mutex->pMutex);
+ return ret;
+ }
+ return 0;
+}
+
+static int destroyCond(cond_t* cond)
+{
+ if (cond->noError) {
+ int const ret = pthread_cond_destroy(&cond->pCond);
+ return ret;
+ }
+ return 0;
+}
+
+static int freeCCtx(adaptCCtx* ctx)
+{
+ if (!ctx) return 0;
+ {
+ int error = 0;
+ error |= destroyMutex(&ctx->jobCompressed_mutex);
+ error |= destroyCond(&ctx->jobCompressed_cond);
+ error |= destroyMutex(&ctx->jobReady_mutex);
+ error |= destroyCond(&ctx->jobReady_cond);
+ error |= destroyMutex(&ctx->allJobsCompleted_mutex);
+ error |= destroyCond(&ctx->allJobsCompleted_cond);
+ error |= destroyMutex(&ctx->jobWrite_mutex);
+ error |= destroyCond(&ctx->jobWrite_cond);
+ error |= destroyMutex(&ctx->compressionCompletion_mutex);
+ error |= destroyMutex(&ctx->createCompletion_mutex);
+ error |= destroyMutex(&ctx->writeCompletion_mutex);
+ error |= destroyMutex(&ctx->compressionLevel_mutex);
+ error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
+ free(ctx->input.buffer.start);
+ if (ctx->jobs){
+ freeCompressionJobs(ctx);
+ free(ctx->jobs);
+ }
+ free(ctx);
+ return error;
+ }
+}
+
+static int initMutex(mutex_t* mutex)
+{
+ int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
+ mutex->noError = !ret;
+ return ret;
+}
+
+static int initCond(cond_t* cond)
+{
+ int const ret = pthread_cond_init(&cond->pCond, NULL);
+ cond->noError = !ret;
+ return ret;
+}
+
+static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
+{
+ ctx->compressionLevel = g_compressionLevel;
+ {
+ int pthreadError = 0;
+ pthreadError |= initMutex(&ctx->jobCompressed_mutex);
+ pthreadError |= initCond(&ctx->jobCompressed_cond);
+ pthreadError |= initMutex(&ctx->jobReady_mutex);
+ pthreadError |= initCond(&ctx->jobReady_cond);
+ pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
+ pthreadError |= initCond(&ctx->allJobsCompleted_cond);
+ pthreadError |= initMutex(&ctx->jobWrite_mutex);
+ pthreadError |= initCond(&ctx->jobWrite_cond);
+ pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
+ pthreadError |= initMutex(&ctx->createCompletion_mutex);
+ pthreadError |= initMutex(&ctx->writeCompletion_mutex);
+ pthreadError |= initMutex(&ctx->compressionLevel_mutex);
+ if (pthreadError) return pthreadError;
+ }
+ ctx->numJobs = numJobs;
+ ctx->jobReadyID = 0;
+ ctx->jobCompressedID = 0;
+ ctx->jobWriteID = 0;
+ ctx->lastDictSize = 0;
+
+
+ ctx->createWaitCompressionCompletion = 1;
+ ctx->compressWaitCreateCompletion = 1;
+ ctx->compressWaitWriteCompletion = 1;
+ ctx->writeWaitCompressionCompletion = 1;
+ ctx->createCompletion = 1;
+ ctx->writeCompletion = 1;
+ ctx->compressionCompletion = 1;
+ ctx->convergenceCounter = 0;
+ ctx->cooldown = 0;
+
+ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
+
+ if (!ctx->jobs) {
+ DISPLAY("Error: could not allocate space for jobs during context creation\n");
+ return 1;
+ }
+
+ /* initializing jobs */
+ {
+ unsigned jobNum;
+ for (jobNum=0; jobNum<numJobs; jobNum++) {
+ jobDescription* job = &ctx->jobs[jobNum];
+ job->src.start = malloc(2 * FILE_CHUNK_SIZE);
+ job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
+ job->lastJobPlusOne = 0;
+ if (!job->src.start || !job->dst.start) {
+ DISPLAY("Could not allocate buffers for jobs\n");
+ return 1;
+ }
+ job->src.capacity = FILE_CHUNK_SIZE;
+ job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE);
+ }
+ }
+
+ ctx->nextJobID = 0;
+ ctx->threadError = 0;
+ ctx->allJobsCompleted = 0;
+
+ ctx->cctx = ZSTD_createCCtx();
+ if (!ctx->cctx) {
+ DISPLAY("Error: could not allocate ZSTD_CCtx\n");
+ return 1;
+ }
+
+ ctx->input.filled = 0;
+ ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE;
+
+ ctx->input.buffer.start = malloc(ctx->input.buffer.capacity);
+ if (!ctx->input.buffer.start) {
+ DISPLAY("Error: could not allocate input buffer\n");
+ return 1;
+ }
+ return 0;
+}
+
+static adaptCCtx* createCCtx(unsigned numJobs)
+{
+
+ adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
+ if (ctx == NULL) {
+ DISPLAY("Error: could not allocate space for context\n");
+ return NULL;
+ }
+ {
+ int const error = initCCtx(ctx, numJobs);
+ if (error) {
+ freeCCtx(ctx);
+ return NULL;
+ }
+ return ctx;
+ }
+}
+
+static void signalErrorToThreads(adaptCCtx* ctx)
+{
+ ctx->threadError = 1;
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+ pthread_cond_signal(&ctx->jobReady_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+ pthread_cond_signal(&ctx->jobWrite_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
+ pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
+ pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
+}
+
+static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
+{
+ if (!ctx) return;
+ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
+ while (ctx->allJobsCompleted == 0 && !ctx->threadError) {
+ pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
+}
+
+/* map completion percentages to values for changing compression level */
+static unsigned convertCompletionToChange(double completion)
+{
+ if (completion < CHANGE_BY_TWO_THRESHOLD) {
+ return 2;
+ }
+ else if (completion < CHANGE_BY_ONE_THRESHOLD) {
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+/*
+ * Compression level is changed depending on which part of the compression process is lagging
+ * Currently, three theads exist for job creation, compression, and file writing respectively.
+ * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging
+ * job creation or file writing lag => increased compression level
+ * compression thread lag => decreased compression level
+ * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
+ */
+static void adaptCompressionLevel(adaptCCtx* ctx)
+{
+ double createWaitCompressionCompletion;
+ double compressWaitCreateCompletion;
+ double compressWaitWriteCompletion;
+ double writeWaitCompressionCompletion;
+ double const threshold = 0.00001;
+ unsigned prevCompressionLevel;
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ prevCompressionLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
+
+ if (g_forceCompressionLevel) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ ctx->compressionLevel = g_compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+ return;
+ }
+
+
+ DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
+
+ /* read and reset completion measurements */
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+ DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
+ DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
+ createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
+ writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
+ compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
+ compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+ DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
+
+ assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
+
+ /* adaptation logic */
+ if (ctx->cooldown) ctx->cooldown--;
+
+ if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) {
+ /* create or write waiting on compression */
+ /* use whichever one waited less because it was slower */
+ double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
+ unsigned const change = convertCompletionToChange(completion);
+ unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
+ if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
+ /* reset convergence counter, might have been a spike */
+ ctx->convergenceCounter = 0;
+ DEBUG(2, "convergence counter reset, no change applied\n");
+ }
+ else if (boundChange != 0) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ ctx->compressionLevel -= boundChange;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+ ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
+ ctx->convergenceCounter = 1;
+
+ DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
+ }
+ }
+ else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
+ /* compress waiting on write */
+ double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
+ unsigned const change = convertCompletionToChange(completion);
+ unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
+ if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
+ /* reset convergence counter, might have been a spike */
+ ctx->convergenceCounter = 0;
+ DEBUG(2, "convergence counter reset, no change applied\n");
+ }
+ else if (boundChange != 0) {
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ ctx->compressionLevel += boundChange;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+ ctx->cooldown = 0;
+ ctx->convergenceCounter = 1;
+
+ DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
+ }
+
+ }
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ if (ctx->compressionLevel == prevCompressionLevel) {
+ ctx->convergenceCounter++;
+ }
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+}
+
+static size_t getUseableDictSize(unsigned compressionLevel)
+{
+ ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
+ unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
+ size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog);
+ return overlapSize;
+}
+
+static void* compressionThread(void* arg)
+{
+ adaptCCtx* const ctx = (adaptCCtx*)arg;
+ unsigned currJob = 0;
+ for ( ; ; ) {
+ unsigned const currJobIndex = currJob % ctx->numJobs;
+ jobDescription* const job = &ctx->jobs[currJobIndex];
+ DEBUG(2, "starting compression for job %u\n", currJob);
+
+ {
+ /* check if compression thread will have to wait */
+ unsigned willWaitForCreate = 0;
+ unsigned willWaitForWrite = 0;
+
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+ if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+ if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ if (willWaitForCreate) {
+ DEBUG(2, "compression will wait for create on job %u\n", currJob);
+ ctx->compressWaitCreateCompletion = ctx->createCompletion;
+ DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
+
+ }
+ else {
+ ctx->compressWaitCreateCompletion = 1;
+ }
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ if (willWaitForWrite) {
+ DEBUG(2, "compression will wait for write on job %u\n", currJob);
+ ctx->compressWaitWriteCompletion = ctx->writeCompletion;
+ DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
+ }
+ else {
+ ctx->compressWaitWriteCompletion = 1;
+ }
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+ }
+
+ /* wait until job is ready */
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+ while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
+ pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ /* wait until job previously in this space is written */
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+ while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+ pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+ /* reset compression completion */
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+ ctx->compressionCompletion = 0;
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+ /* adapt compression level */
+ if (currJob) adaptCompressionLevel(ctx);
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
+ /* compress the data */
+ {
+ size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
+ unsigned cLevel;
+ unsigned blockNum = 0;
+ size_t remaining = job->src.size;
+ size_t srcPos = 0;
+ size_t dstPos = 0;
+
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ cLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
+ /* reset compressed size */
+ job->compressedSize = 0;
+ DEBUG(2, "calling ZSTD_compressBegin()\n");
+ /* begin compression */
+ {
+ size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
+ size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
+ ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize);
+ params.cParams.windowLog = 23;
+ {
+ size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0);
+ size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
+ if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
+ DISPLAY("Error: something went wrong while starting compression\n");
+ signalErrorToThreads(ctx);
+ return arg;
+ }
+ }
+ }
+ DEBUG(2, "finished with ZSTD_compressBegin()\n");
+
+ do {
+ size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
+
+ /* continue compression */
+ if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
+ size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
+ if (ZSTD_isError(hSize)) {
+ DISPLAY("Error: something went wrong while continuing compression\n");
+ job->compressedSize = hSize;
+ signalErrorToThreads(ctx);
+ return arg;
+ }
+ ZSTD_invalidateRepCodes(ctx->cctx);
+ }
+ {
+ size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
+ ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
+ ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
+ if (ZSTD_isError(ret)) {
+ DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
+ signalErrorToThreads(ctx);
+ return arg;
+ }
+ job->compressedSize += ret;
+ remaining -= actualBlockSize;
+ srcPos += actualBlockSize;
+ dstPos += ret;
+ blockNum++;
+
+ /* update completion */
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+ ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+ }
+ } while (remaining != 0);
+ job->dst.size = job->compressedSize;
+ }
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ ctx->jobCompressedID++;
+ pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+ if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
+ /* finished compressing all jobs */
+ break;
+ }
+ DEBUG(2, "finished compressing job %u\n", currJob);
+ currJob++;
+ }
+ return arg;
+}
+
+static void displayProgress(unsigned cLevel, unsigned last)
+{
+ UTIL_time_t currTime;
+ UTIL_getTime(&currTime);
+ if (!g_useProgressBar) return;
+ {
+ double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_ticksPerSecond, g_startTime, currTime) / 1000.0);
+ double const sizeMB = (double)g_streamedSize / (1 << 20);
+ double const avgCompRate = sizeMB * 1000 / timeElapsed;
+ fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate);
+ if (last) {
+ fprintf(stderr, "\n");
+ }
+ else {
+ fflush(stderr);
+ }
+ }
+}
+
+static void* outputThread(void* arg)
+{
+ outputThreadArg* const otArg = (outputThreadArg*)arg;
+ adaptCCtx* const ctx = otArg->ctx;
+ FILE* const dstFile = otArg->dstFile;
+
+ unsigned currJob = 0;
+ for ( ; ; ) {
+ unsigned const currJobIndex = currJob % ctx->numJobs;
+ jobDescription* const job = &ctx->jobs[currJobIndex];
+ unsigned willWaitForCompress = 0;
+ DEBUG(2, "starting write for job %u\n", currJob);
+
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+ if (willWaitForCompress) {
+ /* write thread is waiting on compression thread */
+ ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
+ DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
+ }
+ else {
+ ctx->writeWaitCompressionCompletion = 1;
+ }
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
+ pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+ /* reset write completion */
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ ctx->writeCompletion = 0;
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+ {
+ size_t const compressedSize = job->compressedSize;
+ size_t remaining = compressedSize;
+ if (ZSTD_isError(compressedSize)) {
+ DISPLAY("Error: an error occurred during compression\n");
+ signalErrorToThreads(ctx);
+ return arg;
+ }
+ {
+ size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
+ size_t pos = 0;
+ for ( ; ; ) {
+ size_t const writeSize = MIN(remaining, blockSize);
+ size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
+ if (ret != writeSize) break;
+ pos += ret;
+ remaining -= ret;
+
+ /* update completion variable for writing */
+ pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+ ctx->writeCompletion = 1 - (double)remaining/compressedSize;
+ pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+ if (remaining == 0) break;
+ }
+ if (pos != compressedSize) {
+ DISPLAY("Error: an error occurred during file write operation\n");
+ signalErrorToThreads(ctx);
+ return arg;
+ }
+ }
+ }
+ {
+ unsigned cLevel;
+ pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+ cLevel = ctx->compressionLevel;
+ pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+ displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
+ }
+ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+ ctx->jobWriteID++;
+ pthread_cond_signal(&ctx->jobWrite_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+ if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
+ /* finished with all jobs */
+ pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
+ ctx->allJobsCompleted = 1;
+ pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
+ pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
+ break;
+ }
+ DEBUG(2, "finished writing job %u\n", currJob);
+ currJob++;
+
+ }
+ return arg;
+}
+
+static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
+{
+ unsigned const nextJob = ctx->nextJobID;
+ unsigned const nextJobIndex = nextJob % ctx->numJobs;
+ jobDescription* const job = &ctx->jobs[nextJobIndex];
+
+
+ job->src.size = srcSize;
+ job->jobID = nextJob;
+ if (last) job->lastJobPlusOne = nextJob + 1;
+ {
+ /* swap buffer */
+ void* const copy = job->src.start;
+ job->src.start = ctx->input.buffer.start;
+ ctx->input.buffer.start = copy;
+ }
+ job->dictSize = ctx->lastDictSize;
+
+ ctx->nextJobID++;
+ /* if not on the last job, reuse data as dictionary in next job */
+ if (!last) {
+ size_t const oldDictSize = ctx->lastDictSize;
+ memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize);
+ ctx->lastDictSize = srcSize;
+ ctx->input.filled = srcSize;
+ }
+
+ /* signal job ready */
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+ ctx->jobReadyID++;
+ pthread_cond_signal(&ctx->jobReady_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+ return 0;
+}
+
+static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
+{
+ /* early error check to exit */
+ if (!ctx || !srcFile || !otArg) {
+ return 1;
+ }
+
+ /* create output thread */
+ {
+ pthread_t out;
+ if (pthread_create(&out, NULL, &outputThread, otArg)) {
+ DISPLAY("Error: could not create output thread\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ else if (pthread_detach(out)) {
+ DISPLAY("Error: could not detach output thread\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ }
+
+ /* create compression thread */
+ {
+ pthread_t compression;
+ if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
+ DISPLAY("Error: could not create compression thread\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ else if (pthread_detach(compression)) {
+ DISPLAY("Error: could not detach compression thread\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ }
+ {
+ unsigned currJob = 0;
+ /* creating jobs */
+ for ( ; ; ) {
+ size_t pos = 0;
+ size_t const readBlockSize = 1 << 15;
+ size_t remaining = FILE_CHUNK_SIZE;
+ unsigned const nextJob = ctx->nextJobID;
+ unsigned willWaitForCompress = 0;
+ DEBUG(2, "starting creation of job %u\n", currJob);
+
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+ if (willWaitForCompress) {
+ /* creation thread is waiting, take measurement of completion */
+ ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
+ DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
+ }
+ else {
+ ctx->createWaitCompressionCompletion = 1;
+ }
+ pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+ /* wait until the job has been compressed */
+ pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+ while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
+ pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
+ }
+ pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+ /* reset create completion */
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ ctx->createCompletion = 0;
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+
+ while (remaining != 0 && !feof(srcFile)) {
+ size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
+ if (ret != readBlockSize && !feof(srcFile)) {
+ /* error could not read correct number of bytes */
+ DISPLAY("Error: problem occurred during read from src file\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ pos += ret;
+ remaining -= ret;
+ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+ ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
+ pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+ }
+ if (remaining != 0 && !feof(srcFile)) {
+ DISPLAY("Error: problem occurred during read from src file\n");
+ signalErrorToThreads(ctx);
+ return 1;
+ }
+ g_streamedSize += pos;
+ /* reading was fine, now create the compression job */
+ {
+ int const last = feof(srcFile);
+ int const error = createCompressionJob(ctx, pos, last);
+ if (error != 0) {
+ signalErrorToThreads(ctx);
+ return error;
+ }
+ }
+ DEBUG(2, "finished creating job %u\n", currJob);
+ currJob++;
+ if (feof(srcFile)) {
+ break;
+ }
+ }
+ }
+ /* success -- created all jobs */
+ return 0;
+}
+
+static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull)
+{
+ fcResources fcr;
+ unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
+ FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
+ const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
+ const char* outFilename = outFilenameIntermediate;
+ char fileAndSuffix[MAX_PATH];
+ size_t const numJobs = MAX_NUM_JOBS;
+
+ memset(&fcr, 0, sizeof(fcr));
+
+ if (!outFilenameIntermediate) {
+ if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) {
+ DISPLAY("Error: output filename is too long\n");
+ return fcr;
+ }
+ outFilename = fileAndSuffix;
+ }
+
+ {
+ unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark);
+ FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb");
+ fcr.otArg = malloc(sizeof(outputThreadArg));
+ if (!fcr.otArg) {
+ DISPLAY("Error: could not allocate space for output thread argument\n");
+ return fcr;
+ }
+ fcr.otArg->dstFile = dstFile;
+ }
+ /* checking for errors */
+ if (!fcr.otArg->dstFile || !srcFile) {
+ DISPLAY("Error: some file(s) could not be opened\n");
+ return fcr;
+ }
+
+ /* creating context */
+ fcr.ctx = createCCtx(numJobs);
+ fcr.otArg->ctx = fcr.ctx;
+ fcr.srcFile = srcFile;
+ return fcr;
+}
+
+static int freeFileCompressionResources(fcResources* fcr)
+{
+ int ret = 0;
+ waitUntilAllJobsCompleted(fcr->ctx);
+ ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
+ ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
+ if (fcr->otArg) {
+ ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0;
+ free(fcr->otArg);
+ /* no need to freeCCtx() on otArg->ctx because it should be the same context */
+ }
+ return ret;
+}
+
+static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
+{
+ int ret = 0;
+ fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull);
+ UTIL_getTime(&g_startTime);
+ g_streamedSize = 0;
+ ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg);
+ ret |= freeFileCompressionResources(&fcr);
+ return ret;
+}
+
+static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout)
+{
+ int ret = 0;
+ unsigned fileNum;
+ for (fileNum=0; fileNum<numFiles; fileNum++) {
+ const char* filename = filenameTable[fileNum];
+ if (!forceStdout) {
+ ret |= compressFilename(filename, NULL);
+ }
+ else {
+ ret |= compressFilename(filename, stdoutmark);
+ }
+
+ }
+ return ret;
+}
+
+/*! readU32FromChar() :
+ @return : unsigned integer value read from input in `char` format
+ allows and interprets K, KB, KiB, M, MB and MiB suffix.
+ Will also modify `*stringPtr`, advancing it to position where it stopped reading.
+ Note : function result can overflow if digit string > MAX_UINT */
+static unsigned readU32FromChar(const char** stringPtr)
+{
+ unsigned result = 0;
+ while ((**stringPtr >='0') && (**stringPtr <='9'))
+ result *= 10, result += **stringPtr - '0', (*stringPtr)++ ;
+ if ((**stringPtr=='K') || (**stringPtr=='M')) {
+ result <<= 10;
+ if (**stringPtr=='M') result <<= 10;
+ (*stringPtr)++ ;
+ if (**stringPtr=='i') (*stringPtr)++;
+ if (**stringPtr=='B') (*stringPtr)++;
+ }
+ return result;
+}
+
+static void help(const char* progPath)
+{
+ PRINT("Usage:\n");
+ PRINT(" %s [options] [file(s)]\n", progPath);
+ PRINT("\n");
+ PRINT("Options:\n");
+ PRINT(" -oFILE : specify the output file name\n");
+ PRINT(" -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL);
+ PRINT(" -h : display help/information\n");
+ PRINT(" -f : force the compression level to stay constant\n");
+ PRINT(" -c : force write to stdout\n");
+ PRINT(" -p : hide progress bar\n");
+ PRINT(" -q : quiet mode -- do not show progress bar or other information\n");
+ PRINT(" -l# : provide lower bound for compression level -- default 1\n");
+ PRINT(" -u# : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
+}
+/* return 0 if successful, else return error */
+int main(int argCount, const char* argv[])
+{
+ const char* outFilename = NULL;
+ const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*));
+ unsigned filenameIdx = 0;
+ unsigned forceStdout = 0;
+ unsigned providedInitialCLevel = 0;
+ int ret = 0;
+ int argNum;
+ filenameTable[0] = stdinmark;
+ g_maxCLevel = ZSTD_maxCLevel();
+
+ UTIL_initTimer(&g_ticksPerSecond);
+
+ if (filenameTable == NULL) {
+ DISPLAY("Error: could not allocate sapce for filename table.\n");
+ return 1;
+ }
+
+ for (argNum=1; argNum<argCount; argNum++) {
+ const char* argument = argv[argNum];
+
+ /* output filename designated with "-o" */
+ if (argument[0]=='-' && strlen(argument) > 1) {
+ switch (argument[1]) {
+ case 'o':
+ argument += 2;
+ outFilename = argument;
+ break;
+ case 'i':
+ argument += 2;
+ g_compressionLevel = readU32FromChar(&argument);
+ providedInitialCLevel = 1;
+ break;
+ case 'h':
+ help(argv[0]);
+ goto _main_exit;
+ case 'p':
+ g_useProgressBar = 0;
+ break;
+ case 'c':
+ forceStdout = 1;
+ outFilename = stdoutmark;
+ break;
+ case 'f':
+ g_forceCompressionLevel = 1;
+ break;
+ case 'q':
+ g_useProgressBar = 0;
+ g_displayLevel = 0;
+ break;
+ case 'l':
+ argument += 2;
+ g_minCLevel = readU32FromChar(&argument);
+ break;
+ case 'u':
+ argument += 2;
+ g_maxCLevel = readU32FromChar(&argument);
+ break;
+ default:
+ DISPLAY("Error: invalid argument provided\n");
+ ret = 1;
+ goto _main_exit;
+ }
+ continue;
+ }
+
+ /* regular files to be compressed */
+ filenameTable[filenameIdx++] = argument;
+ }
+
+ /* check initial, max, and min compression levels */
+ {
+ unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel;
+ unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel;
+ if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) {
+ DISPLAY("Error: provided compression level parameters are invalid\n");
+ ret = 1;
+ goto _main_exit;
+ }
+ else if (initialNotInRange) {
+ g_compressionLevel = g_minCLevel;
+ }
+ }
+
+ /* error checking with number of files */
+ if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) {
+ DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
+ ret = 1;
+ goto _main_exit;
+ }
+
+ /* compress files */
+ if (filenameIdx <= 1) {
+ ret |= compressFilename(filenameTable[0], outFilename);
+ }
+ else {
+ ret |= compressFilenames(filenameTable, filenameIdx, forceStdout);
+ }
+_main_exit:
+ free(filenameTable);
+ return ret;
+}