diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/zstd/contrib/pzstd | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/zstd/contrib/pzstd')
36 files changed, 4286 insertions, 0 deletions
diff --git a/src/zstd/contrib/pzstd/.gitignore b/src/zstd/contrib/pzstd/.gitignore new file mode 100644 index 00000000..84e68fb0 --- /dev/null +++ b/src/zstd/contrib/pzstd/.gitignore @@ -0,0 +1,2 @@ +# compilation result +pzstd diff --git a/src/zstd/contrib/pzstd/BUCK b/src/zstd/contrib/pzstd/BUCK new file mode 100644 index 00000000..d04eeedd --- /dev/null +++ b/src/zstd/contrib/pzstd/BUCK @@ -0,0 +1,72 @@ +cxx_library( + name='libpzstd', + visibility=['PUBLIC'], + header_namespace='', + exported_headers=[ + 'ErrorHolder.h', + 'Logging.h', + 'Pzstd.h', + ], + headers=[ + 'SkippableFrame.h', + ], + srcs=[ + 'Pzstd.cpp', + 'SkippableFrame.cpp', + ], + deps=[ + ':options', + '//contrib/pzstd/utils:utils', + '//lib:mem', + '//lib:zstd', + ], +) + +cxx_library( + name='options', + visibility=['PUBLIC'], + header_namespace='', + exported_headers=['Options.h'], + srcs=['Options.cpp'], + deps=[ + '//contrib/pzstd/utils:scope_guard', + '//lib:zstd', + '//programs:util', + ], +) + +cxx_binary( + name='pzstd', + visibility=['PUBLIC'], + srcs=['main.cpp'], + deps=[ + ':libpzstd', + ':options', + ], +) + +# Must run "make googletest" first +cxx_library( + name='gtest', + srcs=glob([ + 'googletest/googletest/src/gtest-all.cc', + 'googletest/googlemock/src/gmock-all.cc', + 'googletest/googlemock/src/gmock_main.cc', + ]), + header_namespace='', + exported_headers=subdir_glob([ + ('googletest/googletest/include', '**/*.h'), + ('googletest/googlemock/include', '**/*.h'), + ]), + headers=subdir_glob([ + ('googletest/googletest', 'src/*.cc'), + ('googletest/googletest', 'src/*.h'), + ('googletest/googlemock', 'src/*.cc'), + ('googletest/googlemock', 'src/*.h'), + ]), + platform_linker_flags=[ + ('android', []), + ('', ['-lpthread']), + ], + visibility=['PUBLIC'], +) diff --git a/src/zstd/contrib/pzstd/ErrorHolder.h b/src/zstd/contrib/pzstd/ErrorHolder.h new file mode 100644 index 00000000..829651c5 --- /dev/null +++ b/src/zstd/contrib/pzstd/ErrorHolder.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <atomic> +#include <cassert> +#include <stdexcept> +#include <string> + +namespace pzstd { + +// Coordinates graceful shutdown of the pzstd pipeline +class ErrorHolder { + std::atomic<bool> error_; + std::string message_; + + public: + ErrorHolder() : error_(false) {} + + bool hasError() noexcept { + return error_.load(); + } + + void setError(std::string message) noexcept { + // Given multiple possibly concurrent calls, exactly one will ever succeed. + bool expected = false; + if (error_.compare_exchange_strong(expected, true)) { + message_ = std::move(message); + } + } + + bool check(bool predicate, std::string message) noexcept { + if (!predicate) { + setError(std::move(message)); + } + return !hasError(); + } + + std::string getError() noexcept { + error_.store(false); + return std::move(message_); + } + + ~ErrorHolder() { + assert(!hasError()); + } +}; +} diff --git a/src/zstd/contrib/pzstd/Logging.h b/src/zstd/contrib/pzstd/Logging.h new file mode 100644 index 00000000..16a63932 --- /dev/null +++ b/src/zstd/contrib/pzstd/Logging.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <cstdio> +#include <mutex> + +namespace pzstd { + +constexpr int ERROR = 1; +constexpr int INFO = 2; +constexpr int DEBUG = 3; +constexpr int VERBOSE = 4; + +class Logger { + std::mutex mutex_; + FILE* out_; + const int level_; + + using Clock = std::chrono::system_clock; + Clock::time_point lastUpdate_; + std::chrono::milliseconds refreshRate_; + + public: + explicit Logger(int level, FILE* out = stderr) + : out_(out), level_(level), lastUpdate_(Clock::now()), + refreshRate_(150) {} + + + bool logsAt(int level) { + return level <= level_; + } + + template <typename... Args> + void operator()(int level, const char *fmt, Args... args) { + if (level > level_) { + return; + } + std::lock_guard<std::mutex> lock(mutex_); + std::fprintf(out_, fmt, args...); + } + + template <typename... Args> + void update(int level, const char *fmt, Args... args) { + if (level > level_) { + return; + } + std::lock_guard<std::mutex> lock(mutex_); + auto now = Clock::now(); + if (now - lastUpdate_ > refreshRate_) { + lastUpdate_ = now; + std::fprintf(out_, "\r"); + std::fprintf(out_, fmt, args...); + } + } + + void clear(int level) { + if (level > level_) { + return; + } + std::lock_guard<std::mutex> lock(mutex_); + std::fprintf(out_, "\r%79s\r", ""); + } +}; + +} diff --git a/src/zstd/contrib/pzstd/Makefile b/src/zstd/contrib/pzstd/Makefile new file mode 100644 index 00000000..40531e21 --- /dev/null +++ b/src/zstd/contrib/pzstd/Makefile @@ -0,0 +1,269 @@ +# ################################################################ +# Copyright (c) 2016-present, Facebook, Inc. +# All rights reserved. +# +# This source code is licensed under both the BSD-style license (found in the +# LICENSE file in the root directory of this source tree) and the GPLv2 (found +# in the COPYING file in the root directory of this source tree). +# ################################################################ + +# Standard variables for installation +DESTDIR ?= +PREFIX ?= /usr/local +BINDIR := $(DESTDIR)$(PREFIX)/bin + +ZSTDDIR = ../../lib +PROGDIR = ../../programs + +# External program to use to run tests, e.g. qemu or valgrind +TESTPROG ?= +# Flags to pass to the tests +TESTFLAGS ?= + +# We use gcc/clang to generate the header dependencies of files +DEPFLAGS = -MMD -MP -MF $*.Td +POSTCOMPILE = mv -f $*.Td $*.d + +# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override +CFLAGS ?= -O3 -Wall -Wextra +CXXFLAGS ?= -O3 -Wall -Wextra -pedantic +CPPFLAGS ?= +LDFLAGS ?= + +# Include flags +PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I. +GTEST_INC = -isystem googletest/googletest/include + +PZSTD_CPPFLAGS = $(PZSTD_INC) +PZSTD_CCXXFLAGS = +PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS) +PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) -std=c++11 +PZSTD_LDFLAGS = +EXTRA_FLAGS = +ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS) +ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS) +ALL_LDFLAGS = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS) + + +# gtest libraries need to go before "-lpthread" because they depend on it. +GTEST_LIB = -L googletest/build/googlemock/gtest +LIBS = + +# Compilation commands +LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -lpthread -o $@ +CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@ +CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@ + +# Get a list of all zstd files so we rebuild the static library when we need to +ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \ + $(wildcard $(ZSTDDIR)/common/*.h) +ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \ + $(wildcard $(ZSTDDIR)/compress/*.h) +ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \ + $(wildcard $(ZSTDDIR)/decompress/*.h) +ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \ + $(wildcard $(PROGDIR)/*.h) +ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \ + $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \ + $(ZSTDPROG_FILES) + +# List all the pzstd source files so we can determine their dependencies +PZSTD_SRCS := $(wildcard *.cpp) +PZSTD_TESTS := $(wildcard test/*.cpp) +UTILS_TESTS := $(wildcard utils/test/*.cpp) +ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS) + + +# Define *.exe as extension for Windows systems +ifneq (,$(filter Windows%,$(OS))) +EXT =.exe +else +EXT = +endif + +# Standard targets +.PHONY: default +default: all + +.PHONY: test-pzstd +test-pzstd: TESTFLAGS=--gtest_filter=-*ExtremelyLarge* +test-pzstd: clean googletest pzstd tests check + +.PHONY: test-pzstd32 +test-pzstd32: clean googletest32 all32 check + +.PHONY: test-pzstd-tsan +test-pzstd-tsan: LDFLAGS=-fuse-ld=gold +test-pzstd-tsan: TESTFLAGS=--gtest_filter=-*ExtremelyLarge* +test-pzstd-tsan: clean googletest tsan check + +.PHONY: test-pzstd-asan +test-pzstd-asan: LDFLAGS=-fuse-ld=gold +test-pzstd-asan: TESTFLAGS=--gtest_filter=-*ExtremelyLarge* +test-pzstd-asan: clean asan check + +.PHONY: check +check: + $(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS) + +.PHONY: install +install: PZSTD_CPPFLAGS += -DNDEBUG +install: pzstd$(EXT) + install -d -m 755 $(BINDIR)/ + install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT) + +.PHONY: uninstall +uninstall: + $(RM) $(BINDIR)/pzstd$(EXT) + +# Targets for many different builds +.PHONY: all +all: PZSTD_CPPFLAGS += -DNDEBUG +all: pzstd$(EXT) + +.PHONY: debug +debug: EXTRA_FLAGS += -g +debug: pzstd$(EXT) tests roundtrip + +.PHONY: tsan +tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC +tsan: PZSTD_LDFLAGS += -fsanitize=thread +tsan: debug + +.PHONY: asan +asan: EXTRA_FLAGS += -fsanitize=address +asan: debug + +.PHONY: ubsan +ubsan: EXTRA_FLAGS += -fsanitize=undefined +ubsan: debug + +.PHONY: all32 +all32: EXTRA_FLAGS += -m32 +all32: all tests roundtrip + +.PHONY: debug32 +debug32: EXTRA_FLAGS += -m32 +debug32: debug + +.PHONY: asan32 +asan32: EXTRA_FLAGS += -m32 +asan32: asan + +.PHONY: tsan32 +tsan32: EXTRA_FLAGS += -m32 +tsan32: tsan + +.PHONY: ubsan32 +ubsan32: EXTRA_FLAGS += -m32 +ubsan32: ubsan + +# Run long round trip tests +.PHONY: roundtripcheck +roundtripcheck: roundtrip check + $(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS) + +# Build the main binary +pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) + +# Target that depends on all the tests +.PHONY: tests +tests: EXTRA_FLAGS += -Wno-deprecated-declarations +tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS))) + +# Build the round trip tests +.PHONY: roundtrip +roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations +roundtrip: test/RoundTripTest$(EXT) + +# Use the static library that zstd builds for simplicity and +# so we get the compiler options correct +$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES) + CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)" $(MAKE) -C $(ZSTDDIR) libzstd.a + +# Rules to build the tests +test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \ + Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) + +test/%Test$(EXT): PZSTD_LDFLAGS += $(GTEST_LIB) +test/%Test$(EXT): LIBS += -lgtest -lgtest_main +test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o \ + SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) + +utils/test/%Test$(EXT): PZSTD_LDFLAGS += $(GTEST_LIB) +utils/test/%Test$(EXT): LIBS += -lgtest -lgtest_main +utils/test/%Test$(EXT): utils/test/%Test.o + $(LD_COMMAND) + + +GTEST_CMAKEFLAGS = + +# Install googletest +.PHONY: googletest +googletest: PZSTD_CCXXFLAGS += -fPIC +googletest: + @$(RM) -rf googletest + @git clone https://github.com/google/googletest + @mkdir -p googletest/build + @cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE) + +.PHONY: googletest32 +googletest32: PZSTD_CCXXFLAGS += -m32 +googletest32: googletest + +.PHONY: googletest-mingw64 +googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles" +googletest-mingw64: googletest + +.PHONY: clean +clean: + $(RM) -f *.o pzstd$(EXT) *.Td *.d + $(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d + $(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d + $(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d + $(MAKE) -C $(ZSTDDIR) clean + @echo Cleaning completed + + +# Cancel implicit rules +%.o: %.c +%.o: %.cpp + +# Object file rules +%.o: %.c + $(CC_COMMAND) + $(POSTCOMPILE) + +$(PROGDIR)/%.o: $(PROGDIR)/%.c + $(CC_COMMAND) + $(POSTCOMPILE) + +%.o: %.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +test/%.o: PZSTD_CPPFLAGS += $(GTEST_INC) +test/%.o: test/%.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +utils/test/%.o: PZSTD_CPPFLAGS += $(GTEST_INC) +utils/test/%.o: utils/test/%.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +# Dependency file stuff +.PRECIOUS: %.d test/%.d utils/test/%.d + +# Include rules that specify header file dependencies +-include $(patsubst %,%.d,$(basename $(ALL_SRCS))) diff --git a/src/zstd/contrib/pzstd/Options.cpp b/src/zstd/contrib/pzstd/Options.cpp new file mode 100644 index 00000000..d9b216b4 --- /dev/null +++ b/src/zstd/contrib/pzstd/Options.cpp @@ -0,0 +1,439 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "Options.h" +#include "util.h" +#include "utils/ScopeGuard.h" + +#include <algorithm> +#include <cassert> +#include <cstdio> +#include <cstring> +#include <iterator> +#include <thread> +#include <vector> + +#if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(_WIN32) || \ + defined(__CYGWIN__) +#include <io.h> /* _isatty */ +#define IS_CONSOLE(stdStream) _isatty(_fileno(stdStream)) +#elif defined(_POSIX_C_SOURCE) || defined(_XOPEN_SOURCE) || defined(_POSIX_SOURCE) || (defined(__APPLE__) && defined(__MACH__)) || \ + defined(__DragonFly__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) /* https://sourceforge.net/p/predef/wiki/OperatingSystems/ */ +#include <unistd.h> /* isatty */ +#define IS_CONSOLE(stdStream) isatty(fileno(stdStream)) +#else +#define IS_CONSOLE(stdStream) 0 +#endif + +namespace pzstd { + +namespace { +unsigned defaultNumThreads() { +#ifdef PZSTD_NUM_THREADS + return PZSTD_NUM_THREADS; +#else + return std::thread::hardware_concurrency(); +#endif +} + +unsigned parseUnsigned(const char **arg) { + unsigned result = 0; + while (**arg >= '0' && **arg <= '9') { + result *= 10; + result += **arg - '0'; + ++(*arg); + } + return result; +} + +const char *getArgument(const char *options, const char **argv, int &i, + int argc) { + if (options[1] != 0) { + return options + 1; + } + ++i; + if (i == argc) { + std::fprintf(stderr, "Option -%c requires an argument, but none provided\n", + *options); + return nullptr; + } + return argv[i]; +} + +const std::string kZstdExtension = ".zst"; +constexpr char kStdIn[] = "-"; +constexpr char kStdOut[] = "-"; +constexpr unsigned kDefaultCompressionLevel = 3; +constexpr unsigned kMaxNonUltraCompressionLevel = 19; + +#ifdef _WIN32 +const char nullOutput[] = "nul"; +#else +const char nullOutput[] = "/dev/null"; +#endif + +void notSupported(const char *option) { + std::fprintf(stderr, "Operation not supported: %s\n", option); +} + +void usage() { + std::fprintf(stderr, "Usage:\n"); + std::fprintf(stderr, " pzstd [args] [FILE(s)]\n"); + std::fprintf(stderr, "Parallel ZSTD options:\n"); + std::fprintf(stderr, " -p, --processes # : number of threads to use for (de)compression (default:%d)\n", defaultNumThreads()); + + std::fprintf(stderr, "ZSTD options:\n"); + std::fprintf(stderr, " -# : # compression level (1-%d, default:%d)\n", kMaxNonUltraCompressionLevel, kDefaultCompressionLevel); + std::fprintf(stderr, " -d, --decompress : decompression\n"); + std::fprintf(stderr, " -o file : result stored into `file` (only if 1 input file)\n"); + std::fprintf(stderr, " -f, --force : overwrite output without prompting, (de)compress links\n"); + std::fprintf(stderr, " --rm : remove source file(s) after successful (de)compression\n"); + std::fprintf(stderr, " -k, --keep : preserve source file(s) (default)\n"); + std::fprintf(stderr, " -h, --help : display help and exit\n"); + std::fprintf(stderr, " -V, --version : display version number and exit\n"); + std::fprintf(stderr, " -v, --verbose : verbose mode; specify multiple times to increase log level (default:2)\n"); + std::fprintf(stderr, " -q, --quiet : suppress warnings; specify twice to suppress errors too\n"); + std::fprintf(stderr, " -c, --stdout : force write to standard output, even if it is the console\n"); +#ifdef UTIL_HAS_CREATEFILELIST + std::fprintf(stderr, " -r : operate recursively on directories\n"); +#endif + std::fprintf(stderr, " --ultra : enable levels beyond %i, up to %i (requires more memory)\n", kMaxNonUltraCompressionLevel, ZSTD_maxCLevel()); + std::fprintf(stderr, " -C, --check : integrity check (default)\n"); + std::fprintf(stderr, " --no-check : no integrity check\n"); + std::fprintf(stderr, " -t, --test : test compressed file integrity\n"); + std::fprintf(stderr, " -- : all arguments after \"--\" are treated as files\n"); +} +} // anonymous namespace + +Options::Options() + : numThreads(defaultNumThreads()), maxWindowLog(23), + compressionLevel(kDefaultCompressionLevel), decompress(false), + overwrite(false), keepSource(true), writeMode(WriteMode::Auto), + checksum(true), verbosity(2) {} + +Options::Status Options::parse(int argc, const char **argv) { + bool test = false; + bool recursive = false; + bool ultra = false; + bool forceStdout = false; + bool followLinks = false; + // Local copy of input files, which are pointers into argv. + std::vector<const char *> localInputFiles; + for (int i = 1; i < argc; ++i) { + const char *arg = argv[i]; + // Protect against empty arguments + if (arg[0] == 0) { + continue; + } + // Everything after "--" is an input file + if (!std::strcmp(arg, "--")) { + ++i; + std::copy(argv + i, argv + argc, std::back_inserter(localInputFiles)); + break; + } + // Long arguments that don't have a short option + { + bool isLongOption = true; + if (!std::strcmp(arg, "--rm")) { + keepSource = false; + } else if (!std::strcmp(arg, "--ultra")) { + ultra = true; + maxWindowLog = 0; + } else if (!std::strcmp(arg, "--no-check")) { + checksum = false; + } else if (!std::strcmp(arg, "--sparse")) { + writeMode = WriteMode::Sparse; + notSupported("Sparse mode"); + return Status::Failure; + } else if (!std::strcmp(arg, "--no-sparse")) { + writeMode = WriteMode::Regular; + notSupported("Sparse mode"); + return Status::Failure; + } else if (!std::strcmp(arg, "--dictID")) { + notSupported(arg); + return Status::Failure; + } else if (!std::strcmp(arg, "--no-dictID")) { + notSupported(arg); + return Status::Failure; + } else { + isLongOption = false; + } + if (isLongOption) { + continue; + } + } + // Arguments with a short option simply set their short option. + const char *options = nullptr; + if (!std::strcmp(arg, "--processes")) { + options = "p"; + } else if (!std::strcmp(arg, "--version")) { + options = "V"; + } else if (!std::strcmp(arg, "--help")) { + options = "h"; + } else if (!std::strcmp(arg, "--decompress")) { + options = "d"; + } else if (!std::strcmp(arg, "--force")) { + options = "f"; + } else if (!std::strcmp(arg, "--stdout")) { + options = "c"; + } else if (!std::strcmp(arg, "--keep")) { + options = "k"; + } else if (!std::strcmp(arg, "--verbose")) { + options = "v"; + } else if (!std::strcmp(arg, "--quiet")) { + options = "q"; + } else if (!std::strcmp(arg, "--check")) { + options = "C"; + } else if (!std::strcmp(arg, "--test")) { + options = "t"; + } else if (arg[0] == '-' && arg[1] != 0) { + options = arg + 1; + } else { + localInputFiles.emplace_back(arg); + continue; + } + assert(options != nullptr); + + bool finished = false; + while (!finished && *options != 0) { + // Parse the compression level + if (*options >= '0' && *options <= '9') { + compressionLevel = parseUnsigned(&options); + continue; + } + + switch (*options) { + case 'h': + case 'H': + usage(); + return Status::Message; + case 'V': + std::fprintf(stderr, "PZSTD version: %s.\n", ZSTD_VERSION_STRING); + return Status::Message; + case 'p': { + finished = true; + const char *optionArgument = getArgument(options, argv, i, argc); + if (optionArgument == nullptr) { + return Status::Failure; + } + if (*optionArgument < '0' || *optionArgument > '9') { + std::fprintf(stderr, "Option -p expects a number, but %s provided\n", + optionArgument); + return Status::Failure; + } + numThreads = parseUnsigned(&optionArgument); + if (*optionArgument != 0) { + std::fprintf(stderr, + "Option -p expects a number, but %u%s provided\n", + numThreads, optionArgument); + return Status::Failure; + } + break; + } + case 'o': { + finished = true; + const char *optionArgument = getArgument(options, argv, i, argc); + if (optionArgument == nullptr) { + return Status::Failure; + } + outputFile = optionArgument; + break; + } + case 'C': + checksum = true; + break; + case 'k': + keepSource = true; + break; + case 'd': + decompress = true; + break; + case 'f': + overwrite = true; + forceStdout = true; + followLinks = true; + break; + case 't': + test = true; + decompress = true; + break; +#ifdef UTIL_HAS_CREATEFILELIST + case 'r': + recursive = true; + break; +#endif + case 'c': + outputFile = kStdOut; + forceStdout = true; + break; + case 'v': + ++verbosity; + break; + case 'q': + --verbosity; + // Ignore them for now + break; + // Unsupported options from Zstd + case 'D': + case 's': + notSupported("Zstd dictionaries."); + return Status::Failure; + case 'b': + case 'e': + case 'i': + case 'B': + notSupported("Zstd benchmarking options."); + return Status::Failure; + default: + std::fprintf(stderr, "Invalid argument: %s\n", arg); + return Status::Failure; + } + if (!finished) { + ++options; + } + } // while (*options != 0); + } // for (int i = 1; i < argc; ++i); + + // Set options for test mode + if (test) { + outputFile = nullOutput; + keepSource = true; + } + + // Input file defaults to standard input if not provided. + if (localInputFiles.empty()) { + localInputFiles.emplace_back(kStdIn); + } + + // Check validity of input files + if (localInputFiles.size() > 1) { + const auto it = std::find(localInputFiles.begin(), localInputFiles.end(), + std::string{kStdIn}); + if (it != localInputFiles.end()) { + std::fprintf( + stderr, + "Cannot specify standard input when handling multiple files\n"); + return Status::Failure; + } + } + if (localInputFiles.size() > 1 || recursive) { + if (!outputFile.empty() && outputFile != nullOutput) { + std::fprintf( + stderr, + "Cannot specify an output file when handling multiple inputs\n"); + return Status::Failure; + } + } + + g_utilDisplayLevel = verbosity; + // Remove local input files that are symbolic links + if (!followLinks) { + std::remove_if(localInputFiles.begin(), localInputFiles.end(), + [&](const char *path) { + bool isLink = UTIL_isLink(path); + if (isLink && verbosity >= 2) { + std::fprintf( + stderr, + "Warning : %s is symbolic link, ignoring\n", + path); + } + return isLink; + }); + } + + // Translate input files/directories into files to (de)compress + if (recursive) { + char *scratchBuffer = nullptr; + unsigned numFiles = 0; + const char **files = + UTIL_createFileList(localInputFiles.data(), localInputFiles.size(), + &scratchBuffer, &numFiles, followLinks); + if (files == nullptr) { + std::fprintf(stderr, "Error traversing directories\n"); + return Status::Failure; + } + auto guard = + makeScopeGuard([&] { UTIL_freeFileList(files, scratchBuffer); }); + if (numFiles == 0) { + std::fprintf(stderr, "No files found\n"); + return Status::Failure; + } + inputFiles.resize(numFiles); + std::copy(files, files + numFiles, inputFiles.begin()); + } else { + inputFiles.resize(localInputFiles.size()); + std::copy(localInputFiles.begin(), localInputFiles.end(), + inputFiles.begin()); + } + localInputFiles.clear(); + assert(!inputFiles.empty()); + + // If reading from standard input, default to standard output + if (inputFiles[0] == kStdIn && outputFile.empty()) { + assert(inputFiles.size() == 1); + outputFile = "-"; + } + + if (inputFiles[0] == kStdIn && IS_CONSOLE(stdin)) { + assert(inputFiles.size() == 1); + std::fprintf(stderr, "Cannot read input from interactive console\n"); + return Status::Failure; + } + if (outputFile == "-" && IS_CONSOLE(stdout) && !(forceStdout && decompress)) { + std::fprintf(stderr, "Will not write to console stdout unless -c or -f is " + "specified and decompressing\n"); + return Status::Failure; + } + + // Check compression level + { + unsigned maxCLevel = + ultra ? ZSTD_maxCLevel() : kMaxNonUltraCompressionLevel; + if (compressionLevel > maxCLevel || compressionLevel == 0) { + std::fprintf(stderr, "Invalid compression level %u.\n", compressionLevel); + return Status::Failure; + } + } + + // Check that numThreads is set + if (numThreads == 0) { + std::fprintf(stderr, "Invalid arguments: # of threads not specified " + "and unable to determine hardware concurrency.\n"); + return Status::Failure; + } + + // Modify verbosity + // If we are piping input and output, turn off interaction + if (inputFiles[0] == kStdIn && outputFile == kStdOut && verbosity == 2) { + verbosity = 1; + } + // If we are in multi-file mode, turn off interaction + if (inputFiles.size() > 1 && verbosity == 2) { + verbosity = 1; + } + + return Status::Success; +} + +std::string Options::getOutputFile(const std::string &inputFile) const { + if (!outputFile.empty()) { + return outputFile; + } + // Attempt to add/remove zstd extension from the input file + if (decompress) { + int stemSize = inputFile.size() - kZstdExtension.size(); + if (stemSize > 0 && inputFile.substr(stemSize) == kZstdExtension) { + return inputFile.substr(0, stemSize); + } else { + return ""; + } + } else { + return inputFile + kZstdExtension; + } +} +} diff --git a/src/zstd/contrib/pzstd/Options.h b/src/zstd/contrib/pzstd/Options.h new file mode 100644 index 00000000..f4f2aaa4 --- /dev/null +++ b/src/zstd/contrib/pzstd/Options.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd.h" +#undef ZSTD_STATIC_LINKING_ONLY + +#include <cstdint> +#include <string> +#include <vector> + +namespace pzstd { + +struct Options { + enum class WriteMode { Regular, Auto, Sparse }; + + unsigned numThreads; + unsigned maxWindowLog; + unsigned compressionLevel; + bool decompress; + std::vector<std::string> inputFiles; + std::string outputFile; + bool overwrite; + bool keepSource; + WriteMode writeMode; + bool checksum; + int verbosity; + + enum class Status { + Success, // Successfully parsed options + Failure, // Failure to parse options + Message // Options specified to print a message (e.g. "-h") + }; + + Options(); + Options(unsigned numThreads, unsigned maxWindowLog, unsigned compressionLevel, + bool decompress, std::vector<std::string> inputFiles, + std::string outputFile, bool overwrite, bool keepSource, + WriteMode writeMode, bool checksum, int verbosity) + : numThreads(numThreads), maxWindowLog(maxWindowLog), + compressionLevel(compressionLevel), decompress(decompress), + inputFiles(std::move(inputFiles)), outputFile(std::move(outputFile)), + overwrite(overwrite), keepSource(keepSource), writeMode(writeMode), + checksum(checksum), verbosity(verbosity) {} + + Status parse(int argc, const char **argv); + + ZSTD_parameters determineParameters() const { + ZSTD_parameters params = ZSTD_getParams(compressionLevel, 0, 0); + params.fParams.contentSizeFlag = 0; + params.fParams.checksumFlag = checksum; + if (maxWindowLog != 0 && params.cParams.windowLog > maxWindowLog) { + params.cParams.windowLog = maxWindowLog; + params.cParams = ZSTD_adjustCParams(params.cParams, 0, 0); + } + return params; + } + + std::string getOutputFile(const std::string &inputFile) const; +}; +} diff --git a/src/zstd/contrib/pzstd/Pzstd.cpp b/src/zstd/contrib/pzstd/Pzstd.cpp new file mode 100644 index 00000000..1eb4ce14 --- /dev/null +++ b/src/zstd/contrib/pzstd/Pzstd.cpp @@ -0,0 +1,618 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "Pzstd.h" +#include "SkippableFrame.h" +#include "utils/FileSystem.h" +#include "utils/Range.h" +#include "utils/ScopeGuard.h" +#include "utils/ThreadPool.h" +#include "utils/WorkQueue.h" + +#include <chrono> +#include <cinttypes> +#include <cstddef> +#include <cstdio> +#include <memory> +#include <string> + +#if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(_WIN32) || defined(__CYGWIN__) +# include <fcntl.h> /* _O_BINARY */ +# include <io.h> /* _setmode, _isatty */ +# define SET_BINARY_MODE(file) { if (_setmode(_fileno(file), _O_BINARY) == -1) perror("Cannot set _O_BINARY"); } +#else +# include <unistd.h> /* isatty */ +# define SET_BINARY_MODE(file) +#endif + +namespace pzstd { + +namespace { +#ifdef _WIN32 +const std::string nullOutput = "nul"; +#else +const std::string nullOutput = "/dev/null"; +#endif +} + +using std::size_t; + +static std::uintmax_t fileSizeOrZero(const std::string &file) { + if (file == "-") { + return 0; + } + std::error_code ec; + auto size = file_size(file, ec); + if (ec) { + size = 0; + } + return size; +} + +static std::uint64_t handleOneInput(const Options &options, + const std::string &inputFile, + FILE* inputFd, + const std::string &outputFile, + FILE* outputFd, + SharedState& state) { + auto inputSize = fileSizeOrZero(inputFile); + // WorkQueue outlives ThreadPool so in the case of error we are certain + // we don't accidently try to call push() on it after it is destroyed + WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1}; + std::uint64_t bytesRead; + std::uint64_t bytesWritten; + { + // Initialize the (de)compression thread pool with numThreads + ThreadPool executor(options.numThreads); + // Run the reader thread on an extra thread + ThreadPool readExecutor(1); + if (!options.decompress) { + // Add a job that reads the input and starts all the compression jobs + readExecutor.add( + [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] { + bytesRead = asyncCompressChunks( + state, + outs, + executor, + inputFd, + inputSize, + options.numThreads, + options.determineParameters()); + }); + // Start writing + bytesWritten = writeFile(state, outs, outputFd, options.decompress); + } else { + // Add a job that reads the input and starts all the decompression jobs + readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { + bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); + }); + // Start writing + bytesWritten = writeFile(state, outs, outputFd, options.decompress); + } + } + if (!state.errorHolder.hasError()) { + std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; + std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; + if (!options.decompress) { + double ratio = static_cast<double>(bytesWritten) / + static_cast<double>(bytesRead + !bytesRead); + state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 + " bytes, %s)\n", + inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, + outputFileName.c_str()); + } else { + state.log(INFO, "%-20s: %" PRIu64 " bytes \n", + inputFileName.c_str(),bytesWritten); + } + } + return bytesWritten; +} + +static FILE *openInputFile(const std::string &inputFile, + ErrorHolder &errorHolder) { + if (inputFile == "-") { + SET_BINARY_MODE(stdin); + return stdin; + } + // Check if input file is a directory + { + std::error_code ec; + if (is_directory(inputFile, ec)) { + errorHolder.setError("Output file is a directory -- ignored"); + return nullptr; + } + } + auto inputFd = std::fopen(inputFile.c_str(), "rb"); + if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) { + return nullptr; + } + return inputFd; +} + +static FILE *openOutputFile(const Options &options, + const std::string &outputFile, + SharedState& state) { + if (outputFile == "-") { + SET_BINARY_MODE(stdout); + return stdout; + } + // Check if the output file exists and then open it + if (!options.overwrite && outputFile != nullOutput) { + auto outputFd = std::fopen(outputFile.c_str(), "rb"); + if (outputFd != nullptr) { + std::fclose(outputFd); + if (!state.log.logsAt(INFO)) { + state.errorHolder.setError("Output file exists"); + return nullptr; + } + state.log( + INFO, + "pzstd: %s already exists; do you wish to overwrite (y/n) ? ", + outputFile.c_str()); + int c = getchar(); + if (c != 'y' && c != 'Y') { + state.errorHolder.setError("Not overwritten"); + return nullptr; + } + } + } + auto outputFd = std::fopen(outputFile.c_str(), "wb"); + if (!state.errorHolder.check( + outputFd != nullptr, "Failed to open output file")) { + return nullptr; + } + return outputFd; +} + +int pzstdMain(const Options &options) { + int returnCode = 0; + SharedState state(options); + for (const auto& input : options.inputFiles) { + // Setup the shared state + auto printErrorGuard = makeScopeGuard([&] { + if (state.errorHolder.hasError()) { + returnCode = 1; + state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(), + state.errorHolder.getError().c_str()); + } + }); + // Open the input file + auto inputFd = openInputFile(input, state.errorHolder); + if (inputFd == nullptr) { + continue; + } + auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); + // Open the output file + auto outputFile = options.getOutputFile(input); + if (!state.errorHolder.check(outputFile != "", + "Input file does not have extension .zst")) { + continue; + } + auto outputFd = openOutputFile(options, outputFile, state); + if (outputFd == nullptr) { + continue; + } + auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); + // (de)compress the file + handleOneInput(options, input, inputFd, outputFile, outputFd, state); + if (state.errorHolder.hasError()) { + continue; + } + // Delete the input file if necessary + if (!options.keepSource) { + // Be sure that we are done and have written everything before we delete + if (!state.errorHolder.check(std::fclose(inputFd) == 0, + "Failed to close input file")) { + continue; + } + closeInputGuard.dismiss(); + if (!state.errorHolder.check(std::fclose(outputFd) == 0, + "Failed to close output file")) { + continue; + } + closeOutputGuard.dismiss(); + if (std::remove(input.c_str()) != 0) { + state.errorHolder.setError("Failed to remove input file"); + continue; + } + } + } + // Returns 1 if any of the files failed to (de)compress. + return returnCode; +} + +/// Construct a `ZSTD_inBuffer` that points to the data in `buffer`. +static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) { + return ZSTD_inBuffer{buffer.data(), buffer.size(), 0}; +} + +/** + * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by + * `inBuffer.pos`. + */ +void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) { + auto pos = inBuffer.pos; + inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos; + inBuffer.size -= pos; + inBuffer.pos = 0; + return buffer.advance(pos); +} + +/// Construct a `ZSTD_outBuffer` that points to the data in `buffer`. +static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) { + return ZSTD_outBuffer{buffer.data(), buffer.size(), 0}; +} + +/** + * Split `buffer` and advance `outBuffer` by the amount of data written, as + * indicated by `outBuffer.pos`. + */ +Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { + auto pos = outBuffer.pos; + outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos; + outBuffer.size -= pos; + outBuffer.pos = 0; + return buffer.splitAt(pos); +} + +/** + * Stream chunks of input from `in`, compress it, and stream it out to `out`. + * + * @param state The shared state + * @param in Queue that we `pop()` input buffers from + * @param out Queue that we `push()` compressed output buffers to + * @param maxInputSize An upper bound on the size of the input + */ +static void compress( + SharedState& state, + std::shared_ptr<BufferWorkQueue> in, + std::shared_ptr<BufferWorkQueue> out, + size_t maxInputSize) { + auto& errorHolder = state.errorHolder; + auto guard = makeScopeGuard([&] { out->finish(); }); + // Initialize the CCtx + auto ctx = state.cStreamPool->get(); + if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) { + return; + } + { + auto err = ZSTD_resetCStream(ctx.get(), 0); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + } + + // Allocate space for the result + auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize)); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + { + Buffer inBuffer; + // Read a buffer in from the input queue + while (in->pop(inBuffer) && !errorHolder.hasError()) { + auto zstdInBuffer = makeZstdInBuffer(inBuffer); + // Compress the whole buffer and send it to the output queue + while (!inBuffer.empty() && !errorHolder.hasError()) { + if (!errorHolder.check( + !outBuffer.empty(), "ZSTD_compressBound() was too small")) { + return; + } + // Compress + auto err = + ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + // Split the compressed data off outBuffer and pass to the output queue + out->push(split(outBuffer, zstdOutBuffer)); + // Forget about the data we already compressed + advance(inBuffer, zstdInBuffer); + } + } + } + // Write the epilog + size_t bytesLeft; + do { + if (!errorHolder.check( + !outBuffer.empty(), "ZSTD_compressBound() was too small")) { + return; + } + bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer); + if (!errorHolder.check( + !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) { + return; + } + out->push(split(outBuffer, zstdOutBuffer)); + } while (bytesLeft != 0 && !errorHolder.hasError()); +} + +/** + * Calculates how large each independently compressed frame should be. + * + * @param size The size of the source if known, 0 otherwise + * @param numThreads The number of threads available to run compression jobs on + * @param params The zstd parameters to be used for compression + */ +static size_t calculateStep( + std::uintmax_t size, + size_t numThreads, + const ZSTD_parameters ¶ms) { + (void)size; + (void)numThreads; + return size_t{1} << (params.cParams.windowLog + 2); +} + +namespace { +enum class FileStatus { Continue, Done, Error }; +/// Determines the status of the file descriptor `fd`. +FileStatus fileStatus(FILE* fd) { + if (std::feof(fd)) { + return FileStatus::Done; + } else if (std::ferror(fd)) { + return FileStatus::Error; + } + return FileStatus::Continue; +} +} // anonymous namespace + +/** + * Reads `size` data in chunks of `chunkSize` and puts it into `queue`. + * Will read less if an error or EOF occurs. + * Returns the status of the file after all of the reads have occurred. + */ +static FileStatus +readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, + std::uint64_t *totalBytesRead) { + Buffer buffer(size); + while (!buffer.empty()) { + auto bytesRead = + std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); + *totalBytesRead += bytesRead; + queue.push(buffer.splitAt(bytesRead)); + auto status = fileStatus(fd); + if (status != FileStatus::Continue) { + return status; + } + } + return FileStatus::Continue; +} + +std::uint64_t asyncCompressChunks( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, + ThreadPool& executor, + FILE* fd, + std::uintmax_t size, + size_t numThreads, + ZSTD_parameters params) { + auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); + std::uint64_t bytesRead = 0; + + // Break the input up into chunks of size `step` and compress each chunk + // independently. + size_t step = calculateStep(size, numThreads, params); + state.log(DEBUG, "Chosen frame size: %zu\n", step); + auto status = FileStatus::Continue; + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { + // Make a new input queue that we will put the chunk's input data into. + auto in = std::make_shared<BufferWorkQueue>(); + auto inGuard = makeScopeGuard([&] { in->finish(); }); + // Make a new output queue that compress will put the compressed data into. + auto out = std::make_shared<BufferWorkQueue>(); + // Start compression in the thread pool + executor.add([&state, in, out, step] { + return compress( + state, std::move(in), std::move(out), step); + }); + // Pass the output queue to the writer thread. + chunks.push(std::move(out)); + state.log(VERBOSE, "%s\n", "Starting a new frame"); + // Fill the input queue for the compression job we just started + status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); + } + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); + return bytesRead; +} + +/** + * Decompress a frame, whose data is streamed into `in`, and stream the output + * to `out`. + * + * @param state The shared state + * @param in Queue that we `pop()` input buffers from. It contains + * exactly one compressed frame. + * @param out Queue that we `push()` decompressed output buffers to + */ +static void decompress( + SharedState& state, + std::shared_ptr<BufferWorkQueue> in, + std::shared_ptr<BufferWorkQueue> out) { + auto& errorHolder = state.errorHolder; + auto guard = makeScopeGuard([&] { out->finish(); }); + // Initialize the DCtx + auto ctx = state.dStreamPool->get(); + if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) { + return; + } + { + auto err = ZSTD_resetDStream(ctx.get()); + if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { + return; + } + } + + const size_t outSize = ZSTD_DStreamOutSize(); + Buffer inBuffer; + size_t returnCode = 0; + // Read a buffer in from the input queue + while (in->pop(inBuffer) && !errorHolder.hasError()) { + auto zstdInBuffer = makeZstdInBuffer(inBuffer); + // Decompress the whole buffer and send it to the output queue + while (!inBuffer.empty() && !errorHolder.hasError()) { + // Allocate a buffer with at least outSize bytes. + Buffer outBuffer(outSize); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + // Decompress + returnCode = + ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check( + !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { + return; + } + // Pass the buffer with the decompressed data to the output queue + out->push(split(outBuffer, zstdOutBuffer)); + // Advance past the input we already read + advance(inBuffer, zstdInBuffer); + if (returnCode == 0) { + // The frame is over, prepare to (maybe) start a new frame + ZSTD_initDStream(ctx.get()); + } + } + } + if (!errorHolder.check(returnCode <= 1, "Incomplete block")) { + return; + } + // We've given ZSTD_decompressStream all of our data, but there may still + // be data to read. + while (returnCode == 1) { + // Allocate a buffer with at least outSize bytes. + Buffer outBuffer(outSize); + auto zstdOutBuffer = makeZstdOutBuffer(outBuffer); + // Pass in no input. + ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0}; + // Decompress + returnCode = + ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer); + if (!errorHolder.check( + !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) { + return; + } + // Pass the buffer with the decompressed data to the output queue + out->push(split(outBuffer, zstdOutBuffer)); + } +} + +std::uint64_t asyncDecompressFrames( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, + ThreadPool& executor, + FILE* fd) { + auto framesGuard = makeScopeGuard([&] { frames.finish(); }); + std::uint64_t totalBytesRead = 0; + + // Split the source up into its component frames. + // If we find our recognized skippable frame we know the next frames size + // which means that we can decompress each standard frame in independently. + // Otherwise, we will decompress using only one decompression task. + const size_t chunkSize = ZSTD_DStreamInSize(); + auto status = FileStatus::Continue; + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { + // Make a new input queue that we will put the frames's bytes into. + auto in = std::make_shared<BufferWorkQueue>(); + auto inGuard = makeScopeGuard([&] { in->finish(); }); + // Make a output queue that decompress will put the decompressed data into + auto out = std::make_shared<BufferWorkQueue>(); + + size_t frameSize; + { + // Calculate the size of the next frame. + // frameSize is 0 if the frame info can't be decoded. + Buffer buffer(SkippableFrame::kSize); + auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); + totalBytesRead += bytesRead; + status = fileStatus(fd); + if (bytesRead == 0 && status != FileStatus::Continue) { + break; + } + buffer.subtract(buffer.size() - bytesRead); + frameSize = SkippableFrame::tryRead(buffer.range()); + in->push(std::move(buffer)); + } + if (frameSize == 0) { + // We hit a non SkippableFrame, so this will be the last job. + // Make sure that we don't use too much memory + in->setMaxSize(64); + out->setMaxSize(64); + } + // Start decompression in the thread pool + executor.add([&state, in, out] { + return decompress(state, std::move(in), std::move(out)); + }); + // Pass the output queue to the writer thread + frames.push(std::move(out)); + if (frameSize == 0) { + // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted + // Pass the rest of the source to this decompression task + state.log(VERBOSE, "%s\n", + "Input not in pzstd format, falling back to serial decompression"); + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { + status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); + } + break; + } + state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize); + // Fill the input queue for the decompression job we just started + status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); + } + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); + return totalBytesRead; +} + +/// Write `data` to `fd`, returns true iff success. +static bool writeData(ByteRange data, FILE* fd) { + while (!data.empty()) { + data.advance(std::fwrite(data.begin(), 1, data.size(), fd)); + if (std::ferror(fd)) { + return false; + } + } + return true; +} + +std::uint64_t writeFile( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, + FILE* outputFd, + bool decompress) { + auto& errorHolder = state.errorHolder; + auto lineClearGuard = makeScopeGuard([&state] { + state.log.clear(INFO); + }); + std::uint64_t bytesWritten = 0; + std::shared_ptr<BufferWorkQueue> out; + // Grab the output queue for each decompression job (in order). + while (outs.pop(out)) { + if (errorHolder.hasError()) { + continue; + } + if (!decompress) { + // If we are compressing and want to write skippable frames we can't + // start writing before compression is done because we need to know the + // compressed size. + // Wait for the compressed size to be available and write skippable frame + SkippableFrame frame(out->size()); + if (!writeData(frame.data(), outputFd)) { + errorHolder.setError("Failed to write output"); + return bytesWritten; + } + bytesWritten += frame.kSize; + } + // For each chunk of the frame: Pop it from the queue and write it + Buffer buffer; + while (out->pop(buffer) && !errorHolder.hasError()) { + if (!writeData(buffer.range(), outputFd)) { + errorHolder.setError("Failed to write output"); + return bytesWritten; + } + bytesWritten += buffer.size(); + state.log.update(INFO, "Written: %u MB ", + static_cast<std::uint32_t>(bytesWritten >> 20)); + } + } + return bytesWritten; +} +} diff --git a/src/zstd/contrib/pzstd/Pzstd.h b/src/zstd/contrib/pzstd/Pzstd.h new file mode 100644 index 00000000..79d1fcca --- /dev/null +++ b/src/zstd/contrib/pzstd/Pzstd.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "ErrorHolder.h" +#include "Logging.h" +#include "Options.h" +#include "utils/Buffer.h" +#include "utils/Range.h" +#include "utils/ResourcePool.h" +#include "utils/ThreadPool.h" +#include "utils/WorkQueue.h" +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd.h" +#undef ZSTD_STATIC_LINKING_ONLY + +#include <cstddef> +#include <cstdint> +#include <memory> + +namespace pzstd { +/** + * Runs pzstd with `options` and returns the number of bytes written. + * An error occurred if `errorHandler.hasError()`. + * + * @param options The pzstd options to use for (de)compression + * @returns 0 upon success and non-zero on failure. + */ +int pzstdMain(const Options& options); + +class SharedState { + public: + SharedState(const Options& options) : log(options.verbosity) { + if (!options.decompress) { + auto parameters = options.determineParameters(); + cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ + [this, parameters]() -> ZSTD_CStream* { + this->log(VERBOSE, "%s\n", "Creating new ZSTD_CStream"); + auto zcs = ZSTD_createCStream(); + if (zcs) { + auto err = ZSTD_initCStream_advanced( + zcs, nullptr, 0, parameters, 0); + if (ZSTD_isError(err)) { + ZSTD_freeCStream(zcs); + return nullptr; + } + } + return zcs; + }, + [](ZSTD_CStream *zcs) { + ZSTD_freeCStream(zcs); + }}); + } else { + dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ + [this]() -> ZSTD_DStream* { + this->log(VERBOSE, "%s\n", "Creating new ZSTD_DStream"); + auto zds = ZSTD_createDStream(); + if (zds) { + auto err = ZSTD_initDStream(zds); + if (ZSTD_isError(err)) { + ZSTD_freeDStream(zds); + return nullptr; + } + } + return zds; + }, + [](ZSTD_DStream *zds) { + ZSTD_freeDStream(zds); + }}); + } + } + + ~SharedState() { + // The resource pools have references to this, so destroy them first. + cStreamPool.reset(); + dStreamPool.reset(); + } + + Logger log; + ErrorHolder errorHolder; + std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; + std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; +}; + +/** + * Streams input from `fd`, breaks input up into chunks, and compresses each + * chunk independently. Output of each chunk gets streamed to a queue, and + * the output queues get put into `chunks` in order. + * + * @param state The shared state + * @param chunks Each compression jobs output queue gets `pushed()` here + * as soon as it is available + * @param executor The thread pool to run compression jobs in + * @param fd The input file descriptor + * @param size The size of the input file if known, 0 otherwise + * @param numThreads The number of threads in the thread pool + * @param parameters The zstd parameters to use for compression + * @returns The number of bytes read from the file + */ +std::uint64_t asyncCompressChunks( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, + ThreadPool& executor, + FILE* fd, + std::uintmax_t size, + std::size_t numThreads, + ZSTD_parameters parameters); + +/** + * Streams input from `fd`. If pzstd headers are available it breaks the input + * up into independent frames. It sends each frame to an independent + * decompression job. Output of each frame gets streamed to a queue, and + * the output queues get put into `frames` in order. + * + * @param state The shared state + * @param frames Each decompression jobs output queue gets `pushed()` here + * as soon as it is available + * @param executor The thread pool to run compression jobs in + * @param fd The input file descriptor + * @returns The number of bytes read from the file + */ +std::uint64_t asyncDecompressFrames( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, + ThreadPool& executor, + FILE* fd); + +/** + * Streams input in from each queue in `outs` in order, and writes the data to + * `outputFd`. + * + * @param state The shared state + * @param outs A queue of output queues, one for each + * (de)compression job. + * @param outputFd The file descriptor to write to + * @param decompress Are we decompressing? + * @returns The number of bytes written + */ +std::uint64_t writeFile( + SharedState& state, + WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, + FILE* outputFd, + bool decompress); +} diff --git a/src/zstd/contrib/pzstd/README.md b/src/zstd/contrib/pzstd/README.md new file mode 100644 index 00000000..84d94581 --- /dev/null +++ b/src/zstd/contrib/pzstd/README.md @@ -0,0 +1,56 @@ +# Parallel Zstandard (PZstandard) + +Parallel Zstandard is a Pigz-like tool for Zstandard. +It provides Zstandard format compatible compression and decompression that is able to utilize multiple cores. +It breaks the input up into equal sized chunks and compresses each chunk independently into a Zstandard frame. +It then concatenates the frames together to produce the final compressed output. +Pzstandard will write a 12 byte header for each frame that is a skippable frame in the Zstandard format, which tells PZstandard the size of the next compressed frame. +PZstandard supports parallel decompression of files compressed with PZstandard. +When decompressing files compressed with Zstandard, PZstandard does IO in one thread, and decompression in another. + +## Usage + +PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads. +Dictionary mode is not currently supported. + +Basic usage + + pzstd input-file -o output-file -p num-threads -# # Compression + pzstd -d input-file -o output-file -p num-threads # Decompression + +PZstandard also supports piping and fifo pipes + + cat input-file | pzstd -p num-threads -# -c > /dev/null + +For more options + + pzstd --help + +PZstandard tries to pick a smart default number of threads if not specified (displayed in `pzstd --help`). +If this number is not suitable, during compilation you can define `PZSTD_NUM_THREADS` to the number of threads you prefer. + +## Benchmarks + +As a reference, PZstandard and Pigz were compared on an Intel Core i7 @ 3.1 GHz, each using 4 threads, with the [Silesia compression corpus](http://sun.aei.polsl.pl/~sdeor/index.php?page=silesia). + +Compression Speed vs Ratio with 4 Threads | Decompression Speed with 4 Threads +------------------------------------------|----------------------------------- +![Compression Speed vs Ratio](images/Cspeed.png "Compression Speed vs Ratio") | ![Decompression Speed](images/Dspeed.png "Decompression Speed") + +The test procedure was to run each of the following commands 2 times for each compression level, and take the minimum time. + + time pzstd -# -p 4 -c silesia.tar > silesia.tar.zst + time pzstd -d -p 4 -c silesia.tar.zst > /dev/null + + time pigz -# -p 4 -k -c silesia.tar > silesia.tar.gz + time pigz -d -p 4 -k -c silesia.tar.gz > /dev/null + +PZstandard was tested using compression levels 1-19, and Pigz was tested using compression levels 1-9. +Pigz cannot do parallel decompression, it simply does each of reading, decompression, and writing on separate threads. + +## Tests + +Tests require that you have [gtest](https://github.com/google/googletest) installed. +Set `GTEST_INC` and `GTEST_LIB` in `Makefile` to specify the location of the gtest headers and libraries. +Alternatively, run `make googletest`, which will clone googletest and build it. +Run `make tests && make check` to run tests. diff --git a/src/zstd/contrib/pzstd/SkippableFrame.cpp b/src/zstd/contrib/pzstd/SkippableFrame.cpp new file mode 100644 index 00000000..769866df --- /dev/null +++ b/src/zstd/contrib/pzstd/SkippableFrame.cpp @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "SkippableFrame.h" +#include "mem.h" +#include "utils/Range.h" + +#include <cstdio> + +using namespace pzstd; + +SkippableFrame::SkippableFrame(std::uint32_t size) : frameSize_(size) { + MEM_writeLE32(data_.data(), kSkippableFrameMagicNumber); + MEM_writeLE32(data_.data() + 4, kFrameContentsSize); + MEM_writeLE32(data_.data() + 8, frameSize_); +} + +/* static */ std::size_t SkippableFrame::tryRead(ByteRange bytes) { + if (bytes.size() < SkippableFrame::kSize || + MEM_readLE32(bytes.begin()) != kSkippableFrameMagicNumber || + MEM_readLE32(bytes.begin() + 4) != kFrameContentsSize) { + return 0; + } + return MEM_readLE32(bytes.begin() + 8); +} diff --git a/src/zstd/contrib/pzstd/SkippableFrame.h b/src/zstd/contrib/pzstd/SkippableFrame.h new file mode 100644 index 00000000..60deed04 --- /dev/null +++ b/src/zstd/contrib/pzstd/SkippableFrame.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Range.h" + +#include <array> +#include <cstddef> +#include <cstdint> +#include <cstdio> + +namespace pzstd { +/** + * We put a skippable frame before each frame. + * It contains a skippable frame magic number, the size of the skippable frame, + * and the size of the next frame. + * Each skippable frame is exactly 12 bytes in little endian format. + * The first 8 bytes are for compatibility with the ZSTD format. + * If we have N threads, the output will look like + * + * [0x184D2A50|4|size1] [frame1 of size size1] + * [0x184D2A50|4|size2] [frame2 of size size2] + * ... + * [0x184D2A50|4|sizeN] [frameN of size sizeN] + * + * Each sizeX is 4 bytes. + * + * These skippable frames should allow us to skip through the compressed file + * and only load at most N pages. + */ +class SkippableFrame { + public: + static constexpr std::size_t kSize = 12; + + private: + std::uint32_t frameSize_; + std::array<std::uint8_t, kSize> data_; + static constexpr std::uint32_t kSkippableFrameMagicNumber = 0x184D2A50; + // Could be improved if the size fits in less bytes + static constexpr std::uint32_t kFrameContentsSize = kSize - 8; + + public: + // Write the skippable frame to data_ in LE format. + explicit SkippableFrame(std::uint32_t size); + + // Read the skippable frame from bytes in LE format. + static std::size_t tryRead(ByteRange bytes); + + ByteRange data() const { + return {data_.data(), data_.size()}; + } + + // Size of the next frame. + std::size_t frameSize() const { + return frameSize_; + } +}; +} diff --git a/src/zstd/contrib/pzstd/images/Cspeed.png b/src/zstd/contrib/pzstd/images/Cspeed.png Binary files differnew file mode 100644 index 00000000..aca4f663 --- /dev/null +++ b/src/zstd/contrib/pzstd/images/Cspeed.png diff --git a/src/zstd/contrib/pzstd/images/Dspeed.png b/src/zstd/contrib/pzstd/images/Dspeed.png Binary files differnew file mode 100644 index 00000000..e48881bc --- /dev/null +++ b/src/zstd/contrib/pzstd/images/Dspeed.png diff --git a/src/zstd/contrib/pzstd/main.cpp b/src/zstd/contrib/pzstd/main.cpp new file mode 100644 index 00000000..b93f043b --- /dev/null +++ b/src/zstd/contrib/pzstd/main.cpp @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "ErrorHolder.h" +#include "Options.h" +#include "Pzstd.h" + +using namespace pzstd; + +int main(int argc, const char** argv) { + Options options; + switch (options.parse(argc, argv)) { + case Options::Status::Failure: + return 1; + case Options::Status::Message: + return 0; + default: + break; + } + + return pzstdMain(options); +} diff --git a/src/zstd/contrib/pzstd/test/BUCK b/src/zstd/contrib/pzstd/test/BUCK new file mode 100644 index 00000000..6d3fdd3c --- /dev/null +++ b/src/zstd/contrib/pzstd/test/BUCK @@ -0,0 +1,37 @@ +cxx_test( + name='options_test', + srcs=['OptionsTest.cpp'], + deps=['//contrib/pzstd:options'], +) + +cxx_test( + name='pzstd_test', + srcs=['PzstdTest.cpp'], + deps=[ + ':round_trip', + '//contrib/pzstd:libpzstd', + '//contrib/pzstd/utils:scope_guard', + '//programs:datagen', + ], +) + +cxx_binary( + name='round_trip_test', + srcs=['RoundTripTest.cpp'], + deps=[ + ':round_trip', + '//contrib/pzstd/utils:scope_guard', + '//programs:datagen', + ] +) + +cxx_library( + name='round_trip', + header_namespace='test', + exported_headers=['RoundTrip.h'], + deps=[ + '//contrib/pzstd:libpzstd', + '//contrib/pzstd:options', + '//contrib/pzstd/utils:scope_guard', + ] +) diff --git a/src/zstd/contrib/pzstd/test/OptionsTest.cpp b/src/zstd/contrib/pzstd/test/OptionsTest.cpp new file mode 100644 index 00000000..e6011482 --- /dev/null +++ b/src/zstd/contrib/pzstd/test/OptionsTest.cpp @@ -0,0 +1,536 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "Options.h" + +#include <array> +#include <gtest/gtest.h> + +using namespace pzstd; + +namespace pzstd { +bool operator==(const Options &lhs, const Options &rhs) { + return lhs.numThreads == rhs.numThreads && + lhs.maxWindowLog == rhs.maxWindowLog && + lhs.compressionLevel == rhs.compressionLevel && + lhs.decompress == rhs.decompress && lhs.inputFiles == rhs.inputFiles && + lhs.outputFile == rhs.outputFile && lhs.overwrite == rhs.overwrite && + lhs.keepSource == rhs.keepSource && lhs.writeMode == rhs.writeMode && + lhs.checksum == rhs.checksum && lhs.verbosity == rhs.verbosity; +} + +std::ostream &operator<<(std::ostream &out, const Options &opt) { + out << "{"; + { + out << "\n\t" + << "numThreads: " << opt.numThreads; + out << ",\n\t" + << "maxWindowLog: " << opt.maxWindowLog; + out << ",\n\t" + << "compressionLevel: " << opt.compressionLevel; + out << ",\n\t" + << "decompress: " << opt.decompress; + out << ",\n\t" + << "inputFiles: {"; + { + bool first = true; + for (const auto &file : opt.inputFiles) { + if (!first) { + out << ","; + } + first = false; + out << "\n\t\t" << file; + } + } + out << "\n\t}"; + out << ",\n\t" + << "outputFile: " << opt.outputFile; + out << ",\n\t" + << "overwrite: " << opt.overwrite; + out << ",\n\t" + << "keepSource: " << opt.keepSource; + out << ",\n\t" + << "writeMode: " << static_cast<int>(opt.writeMode); + out << ",\n\t" + << "checksum: " << opt.checksum; + out << ",\n\t" + << "verbosity: " << opt.verbosity; + } + out << "\n}"; + return out; +} +} + +namespace { +#ifdef _WIN32 +const char nullOutput[] = "nul"; +#else +const char nullOutput[] = "/dev/null"; +#endif + +constexpr auto autoMode = Options::WriteMode::Auto; +} // anonymous namespace + +#define EXPECT_SUCCESS(...) EXPECT_EQ(Options::Status::Success, __VA_ARGS__) +#define EXPECT_FAILURE(...) EXPECT_EQ(Options::Status::Failure, __VA_ARGS__) +#define EXPECT_MESSAGE(...) EXPECT_EQ(Options::Status::Message, __VA_ARGS__) + +template <typename... Args> +std::array<const char *, sizeof...(Args) + 1> makeArray(Args... args) { + return {{nullptr, args...}}; +} + +TEST(Options, ValidInputs) { + { + Options options; + auto args = makeArray("--processes", "5", "-o", "x", "y", "-f"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {5, 23, 3, false, {"y"}, "x", + true, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("-p", "1", "input", "-19"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {1, 23, 19, false, {"input"}, "", + false, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = + makeArray("--ultra", "-22", "-p", "1", "-o", "x", "-d", "x.zst", "-f"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {1, 0, 22, true, {"x.zst"}, "x", + true, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("--processes", "100", "hello.zst", "--decompress", + "--force"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {100, 23, 3, true, {"hello.zst"}, "", true, + true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("x", "-dp", "1", "-c"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {1, 23, 3, true, {"x"}, "-", + false, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("x", "-dp", "1", "--stdout"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {1, 23, 3, true, {"x"}, "-", + false, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("-p", "1", "x", "-5", "-fo", "-", "--ultra", "-d"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {1, 0, 5, true, {"x"}, "-", + true, true, autoMode, true, 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("silesia.tar", "-o", "silesia.tar.pzstd", "-p", "2"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {2, + 23, + 3, + false, + {"silesia.tar"}, + "silesia.tar.pzstd", + false, + true, + autoMode, + true, + 2}; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("x", "-p", "1"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-p", "1"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + } +} + +TEST(Options, GetOutputFile) { + { + Options options; + auto args = makeArray("x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ("x.zst", options.getOutputFile(options.inputFiles[0])); + } + { + Options options; + auto args = makeArray("x", "y", "-o", nullOutput); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(nullOutput, options.getOutputFile(options.inputFiles[0])); + } + { + Options options; + auto args = makeArray("x.zst", "-do", nullOutput); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(nullOutput, options.getOutputFile(options.inputFiles[0])); + } + { + Options options; + auto args = makeArray("x.zst", "-d"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ("x", options.getOutputFile(options.inputFiles[0])); + } + { + Options options; + auto args = makeArray("xzst", "-d"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ("", options.getOutputFile(options.inputFiles[0])); + } + { + Options options; + auto args = makeArray("xzst", "-doxx"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ("xx", options.getOutputFile(options.inputFiles[0])); + } +} + +TEST(Options, MultipleFiles) { + { + Options options; + auto args = makeArray("x", "y", "z"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected; + expected.inputFiles = {"x", "y", "z"}; + expected.verbosity = 1; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("x", "y", "z", "-o", nullOutput); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected; + expected.inputFiles = {"x", "y", "z"}; + expected.outputFile = nullOutput; + expected.verbosity = 1; + EXPECT_EQ(expected, options); + } + { + Options options; + auto args = makeArray("x", "y", "-o-"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "y", "-o", "file"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-qqvd12qp4", "-f", "x", "--", "--rm", "-c"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + Options expected = {4, 23, 12, true, {"x", "--rm", "-c"}, + "", true, true, autoMode, true, + 0}; + EXPECT_EQ(expected, options); + } +} + +TEST(Options, NumThreads) { + { + Options options; + auto args = makeArray("x", "-dfo", "-"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-p", "0", "-fo", "-"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-f", "-p", "-o", "-"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, BadCompressionLevel) { + { + Options options; + auto args = makeArray("x", "-20"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "--ultra", "-23"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "--1"); // negative 1? + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, InvalidOption) { + { + Options options; + auto args = makeArray("x", "-x"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, BadOutputFile) { + { + Options options; + auto args = makeArray("notzst", "-d", "-p", "1"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ("", options.getOutputFile(options.inputFiles.front())); + } +} + +TEST(Options, BadOptionsWithArguments) { + { + Options options; + auto args = makeArray("x", "-pf"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-p", "10f"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-p"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-o"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("x", "-o"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, KeepSource) { + { + Options options; + auto args = makeArray("x", "--rm", "-k"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.keepSource); + } + { + Options options; + auto args = makeArray("x", "--rm", "--keep"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.keepSource); + } + { + Options options; + auto args = makeArray("x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.keepSource); + } + { + Options options; + auto args = makeArray("x", "--rm"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(false, options.keepSource); + } +} + +TEST(Options, Verbosity) { + { + Options options; + auto args = makeArray("x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(2, options.verbosity); + } + { + Options options; + auto args = makeArray("--quiet", "-qq", "x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(-1, options.verbosity); + } + { + Options options; + auto args = makeArray("x", "y"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(1, options.verbosity); + } + { + Options options; + auto args = makeArray("--", "x", "y"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(1, options.verbosity); + } + { + Options options; + auto args = makeArray("-qv", "x", "y"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(1, options.verbosity); + } + { + Options options; + auto args = makeArray("-v", "x", "y"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(3, options.verbosity); + } + { + Options options; + auto args = makeArray("-v", "x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(3, options.verbosity); + } +} + +TEST(Options, TestMode) { + { + Options options; + auto args = makeArray("x", "-t"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.keepSource); + EXPECT_EQ(true, options.decompress); + EXPECT_EQ(nullOutput, options.outputFile); + } + { + Options options; + auto args = makeArray("x", "--test", "--rm", "-ohello"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.keepSource); + EXPECT_EQ(true, options.decompress); + EXPECT_EQ(nullOutput, options.outputFile); + } +} + +TEST(Options, Checksum) { + { + Options options; + auto args = makeArray("x.zst", "--no-check", "-Cd"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.checksum); + } + { + Options options; + auto args = makeArray("x"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.checksum); + } + { + Options options; + auto args = makeArray("x", "--no-check", "--check"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(true, options.checksum); + } + { + Options options; + auto args = makeArray("x", "--no-check"); + EXPECT_SUCCESS(options.parse(args.size(), args.data())); + EXPECT_EQ(false, options.checksum); + } +} + +TEST(Options, InputFiles) { + { + Options options; + auto args = makeArray("-cd"); + options.parse(args.size(), args.data()); + EXPECT_EQ(1, options.inputFiles.size()); + EXPECT_EQ("-", options.inputFiles[0]); + EXPECT_EQ("-", options.outputFile); + } + { + Options options; + auto args = makeArray(); + options.parse(args.size(), args.data()); + EXPECT_EQ(1, options.inputFiles.size()); + EXPECT_EQ("-", options.inputFiles[0]); + EXPECT_EQ("-", options.outputFile); + } + { + Options options; + auto args = makeArray("-d"); + options.parse(args.size(), args.data()); + EXPECT_EQ(1, options.inputFiles.size()); + EXPECT_EQ("-", options.inputFiles[0]); + EXPECT_EQ("-", options.outputFile); + } + { + Options options; + auto args = makeArray("x", "-"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, InvalidOptions) { + { + Options options; + auto args = makeArray("-ibasdf"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("- "); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-n15"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-0", "x"); + EXPECT_FAILURE(options.parse(args.size(), args.data())); + } +} + +TEST(Options, Extras) { + { + Options options; + auto args = makeArray("-h"); + EXPECT_MESSAGE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-H"); + EXPECT_MESSAGE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("-V"); + EXPECT_MESSAGE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("--help"); + EXPECT_MESSAGE(options.parse(args.size(), args.data())); + } + { + Options options; + auto args = makeArray("--version"); + EXPECT_MESSAGE(options.parse(args.size(), args.data())); + } +} diff --git a/src/zstd/contrib/pzstd/test/PzstdTest.cpp b/src/zstd/contrib/pzstd/test/PzstdTest.cpp new file mode 100644 index 00000000..5c7d6631 --- /dev/null +++ b/src/zstd/contrib/pzstd/test/PzstdTest.cpp @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "Pzstd.h" +extern "C" { +#include "datagen.h" +} +#include "test/RoundTrip.h" +#include "utils/ScopeGuard.h" + +#include <cstddef> +#include <cstdio> +#include <gtest/gtest.h> +#include <memory> +#include <random> + +using namespace std; +using namespace pzstd; + +TEST(Pzstd, SmallSizes) { + unsigned seed = std::random_device{}(); + std::fprintf(stderr, "Pzstd.SmallSizes seed: %u\n", seed); + std::mt19937 gen(seed); + + for (unsigned len = 1; len < 256; ++len) { + if (len % 16 == 0) { + std::fprintf(stderr, "%u / 16\n", len / 16); + } + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + static uint8_t buf[256]; + RDG_genBuffer(buf, len, 0.5, 0.0, gen()); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf, 1, len, fd); + std::fclose(fd); + ASSERT_EQ(written, len); + } + for (unsigned numThreads = 1; numThreads <= 2; ++numThreads) { + for (unsigned level = 1; level <= 4; level *= 4) { + auto errorGuard = makeScopeGuard([&] { + std::fprintf(stderr, "# threads: %u\n", numThreads); + std::fprintf(stderr, "compression level: %u\n", level); + }); + Options options; + options.overwrite = true; + options.inputFiles = {inputFile}; + options.numThreads = numThreads; + options.compressionLevel = level; + options.verbosity = 1; + ASSERT_TRUE(roundTrip(options)); + errorGuard.dismiss(); + } + } + } +} + +TEST(Pzstd, LargeSizes) { + unsigned seed = std::random_device{}(); + std::fprintf(stderr, "Pzstd.LargeSizes seed: %u\n", seed); + std::mt19937 gen(seed); + + for (unsigned len = 1 << 20; len <= (1 << 24); len *= 2) { + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + std::unique_ptr<uint8_t[]> buf(new uint8_t[len]); + RDG_genBuffer(buf.get(), len, 0.5, 0.0, gen()); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf.get(), 1, len, fd); + std::fclose(fd); + ASSERT_EQ(written, len); + } + for (unsigned numThreads = 1; numThreads <= 16; numThreads *= 4) { + for (unsigned level = 1; level <= 4; level *= 4) { + auto errorGuard = makeScopeGuard([&] { + std::fprintf(stderr, "# threads: %u\n", numThreads); + std::fprintf(stderr, "compression level: %u\n", level); + }); + Options options; + options.overwrite = true; + options.inputFiles = {inputFile}; + options.numThreads = std::min(numThreads, options.numThreads); + options.compressionLevel = level; + options.verbosity = 1; + ASSERT_TRUE(roundTrip(options)); + errorGuard.dismiss(); + } + } + } +} + +TEST(Pzstd, DISABLED_ExtremelyLargeSize) { + unsigned seed = std::random_device{}(); + std::fprintf(stderr, "Pzstd.ExtremelyLargeSize seed: %u\n", seed); + std::mt19937 gen(seed); + + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + + { + // Write 4GB + 64 MB + constexpr size_t kLength = 1 << 26; + std::unique_ptr<uint8_t[]> buf(new uint8_t[kLength]); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto closeGuard = makeScopeGuard([&] { std::fclose(fd); }); + for (size_t i = 0; i < (1 << 6) + 1; ++i) { + RDG_genBuffer(buf.get(), kLength, 0.5, 0.0, gen()); + auto written = std::fwrite(buf.get(), 1, kLength, fd); + if (written != kLength) { + std::fprintf(stderr, "Failed to write file, skipping test\n"); + return; + } + } + } + + Options options; + options.overwrite = true; + options.inputFiles = {inputFile}; + options.compressionLevel = 1; + if (options.numThreads == 0) { + options.numThreads = 1; + } + ASSERT_TRUE(roundTrip(options)); +} + +TEST(Pzstd, ExtremelyCompressible) { + std::string inputFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + { + std::unique_ptr<uint8_t[]> buf(new uint8_t[10000]); + std::memset(buf.get(), 'a', 10000); + auto fd = std::fopen(inputFile.c_str(), "wb"); + auto written = std::fwrite(buf.get(), 1, 10000, fd); + std::fclose(fd); + ASSERT_EQ(written, 10000); + } + Options options; + options.overwrite = true; + options.inputFiles = {inputFile}; + options.numThreads = 1; + options.compressionLevel = 1; + ASSERT_TRUE(roundTrip(options)); +} diff --git a/src/zstd/contrib/pzstd/test/RoundTrip.h b/src/zstd/contrib/pzstd/test/RoundTrip.h new file mode 100644 index 00000000..c6364ecb --- /dev/null +++ b/src/zstd/contrib/pzstd/test/RoundTrip.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "Options.h" +#include "Pzstd.h" +#include "utils/ScopeGuard.h" + +#include <cstdio> +#include <string> +#include <cstdint> +#include <memory> + +namespace pzstd { + +inline bool check(std::string source, std::string decompressed) { + std::unique_ptr<std::uint8_t[]> sBuf(new std::uint8_t[1024]); + std::unique_ptr<std::uint8_t[]> dBuf(new std::uint8_t[1024]); + + auto sFd = std::fopen(source.c_str(), "rb"); + auto dFd = std::fopen(decompressed.c_str(), "rb"); + auto guard = makeScopeGuard([&] { + std::fclose(sFd); + std::fclose(dFd); + }); + + size_t sRead, dRead; + + do { + sRead = std::fread(sBuf.get(), 1, 1024, sFd); + dRead = std::fread(dBuf.get(), 1, 1024, dFd); + if (std::ferror(sFd) || std::ferror(dFd)) { + return false; + } + if (sRead != dRead) { + return false; + } + + for (size_t i = 0; i < sRead; ++i) { + if (sBuf.get()[i] != dBuf.get()[i]) { + return false; + } + } + } while (sRead == 1024); + if (!std::feof(sFd) || !std::feof(dFd)) { + return false; + } + return true; +} + +inline bool roundTrip(Options& options) { + if (options.inputFiles.size() != 1) { + return false; + } + std::string source = options.inputFiles.front(); + std::string compressedFile = std::tmpnam(nullptr); + std::string decompressedFile = std::tmpnam(nullptr); + auto guard = makeScopeGuard([&] { + std::remove(compressedFile.c_str()); + std::remove(decompressedFile.c_str()); + }); + + { + options.outputFile = compressedFile; + options.decompress = false; + if (pzstdMain(options) != 0) { + return false; + } + } + { + options.decompress = true; + options.inputFiles.front() = compressedFile; + options.outputFile = decompressedFile; + if (pzstdMain(options) != 0) { + return false; + } + } + return check(source, decompressedFile); +} +} diff --git a/src/zstd/contrib/pzstd/test/RoundTripTest.cpp b/src/zstd/contrib/pzstd/test/RoundTripTest.cpp new file mode 100644 index 00000000..36af0673 --- /dev/null +++ b/src/zstd/contrib/pzstd/test/RoundTripTest.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +extern "C" { +#include "datagen.h" +} +#include "Options.h" +#include "test/RoundTrip.h" +#include "utils/ScopeGuard.h" + +#include <cstddef> +#include <cstdio> +#include <cstdlib> +#include <memory> +#include <random> + +using namespace std; +using namespace pzstd; + +namespace { +string +writeData(size_t size, double matchProba, double litProba, unsigned seed) { + std::unique_ptr<uint8_t[]> buf(new uint8_t[size]); + RDG_genBuffer(buf.get(), size, matchProba, litProba, seed); + string file = tmpnam(nullptr); + auto fd = std::fopen(file.c_str(), "wb"); + auto guard = makeScopeGuard([&] { std::fclose(fd); }); + auto bytesWritten = std::fwrite(buf.get(), 1, size, fd); + if (bytesWritten != size) { + std::abort(); + } + return file; +} + +template <typename Generator> +string generateInputFile(Generator& gen) { + // Use inputs ranging from 1 Byte to 2^16 Bytes + std::uniform_int_distribution<size_t> size{1, 1 << 16}; + std::uniform_real_distribution<> prob{0, 1}; + return writeData(size(gen), prob(gen), prob(gen), gen()); +} + +template <typename Generator> +Options generateOptions(Generator& gen, const string& inputFile) { + Options options; + options.inputFiles = {inputFile}; + options.overwrite = true; + + std::uniform_int_distribution<unsigned> numThreads{1, 32}; + std::uniform_int_distribution<unsigned> compressionLevel{1, 10}; + + options.numThreads = numThreads(gen); + options.compressionLevel = compressionLevel(gen); + + return options; +} +} + +int main() { + std::mt19937 gen(std::random_device{}()); + + auto newlineGuard = makeScopeGuard([] { std::fprintf(stderr, "\n"); }); + for (unsigned i = 0; i < 10000; ++i) { + if (i % 100 == 0) { + std::fprintf(stderr, "Progress: %u%%\r", i / 100); + } + auto inputFile = generateInputFile(gen); + auto inputGuard = makeScopeGuard([&] { std::remove(inputFile.c_str()); }); + for (unsigned i = 0; i < 10; ++i) { + auto options = generateOptions(gen, inputFile); + if (!roundTrip(options)) { + std::fprintf(stderr, "numThreads: %u\n", options.numThreads); + std::fprintf(stderr, "level: %u\n", options.compressionLevel); + std::fprintf(stderr, "decompress? %u\n", (unsigned)options.decompress); + std::fprintf(stderr, "file: %s\n", inputFile.c_str()); + return 1; + } + } + } + return 0; +} diff --git a/src/zstd/contrib/pzstd/utils/BUCK b/src/zstd/contrib/pzstd/utils/BUCK new file mode 100644 index 00000000..e757f412 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/BUCK @@ -0,0 +1,75 @@ +cxx_library( + name='buffer', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Buffer.h'], + deps=[':range'], +) + +cxx_library( + name='file_system', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['FileSystem.h'], + deps=[':range'], +) + +cxx_library( + name='likely', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Likely.h'], +) + +cxx_library( + name='range', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Range.h'], + deps=[':likely'], +) + +cxx_library( + name='resource_pool', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ResourcePool.h'], +) + +cxx_library( + name='scope_guard', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ScopeGuard.h'], +) + +cxx_library( + name='thread_pool', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ThreadPool.h'], + deps=[':work_queue'], +) + +cxx_library( + name='work_queue', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['WorkQueue.h'], + deps=[':buffer'], +) + +cxx_library( + name='utils', + visibility=['PUBLIC'], + deps=[ + ':buffer', + ':file_system', + ':likely', + ':range', + ':resource_pool', + ':scope_guard', + ':thread_pool', + ':work_queue', + ], +) diff --git a/src/zstd/contrib/pzstd/utils/Buffer.h b/src/zstd/contrib/pzstd/utils/Buffer.h new file mode 100644 index 00000000..f69c3b4d --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Buffer.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Range.h" + +#include <array> +#include <cstddef> +#include <memory> + +namespace pzstd { + +/** + * A `Buffer` has a pointer to a shared buffer, and a range of the buffer that + * it owns. + * The idea is that you can allocate one buffer, and write chunks into it + * and break off those chunks. + * The underlying buffer is reference counted, and will be destroyed when all + * `Buffer`s that reference it are destroyed. + */ +class Buffer { + std::shared_ptr<unsigned char> buffer_; + MutableByteRange range_; + + static void delete_buffer(unsigned char* buffer) { + delete[] buffer; + } + + public: + /// Construct an empty buffer that owns no data. + explicit Buffer() {} + + /// Construct a `Buffer` that owns a new underlying buffer of size `size`. + explicit Buffer(std::size_t size) + : buffer_(new unsigned char[size], delete_buffer), + range_(buffer_.get(), buffer_.get() + size) {} + + explicit Buffer(std::shared_ptr<unsigned char> buffer, MutableByteRange data) + : buffer_(buffer), range_(data) {} + + Buffer(Buffer&&) = default; + Buffer& operator=(Buffer&&) & = default; + + /** + * Splits the data into two pieces: [begin, begin + n), [begin + n, end). + * Their data both points into the same underlying buffer. + * Modifies the original `Buffer` to point to only [begin + n, end). + * + * @param n The offset to split at. + * @returns A buffer that owns the data [begin, begin + n). + */ + Buffer splitAt(std::size_t n) { + auto firstPiece = range_.subpiece(0, n); + range_.advance(n); + return Buffer(buffer_, firstPiece); + } + + /// Modifies the buffer to point to the range [begin + n, end). + void advance(std::size_t n) { + range_.advance(n); + } + + /// Modifies the buffer to point to the range [begin, end - n). + void subtract(std::size_t n) { + range_.subtract(n); + } + + /// Returns a read only `Range` pointing to the `Buffer`s data. + ByteRange range() const { + return range_; + } + /// Returns a mutable `Range` pointing to the `Buffer`s data. + MutableByteRange range() { + return range_; + } + + const unsigned char* data() const { + return range_.data(); + } + + unsigned char* data() { + return range_.data(); + } + + std::size_t size() const { + return range_.size(); + } + + bool empty() const { + return range_.empty(); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/FileSystem.h b/src/zstd/contrib/pzstd/utils/FileSystem.h new file mode 100644 index 00000000..3cfbe86e --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/FileSystem.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Range.h" + +#include <sys/stat.h> +#include <cerrno> +#include <cstdint> +#include <system_error> + +// A small subset of `std::filesystem`. +// `std::filesystem` should be a drop in replacement. +// See http://en.cppreference.com/w/cpp/filesystem for documentation. + +namespace pzstd { + +// using file_status = ... causes gcc to emit a false positive warning +#if defined(_MSC_VER) +typedef struct ::_stat64 file_status; +#else +typedef struct ::stat file_status; +#endif + +/// http://en.cppreference.com/w/cpp/filesystem/status +inline file_status status(StringPiece path, std::error_code& ec) noexcept { + file_status status; +#if defined(_MSC_VER) + const auto error = ::_stat64(path.data(), &status); +#else + const auto error = ::stat(path.data(), &status); +#endif + if (error) { + ec.assign(errno, std::generic_category()); + } else { + ec.clear(); + } + return status; +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(file_status status) noexcept { +#if defined(S_ISREG) + return S_ISREG(status.st_mode); +#elif !defined(S_ISREG) && defined(S_IFMT) && defined(S_IFREG) + return (status.st_mode & S_IFMT) == S_IFREG; +#else + static_assert(false, "No POSIX stat() support."); +#endif +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(StringPiece path, std::error_code& ec) noexcept { + return is_regular_file(status(path, ec)); +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_directory +inline bool is_directory(file_status status) noexcept { +#if defined(S_ISDIR) + return S_ISDIR(status.st_mode); +#elif !defined(S_ISDIR) && defined(S_IFMT) && defined(S_IFDIR) + return (status.st_mode & S_IFMT) == S_IFDIR; +#else + static_assert(false, "NO POSIX stat() support."); +#endif +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_directory +inline bool is_directory(StringPiece path, std::error_code& ec) noexcept { + return is_directory(status(path, ec)); +} + +/// http://en.cppreference.com/w/cpp/filesystem/file_size +inline std::uintmax_t file_size( + StringPiece path, + std::error_code& ec) noexcept { + auto stat = status(path, ec); + if (ec) { + return -1; + } + if (!is_regular_file(stat)) { + ec.assign(ENOTSUP, std::generic_category()); + return -1; + } + ec.clear(); + return stat.st_size; +} +} diff --git a/src/zstd/contrib/pzstd/utils/Likely.h b/src/zstd/contrib/pzstd/utils/Likely.h new file mode 100644 index 00000000..7cea8da2 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Likely.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ + +/** + * Compiler hints to indicate the fast path of an "if" branch: whether + * the if condition is likely to be true or false. + * + * @author Tudor Bosman (tudorb@fb.com) + */ + +#pragma once + +#undef LIKELY +#undef UNLIKELY + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif diff --git a/src/zstd/contrib/pzstd/utils/Range.h b/src/zstd/contrib/pzstd/utils/Range.h new file mode 100644 index 00000000..7e2559cc --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Range.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ + +/** + * A subset of `folly/Range.h`. + * All code copied verbatiam modulo formatting + */ +#pragma once + +#include "utils/Likely.h" + +#include <cstddef> +#include <cstring> +#include <stdexcept> +#include <string> +#include <type_traits> + +namespace pzstd { + +namespace detail { +/* + *Use IsCharPointer<T>::type to enable const char* or char*. + *Use IsCharPointer<T>::const_type to enable only const char*. +*/ +template <class T> +struct IsCharPointer {}; + +template <> +struct IsCharPointer<char*> { + typedef int type; +}; + +template <> +struct IsCharPointer<const char*> { + typedef int const_type; + typedef int type; +}; + +} // namespace detail + +template <typename Iter> +class Range { + Iter b_; + Iter e_; + + public: + using size_type = std::size_t; + using iterator = Iter; + using const_iterator = Iter; + using value_type = typename std::remove_reference< + typename std::iterator_traits<Iter>::reference>::type; + using reference = typename std::iterator_traits<Iter>::reference; + + constexpr Range() : b_(), e_() {} + constexpr Range(Iter begin, Iter end) : b_(begin), e_(end) {} + + constexpr Range(Iter begin, size_type size) : b_(begin), e_(begin + size) {} + + template <class T = Iter, typename detail::IsCharPointer<T>::type = 0> + /* implicit */ Range(Iter str) : b_(str), e_(str + std::strlen(str)) {} + + template <class T = Iter, typename detail::IsCharPointer<T>::const_type = 0> + /* implicit */ Range(const std::string& str) + : b_(str.data()), e_(b_ + str.size()) {} + + // Allow implicit conversion from Range<From> to Range<To> if From is + // implicitly convertible to To. + template < + class OtherIter, + typename std::enable_if< + (!std::is_same<Iter, OtherIter>::value && + std::is_convertible<OtherIter, Iter>::value), + int>::type = 0> + constexpr /* implicit */ Range(const Range<OtherIter>& other) + : b_(other.begin()), e_(other.end()) {} + + Range(const Range&) = default; + Range(Range&&) = default; + + Range& operator=(const Range&) & = default; + Range& operator=(Range&&) & = default; + + constexpr size_type size() const { + return e_ - b_; + } + bool empty() const { + return b_ == e_; + } + Iter data() const { + return b_; + } + Iter begin() const { + return b_; + } + Iter end() const { + return e_; + } + + void advance(size_type n) { + if (UNLIKELY(n > size())) { + throw std::out_of_range("index out of range"); + } + b_ += n; + } + + void subtract(size_type n) { + if (UNLIKELY(n > size())) { + throw std::out_of_range("index out of range"); + } + e_ -= n; + } + + Range subpiece(size_type first, size_type length = std::string::npos) const { + if (UNLIKELY(first > size())) { + throw std::out_of_range("index out of range"); + } + + return Range(b_ + first, std::min(length, size() - first)); + } +}; + +using ByteRange = Range<const unsigned char*>; +using MutableByteRange = Range<unsigned char*>; +using StringPiece = Range<const char*>; +} diff --git a/src/zstd/contrib/pzstd/utils/ResourcePool.h b/src/zstd/contrib/pzstd/utils/ResourcePool.h new file mode 100644 index 00000000..a6ff5ffc --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ResourcePool.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <cassert> +#include <functional> +#include <memory> +#include <mutex> +#include <vector> + +namespace pzstd { + +/** + * An unbounded pool of resources. + * A `ResourcePool<T>` requires a factory function that takes allocates `T*` and + * a free function that frees a `T*`. + * Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr` + * to a `T`, and when it goes out of scope the resource will be returned to the + * pool. + * The `ResourcePool<T>` *must* survive longer than any resources it hands out. + * Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean + * up the resource before or after every use. + */ +template <typename T> +class ResourcePool { + public: + class Deleter; + using Factory = std::function<T*()>; + using Free = std::function<void(T*)>; + using UniquePtr = std::unique_ptr<T, Deleter>; + + private: + std::mutex mutex_; + Factory factory_; + Free free_; + std::vector<T*> resources_; + unsigned inUse_; + + public: + /** + * Creates a `ResourcePool`. + * + * @param factory The function to use to create new resources. + * @param free The function to use to free resources created by `factory`. + */ + ResourcePool(Factory factory, Free free) + : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {} + + /** + * @returns A unique pointer to a resource. The resource is null iff + * there are no avaiable resources and `factory()` returns null. + */ + UniquePtr get() { + std::lock_guard<std::mutex> lock(mutex_); + if (!resources_.empty()) { + UniquePtr resource{resources_.back(), Deleter{*this}}; + resources_.pop_back(); + ++inUse_; + return resource; + } + UniquePtr resource{factory_(), Deleter{*this}}; + ++inUse_; + return resource; + } + + ~ResourcePool() noexcept { + assert(inUse_ == 0); + for (const auto resource : resources_) { + free_(resource); + } + } + + class Deleter { + ResourcePool *pool_; + public: + explicit Deleter(ResourcePool &pool) : pool_(&pool) {} + + void operator() (T *resource) { + std::lock_guard<std::mutex> lock(pool_->mutex_); + // Make sure we don't put null resources into the pool + if (resource) { + pool_->resources_.push_back(resource); + } + assert(pool_->inUse_ > 0); + --pool_->inUse_; + } + }; +}; + +} diff --git a/src/zstd/contrib/pzstd/utils/ScopeGuard.h b/src/zstd/contrib/pzstd/utils/ScopeGuard.h new file mode 100644 index 00000000..31768f43 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ScopeGuard.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <utility> + +namespace pzstd { + +/** + * Dismissable scope guard. + * `Function` must be callable and take no parameters. + * Unless `dissmiss()` is called, the callable is executed upon destruction of + * `ScopeGuard`. + * + * Example: + * + * auto guard = makeScopeGuard([&] { cleanup(); }); + */ +template <typename Function> +class ScopeGuard { + Function function; + bool dismissed; + + public: + explicit ScopeGuard(Function&& function) + : function(std::move(function)), dismissed(false) {} + + void dismiss() { + dismissed = true; + } + + ~ScopeGuard() noexcept { + if (!dismissed) { + function(); + } + } +}; + +/// Creates a scope guard from `function`. +template <typename Function> +ScopeGuard<Function> makeScopeGuard(Function&& function) { + return ScopeGuard<Function>(std::forward<Function>(function)); +} +} diff --git a/src/zstd/contrib/pzstd/utils/ThreadPool.h b/src/zstd/contrib/pzstd/utils/ThreadPool.h new file mode 100644 index 00000000..8ece8e0d --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ThreadPool.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/WorkQueue.h" + +#include <cstddef> +#include <functional> +#include <thread> +#include <vector> + +namespace pzstd { +/// A simple thread pool that pulls tasks off its queue in FIFO order. +class ThreadPool { + std::vector<std::thread> threads_; + + WorkQueue<std::function<void()>> tasks_; + + public: + /// Constructs a thread pool with `numThreads` threads. + explicit ThreadPool(std::size_t numThreads) { + threads_.reserve(numThreads); + for (std::size_t i = 0; i < numThreads; ++i) { + threads_.emplace_back([this] { + std::function<void()> task; + while (tasks_.pop(task)) { + task(); + } + }); + } + } + + /// Finishes all tasks currently in the queue. + ~ThreadPool() { + tasks_.finish(); + for (auto& thread : threads_) { + thread.join(); + } + } + + /** + * Adds `task` to the queue of tasks to execute. Since `task` is a + * `std::function<>`, it cannot be a move only type. So any lambda passed must + * not capture move only types (like `std::unique_ptr`). + * + * @param task The task to execute. + */ + void add(std::function<void()> task) { + tasks_.push(std::move(task)); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/WorkQueue.h b/src/zstd/contrib/pzstd/utils/WorkQueue.h new file mode 100644 index 00000000..1d14d922 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/WorkQueue.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Buffer.h" + +#include <atomic> +#include <cassert> +#include <cstddef> +#include <condition_variable> +#include <cstddef> +#include <functional> +#include <mutex> +#include <queue> + +namespace pzstd { + +/// Unbounded thread-safe work queue. +template <typename T> +class WorkQueue { + // Protects all member variable access + std::mutex mutex_; + std::condition_variable readerCv_; + std::condition_variable writerCv_; + std::condition_variable finishCv_; + + std::queue<T> queue_; + bool done_; + std::size_t maxSize_; + + // Must have lock to call this function + bool full() const { + if (maxSize_ == 0) { + return false; + } + return queue_.size() >= maxSize_; + } + + public: + /** + * Constructs an empty work queue with an optional max size. + * If `maxSize == 0` the queue size is unbounded. + * + * @param maxSize The maximum allowed size of the work queue. + */ + WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} + + /** + * Push an item onto the work queue. Notify a single thread that work is + * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been moved from. + * + * @param item Item to push onto the queue. + * @returns True upon success, false if `finish()` has been called. An + * item was pushed iff `push()` returns true. + */ + bool push(T&& item) { + { + std::unique_lock<std::mutex> lock(mutex_); + while (full() && !done_) { + writerCv_.wait(lock); + } + if (done_) { + return false; + } + queue_.push(std::move(item)); + } + readerCv_.notify_one(); + return true; + } + + /** + * Attempts to pop an item off the work queue. It will block until data is + * available or `finish()` has been called. + * + * @param[out] item If `pop` returns `true`, it contains the popped item. + * If `pop` returns `false`, it is unmodified. + * @returns True upon success. False if the queue is empty and + * `finish()` has been called. + */ + bool pop(T& item) { + { + std::unique_lock<std::mutex> lock(mutex_); + while (queue_.empty() && !done_) { + readerCv_.wait(lock); + } + if (queue_.empty()) { + assert(done_); + return false; + } + item = std::move(queue_.front()); + queue_.pop(); + } + writerCv_.notify_one(); + return true; + } + + /** + * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. + * + * @param maxSize The new maximum queue size. + */ + void setMaxSize(std::size_t maxSize) { + { + std::lock_guard<std::mutex> lock(mutex_); + maxSize_ = maxSize; + } + writerCv_.notify_all(); + } + + /** + * Promise that `push()` won't be called again, so once the queue is empty + * there will never any more work. + */ + void finish() { + { + std::lock_guard<std::mutex> lock(mutex_); + assert(!done_); + done_ = true; + } + readerCv_.notify_all(); + writerCv_.notify_all(); + finishCv_.notify_all(); + } + + /// Blocks until `finish()` has been called (but the queue may not be empty). + void waitUntilFinished() { + std::unique_lock<std::mutex> lock(mutex_); + while (!done_) { + finishCv_.wait(lock); + } + } +}; + +/// Work queue for `Buffer`s that knows the total number of bytes in the queue. +class BufferWorkQueue { + WorkQueue<Buffer> queue_; + std::atomic<std::size_t> size_; + + public: + BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {} + + void push(Buffer buffer) { + size_.fetch_add(buffer.size()); + queue_.push(std::move(buffer)); + } + + bool pop(Buffer& buffer) { + bool result = queue_.pop(buffer); + if (result) { + size_.fetch_sub(buffer.size()); + } + return result; + } + + void setMaxSize(std::size_t maxSize) { + queue_.setMaxSize(maxSize); + } + + void finish() { + queue_.finish(); + } + + /** + * Blocks until `finish()` has been called. + * + * @returns The total number of bytes of all the `Buffer`s currently in the + * queue. + */ + std::size_t size() { + queue_.waitUntilFinished(); + return size_.load(); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/test/BUCK b/src/zstd/contrib/pzstd/utils/test/BUCK new file mode 100644 index 00000000..a5113cab --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/BUCK @@ -0,0 +1,35 @@ +cxx_test( + name='buffer_test', + srcs=['BufferTest.cpp'], + deps=['//contrib/pzstd/utils:buffer'], +) + +cxx_test( + name='range_test', + srcs=['RangeTest.cpp'], + deps=['//contrib/pzstd/utils:range'], +) + +cxx_test( + name='resource_pool_test', + srcs=['ResourcePoolTest.cpp'], + deps=['//contrib/pzstd/utils:resource_pool'], +) + +cxx_test( + name='scope_guard_test', + srcs=['ScopeGuardTest.cpp'], + deps=['//contrib/pzstd/utils:scope_guard'], +) + +cxx_test( + name='thread_pool_test', + srcs=['ThreadPoolTest.cpp'], + deps=['//contrib/pzstd/utils:thread_pool'], +) + +cxx_test( + name='work_queue_test', + srcs=['RangeTest.cpp'], + deps=['//contrib/pzstd/utils:work_queue'], +) diff --git a/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp new file mode 100644 index 00000000..fbba74e8 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Buffer.h" +#include "utils/Range.h" + +#include <gtest/gtest.h> +#include <memory> + +using namespace pzstd; + +namespace { +void deleter(const unsigned char* buf) { + delete[] buf; +} +} + +TEST(Buffer, Constructors) { + Buffer empty; + EXPECT_TRUE(empty.empty()); + EXPECT_EQ(0, empty.size()); + + Buffer sized(5); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); + + Buffer moved(std::move(sized)); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); + + Buffer assigned; + assigned = std::move(moved); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); +} + +TEST(Buffer, BufferManagement) { + std::shared_ptr<unsigned char> buf(new unsigned char[10], deleter); + { + Buffer acquired(buf, MutableByteRange(buf.get(), buf.get() + 10)); + EXPECT_EQ(2, buf.use_count()); + Buffer moved(std::move(acquired)); + EXPECT_EQ(2, buf.use_count()); + Buffer assigned; + assigned = std::move(moved); + EXPECT_EQ(2, buf.use_count()); + + Buffer split = assigned.splitAt(5); + EXPECT_EQ(3, buf.use_count()); + + split.advance(1); + assigned.subtract(1); + EXPECT_EQ(3, buf.use_count()); + } + EXPECT_EQ(1, buf.use_count()); +} + +TEST(Buffer, Modifiers) { + Buffer buf(10); + { + unsigned char i = 0; + for (auto& byte : buf.range()) { + byte = i++; + } + } + + auto prefix = buf.splitAt(2); + + ASSERT_EQ(2, prefix.size()); + EXPECT_EQ(0, *prefix.data()); + + ASSERT_EQ(8, buf.size()); + EXPECT_EQ(2, *buf.data()); + + buf.advance(2); + EXPECT_EQ(4, *buf.data()); + + EXPECT_EQ(9, *(buf.range().end() - 1)); + + buf.subtract(2); + EXPECT_EQ(7, *(buf.range().end() - 1)); + + EXPECT_EQ(4, buf.size()); +} diff --git a/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp new file mode 100644 index 00000000..755b50fa --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Range.h" + +#include <gtest/gtest.h> +#include <string> + +using namespace pzstd; + +// Range is directly copied from folly. +// Just some sanity tests to make sure everything seems to work. + +TEST(Range, Constructors) { + StringPiece empty; + EXPECT_TRUE(empty.empty()); + EXPECT_EQ(0, empty.size()); + + std::string str = "hello"; + { + Range<std::string::const_iterator> piece(str.begin(), str.end()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str.data(), str.size()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str.c_str()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } +} + +TEST(Range, Modifiers) { + StringPiece range("hello world"); + ASSERT_EQ(11, range.size()); + + { + auto hello = range.subpiece(0, 5); + EXPECT_EQ(5, hello.size()); + EXPECT_EQ('h', *hello.data()); + EXPECT_EQ('o', *(hello.end() - 1)); + } + { + auto hello = range; + hello.subtract(6); + EXPECT_EQ(5, hello.size()); + EXPECT_EQ('h', *hello.data()); + EXPECT_EQ('o', *(hello.end() - 1)); + } + { + auto world = range; + world.advance(6); + EXPECT_EQ(5, world.size()); + EXPECT_EQ('w', *world.data()); + EXPECT_EQ('d', *(world.end() - 1)); + } + + std::string expected = "hello world"; + EXPECT_EQ(expected, std::string(range.begin(), range.end())); + EXPECT_EQ(expected, std::string(range.data(), range.size())); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp new file mode 100644 index 00000000..6fe14518 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ResourcePool.h" + +#include <gtest/gtest.h> +#include <atomic> +#include <thread> + +using namespace pzstd; + +TEST(ResourcePool, FullTest) { + unsigned numCreated = 0; + unsigned numDeleted = 0; + { + ResourcePool<int> pool( + [&numCreated] { ++numCreated; return new int{5}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + + { + auto i = pool.get(); + EXPECT_EQ(5, *i); + *i = 6; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(5, *j); + *j = 7; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(7, *j); + } + } + EXPECT_EQ(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} + +TEST(ResourcePool, ThreadSafe) { + std::atomic<unsigned> numCreated{0}; + std::atomic<unsigned> numDeleted{0}; + { + ResourcePool<int> pool( + [&numCreated] { ++numCreated; return new int{0}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + auto push = [&pool] { + for (int i = 0; i < 100; ++i) { + auto x = pool.get(); + ++*x; + } + }; + std::thread t1{push}; + std::thread t2{push}; + t1.join(); + t2.join(); + + auto x = pool.get(); + auto y = pool.get(); + EXPECT_EQ(200, *x + *y); + } + EXPECT_GE(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp new file mode 100644 index 00000000..7bc624da --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ScopeGuard.h" + +#include <gtest/gtest.h> + +using namespace pzstd; + +TEST(ScopeGuard, Dismiss) { + { + auto guard = makeScopeGuard([&] { EXPECT_TRUE(false); }); + guard.dismiss(); + } +} + +TEST(ScopeGuard, Executes) { + bool executed = false; + { + auto guard = makeScopeGuard([&] { executed = true; }); + } + EXPECT_TRUE(executed); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp new file mode 100644 index 00000000..703fd4c9 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ThreadPool.h" + +#include <gtest/gtest.h> +#include <atomic> +#include <iostream> +#include <thread> +#include <vector> + +using namespace pzstd; + +TEST(ThreadPool, Ordering) { + std::vector<int> results; + + { + ThreadPool executor(1); + for (int i = 0; i < 10; ++i) { + executor.add([ &results, i ] { results.push_back(i); }); + } + } + + for (int i = 0; i < 10; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(ThreadPool, AllJobsFinished) { + std::atomic<unsigned> numFinished{0}; + std::atomic<bool> start{false}; + { + std::cerr << "Creating executor" << std::endl; + ThreadPool executor(5); + for (int i = 0; i < 10; ++i) { + executor.add([ &numFinished, &start ] { + while (!start.load()) { + std::this_thread::yield(); + } + ++numFinished; + }); + } + std::cerr << "Starting" << std::endl; + start.store(true); + std::cerr << "Finishing" << std::endl; + } + EXPECT_EQ(10, numFinished.load()); +} + +TEST(ThreadPool, AddJobWhileJoining) { + std::atomic<bool> done{false}; + { + ThreadPool executor(1); + executor.add([&executor, &done] { + while (!done.load()) { + std::this_thread::yield(); + } + // Sleep for a second to be sure that we are joining + std::this_thread::sleep_for(std::chrono::seconds(1)); + executor.add([] { + EXPECT_TRUE(false); + }); + }); + done.store(true); + } +} diff --git a/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp new file mode 100644 index 00000000..14cf7730 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Buffer.h" +#include "utils/WorkQueue.h" + +#include <gtest/gtest.h> +#include <iostream> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> + +using namespace pzstd; + +namespace { +struct Popper { + WorkQueue<int>* queue; + int* results; + std::mutex* mutex; + + void operator()() { + int result; + while (queue->pop(result)) { + std::lock_guard<std::mutex> lock(*mutex); + results[result] = result; + } + } +}; +} + +TEST(WorkQueue, SingleThreaded) { + WorkQueue<int> queue; + int result; + + queue.push(5); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + + queue.push(1); + queue.push(2); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + + queue.push(1); + queue.push(2); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + EXPECT_FALSE(queue.pop(result)); + + queue.waitUntilFinished(); +} + +TEST(WorkQueue, SPSC) { + WorkQueue<int> queue; + const int max = 100; + + for (int i = 0; i < 10; ++i) { + queue.push(int{i}); + } + + std::thread thread([ &queue, max ] { + int result; + for (int i = 0;; ++i) { + if (!queue.pop(result)) { + EXPECT_EQ(i, max); + break; + } + EXPECT_EQ(i, result); + } + }); + + std::this_thread::yield(); + for (int i = 10; i < max; ++i) { + queue.push(int{i}); + } + queue.finish(); + + thread.join(); +} + +TEST(WorkQueue, SPMC) { + WorkQueue<int> queue; + std::vector<int> results(50, -1); + std::mutex mutex; + std::vector<std::thread> threads; + for (int i = 0; i < 5; ++i) { + threads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + for (int i = 0; i < 50; ++i) { + queue.push(int{i}); + } + queue.finish(); + + for (auto& thread : threads) { + thread.join(); + } + + for (int i = 0; i < 50; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, MPMC) { + WorkQueue<int> queue; + std::vector<int> results(100, -1); + std::mutex mutex; + std::vector<std::thread> popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::vector<std::thread> pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 50; + auto max = (i + 1) * 50; + pusherThreads.emplace_back( + [ &queue, min, max ] { + for (int i = min; i < max; ++i) { + queue.push(int{i}); + } + }); + } + + for (auto& thread : pusherThreads) { + thread.join(); + } + queue.finish(); + + for (auto& thread : popperThreads) { + thread.join(); + } + + for (int i = 0; i < 100; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, BoundedSizeWorks) { + WorkQueue<int> queue(1); + int result; + queue.push(5); + queue.pop(result); + queue.push(5); + queue.pop(result); + queue.push(5); + queue.finish(); + queue.pop(result); + EXPECT_EQ(5, result); +} + +TEST(WorkQueue, BoundedSizePushAfterFinish) { + WorkQueue<int> queue(1); + int result; + queue.push(5); + std::thread pusher([&queue] { + queue.push(6); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, SetMaxSize) { + WorkQueue<int> queue(2); + int result; + queue.push(5); + queue.push(6); + queue.setMaxSize(1); + std::thread pusher([&queue] { + queue.push(7); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(6, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, BoundedSizeMPMC) { + WorkQueue<int> queue(10); + std::vector<int> results(200, -1); + std::mutex mutex; + std::cerr << "Creating popperThreads" << std::endl; + std::vector<std::thread> popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::cerr << "Creating pusherThreads" << std::endl; + std::vector<std::thread> pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 100; + auto max = (i + 1) * 100; + pusherThreads.emplace_back( + [ &queue, min, max ] { + for (int i = min; i < max; ++i) { + queue.push(int{i}); + } + }); + } + + std::cerr << "Joining pusherThreads" << std::endl; + for (auto& thread : pusherThreads) { + thread.join(); + } + std::cerr << "Finishing queue" << std::endl; + queue.finish(); + + std::cerr << "Joining popperThreads" << std::endl; + for (auto& thread : popperThreads) { + thread.join(); + } + + std::cerr << "Inspecting results" << std::endl; + for (int i = 0; i < 200; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, FailedPush) { + WorkQueue<std::unique_ptr<int>> queue; + std::unique_ptr<int> x(new int{5}); + EXPECT_TRUE(queue.push(std::move(x))); + EXPECT_EQ(nullptr, x); + queue.finish(); + x.reset(new int{6}); + EXPECT_FALSE(queue.push(std::move(x))); + EXPECT_NE(nullptr, x); + EXPECT_EQ(6, *x); +} + +TEST(BufferWorkQueue, SizeCalculatedCorrectly) { + { + BufferWorkQueue queue; + queue.finish(); + EXPECT_EQ(0, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.finish(); + EXPECT_EQ(10, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.push(Buffer(5)); + queue.finish(); + EXPECT_EQ(15, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.push(Buffer(5)); + queue.finish(); + Buffer buffer; + queue.pop(buffer); + EXPECT_EQ(5, queue.size()); + } +} |