summaryrefslogtreecommitdiffstats
path: root/src/zstd/contrib/pzstd
diff options
context:
space:
mode:
Diffstat (limited to 'src/zstd/contrib/pzstd')
-rw-r--r--src/zstd/contrib/pzstd/.gitignore2
-rw-r--r--src/zstd/contrib/pzstd/BUCK72
-rw-r--r--src/zstd/contrib/pzstd/ErrorHolder.h54
-rw-r--r--src/zstd/contrib/pzstd/Logging.h72
-rw-r--r--src/zstd/contrib/pzstd/Makefile271
-rw-r--r--src/zstd/contrib/pzstd/Options.cpp424
-rw-r--r--src/zstd/contrib/pzstd/Options.h68
-rw-r--r--src/zstd/contrib/pzstd/Pzstd.cpp611
-rw-r--r--src/zstd/contrib/pzstd/Pzstd.h150
-rw-r--r--src/zstd/contrib/pzstd/README.md56
-rw-r--r--src/zstd/contrib/pzstd/SkippableFrame.cpp30
-rw-r--r--src/zstd/contrib/pzstd/SkippableFrame.h64
-rw-r--r--src/zstd/contrib/pzstd/images/Cspeed.pngbin0 -> 69804 bytes
-rw-r--r--src/zstd/contrib/pzstd/images/Dspeed.pngbin0 -> 26335 bytes
-rw-r--r--src/zstd/contrib/pzstd/main.cpp27
-rw-r--r--src/zstd/contrib/pzstd/test/BUCK37
-rw-r--r--src/zstd/contrib/pzstd/test/OptionsTest.cpp536
-rw-r--r--src/zstd/contrib/pzstd/test/PzstdTest.cpp149
-rw-r--r--src/zstd/contrib/pzstd/test/RoundTrip.h86
-rw-r--r--src/zstd/contrib/pzstd/test/RoundTripTest.cpp86
-rw-r--r--src/zstd/contrib/pzstd/utils/BUCK75
-rw-r--r--src/zstd/contrib/pzstd/utils/Buffer.h99
-rw-r--r--src/zstd/contrib/pzstd/utils/FileSystem.h94
-rw-r--r--src/zstd/contrib/pzstd/utils/Likely.h28
-rw-r--r--src/zstd/contrib/pzstd/utils/Range.h131
-rw-r--r--src/zstd/contrib/pzstd/utils/ResourcePool.h96
-rw-r--r--src/zstd/contrib/pzstd/utils/ScopeGuard.h50
-rw-r--r--src/zstd/contrib/pzstd/utils/ThreadPool.h58
-rw-r--r--src/zstd/contrib/pzstd/utils/WorkQueue.h181
-rw-r--r--src/zstd/contrib/pzstd/utils/test/BUCK35
-rw-r--r--src/zstd/contrib/pzstd/utils/test/BufferTest.cpp89
-rw-r--r--src/zstd/contrib/pzstd/utils/test/RangeTest.cpp82
-rw-r--r--src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp72
-rw-r--r--src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp28
-rw-r--r--src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp71
-rw-r--r--src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp282
36 files changed, 4266 insertions, 0 deletions
diff --git a/src/zstd/contrib/pzstd/.gitignore b/src/zstd/contrib/pzstd/.gitignore
new file mode 100644
index 000000000..84e68fb07
--- /dev/null
+++ b/src/zstd/contrib/pzstd/.gitignore
@@ -0,0 +1,2 @@
+# compilation result
+pzstd
diff --git a/src/zstd/contrib/pzstd/BUCK b/src/zstd/contrib/pzstd/BUCK
new file mode 100644
index 000000000..d04eeedd8
--- /dev/null
+++ b/src/zstd/contrib/pzstd/BUCK
@@ -0,0 +1,72 @@
+cxx_library(  # Core pzstd library, built from Pzstd.cpp and SkippableFrame.cpp
+ name='libpzstd',
+ visibility=['PUBLIC'],
+ header_namespace='',
+ exported_headers=[
+ 'ErrorHolder.h',
+ 'Logging.h',
+ 'Pzstd.h',
+ ],
+ headers=[  # SkippableFrame.h is listed here rather than among the exported headers
+ 'SkippableFrame.h',
+ ],
+ srcs=[
+ 'Pzstd.cpp',
+ 'SkippableFrame.cpp',
+ ],
+ deps=[  # option parsing, the pzstd utility headers, and the zstd libraries
+ ':options',
+ '//contrib/pzstd/utils:utils',
+ '//lib:mem',
+ '//lib:zstd',
+ ],
+)
+
+cxx_library(  # Command-line option parsing (Options.h / Options.cpp)
+ name='options',
+ visibility=['PUBLIC'],
+ header_namespace='',
+ exported_headers=['Options.h'],
+ srcs=['Options.cpp'],
+ deps=[  # Options.cpp includes utils/ScopeGuard.h, zstd.h (via Options.h) and util.h
+ '//contrib/pzstd/utils:scope_guard',
+ '//lib:zstd',
+ '//programs:util',
+ ],
+)
+
+cxx_binary(  # The pzstd executable: main.cpp plus :libpzstd and :options
+ name='pzstd',
+ visibility=['PUBLIC'],
+ srcs=['main.cpp'],
+ deps=[
+ ':libpzstd',
+ ':options',
+ ],
+)
+
+# Must run "make googletest" first: the sources below live in the googletest/ checkout that target creates
+cxx_library(
+ name='gtest',
+ srcs=glob([  # the amalgamated gtest/gmock sources plus gmock's main()
+ 'googletest/googletest/src/gtest-all.cc',
+ 'googletest/googlemock/src/gmock-all.cc',
+ 'googletest/googlemock/src/gmock_main.cc',
+ ]),
+ header_namespace='',
+ exported_headers=subdir_glob([
+ ('googletest/googletest/include', '**/*.h'),
+ ('googletest/googlemock/include', '**/*.h'),
+ ]),
+ headers=subdir_glob([  # src/*.cc is listed as headers too; presumably because the *-all.cc files #include them (confirm)
+ ('googletest/googletest', 'src/*.cc'),
+ ('googletest/googletest', 'src/*.h'),
+ ('googletest/googlemock', 'src/*.cc'),
+ ('googletest/googlemock', 'src/*.h'),
+ ]),
+ platform_linker_flags=[  # NOTE(review): looks like "no extra flags on android, -lpthread elsewhere"; confirm Buck's pattern matching order
+ ('android', []),
+ ('', ['-lpthread']),
+ ],
+ visibility=['PUBLIC'],
+)
diff --git a/src/zstd/contrib/pzstd/ErrorHolder.h b/src/zstd/contrib/pzstd/ErrorHolder.h
new file mode 100644
index 000000000..829651c59
--- /dev/null
+++ b/src/zstd/contrib/pzstd/ErrorHolder.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <stdexcept>
+#include <string>
+
+namespace pzstd {
+
+// Coordinates graceful shutdown of the pzstd pipeline by recording the first reported error
+class ErrorHolder {
+ std::atomic<bool> error_; // true while an error is recorded and not yet taken by getError()
+ std::string message_; // text of the recorded error; written only by the setError() call that flips error_
+
+ public:
+ ErrorHolder() : error_(false) {}
+
+ bool hasError() noexcept { // true if an error has been set and not yet retrieved
+ return error_.load();
+ }
+
+ void setError(std::string message) noexcept { // records message unless an error is already recorded
+ // Given multiple possibly concurrent calls, exactly one will ever succeed.
+ bool expected = false;
+ if (error_.compare_exchange_strong(expected, true)) {
+ message_ = std::move(message); // NOTE(review): error_ is already true here, so a concurrent getError() could read message_ mid-write; confirm callers only read after workers stop
+ }
+ }
+
+ bool check(bool predicate, std::string message) noexcept { // sets the error when predicate is false; returns true iff no error is recorded
+ if (!predicate) {
+ setError(std::move(message));
+ }
+ return !hasError();
+ }
+
+ std::string getError() noexcept { // clears the error flag and hands the stored message to the caller
+ error_.store(false);
+ return std::move(message_);
+ }
+
+ ~ErrorHolder() { // asserts that any recorded error was consumed via getError() before destruction
+ assert(!hasError());
+ }
+};
+}
diff --git a/src/zstd/contrib/pzstd/Logging.h b/src/zstd/contrib/pzstd/Logging.h
new file mode 100644
index 000000000..16a63932c
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Logging.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include <cstdio>
+#include <mutex>
+
+namespace pzstd {
+
+constexpr int ERROR = 1; // log levels: a Logger prints a message when the message's level <= the logger's level
+constexpr int INFO = 2;
+constexpr int DEBUG = 3;
+constexpr int VERBOSE = 4; // largest level defined here
+
+class Logger { // printf-style logger that filters by level and serializes writes with a mutex
+ std::mutex mutex_; // held around every write to out_ and every access to lastUpdate_
+ FILE* out_; // destination stream (stderr unless the constructor is given another)
+ const int level_; // messages with a level greater than this are dropped
+
+ using Clock = std::chrono::system_clock; // NOTE(review): this header includes only <cstdio> and <mutex>; <chrono> appears to arrive transitively, confirm
+ Clock::time_point lastUpdate_; // time of the last message printed by update()
+ std::chrono::milliseconds refreshRate_; // minimum interval between update() prints (150 ms)
+
+ public:
+ explicit Logger(int level, FILE* out = stderr)
+ : out_(out), level_(level), lastUpdate_(Clock::now()),
+ refreshRate_(150) {}
+
+
+ bool logsAt(int level) { // true if a message at `level` would be printed
+ return level <= level_;
+ }
+
+ template <typename... Args>
+ void operator()(int level, const char *fmt, Args... args) { // prints fmt/args via fprintf when `level` is enabled
+ if (level > level_) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::fprintf(out_, fmt, args...);
+ }
+
+ template <typename... Args>
+ void update(int level, const char *fmt, Args... args) { // rate-limited variant: the message is dropped if the last update() print was within refreshRate_
+ if (level > level_) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto now = Clock::now();
+ if (now - lastUpdate_ > refreshRate_) {
+ lastUpdate_ = now;
+ std::fprintf(out_, "\r"); // carriage return first, so the message starts at column 0 of the current line
+ std::fprintf(out_, fmt, args...);
+ }
+ }
+
+ void clear(int level) { // blanks the current line when `level` is enabled
+ if (level > level_) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::fprintf(out_, "\r%79s\r", ""); // writes 79 spaces between two carriage returns
+ }
+};
+
+}
diff --git a/src/zstd/contrib/pzstd/Makefile b/src/zstd/contrib/pzstd/Makefile
new file mode 100644
index 000000000..8d2b1932e
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Makefile
@@ -0,0 +1,271 @@
+# ################################################################
+# Copyright (c) 2016-present, Facebook, Inc.
+# All rights reserved.
+#
+# This source code is licensed under both the BSD-style license (found in the
+# LICENSE file in the root directory of this source tree) and the GPLv2 (found
+# in the COPYING file in the root directory of this source tree).
+# ################################################################
+
+# Standard variables for installation
+DESTDIR ?=
+PREFIX ?= /usr/local
+BINDIR := $(DESTDIR)$(PREFIX)/bin
+
+ZSTDDIR = ../../lib
+PROGDIR = ../../programs
+
+# External program to use to run tests, e.g. qemu or valgrind
+TESTPROG ?=
+# Flags to pass to the tests
+TESTFLAGS ?=
+
+# We use gcc/clang to generate the header dependencies of files
+DEPFLAGS = -MMD -MP -MF $*.Td
+POSTCOMPILE = mv -f $*.Td $*.d
+
+# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override
+CFLAGS ?= -O3 -Wall -Wextra
+CXXFLAGS ?= -O3 -Wall -Wextra -pedantic
+CPPFLAGS ?=
+LDFLAGS ?=
+
+# Include flags
+PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
+GTEST_INC = -isystem googletest/googletest/include
+
+PZSTD_CPPFLAGS = $(PZSTD_INC)
+PZSTD_CCXXFLAGS =
+PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS)
+PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) -std=c++11
+PZSTD_LDFLAGS =
+EXTRA_FLAGS =
+ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS)
+ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS)
+ALL_LDFLAGS = $(EXTRA_FLAGS) $(CXXFLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS)
+
+
+# gtest libraries need to go before "-lpthread" because they depend on it.
+GTEST_LIB = -L googletest/build/googlemock/gtest
+LIBS =
+
+# Compilation commands
+LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -pthread -o $@
+CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@
+CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@
+
+# Get a list of all zstd files so we rebuild the static library when we need to
+ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \
+ $(wildcard $(ZSTDDIR)/common/*.h)
+ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \
+ $(wildcard $(ZSTDDIR)/compress/*.h)
+ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \
+ $(wildcard $(ZSTDDIR)/decompress/*.h)
+ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \
+ $(wildcard $(PROGDIR)/*.h)
+ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \
+ $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \
+ $(ZSTDPROG_FILES)
+
+# List all the pzstd source files so we can determine their dependencies
+PZSTD_SRCS := $(wildcard *.cpp)
+PZSTD_TESTS := $(wildcard test/*.cpp)
+UTILS_TESTS := $(wildcard utils/test/*.cpp)
+ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS)
+
+
+# Define *.exe as extension for Windows systems
+ifneq (,$(filter Windows%,$(OS)))
+EXT =.exe
+else
+EXT =
+endif
+
+# Standard targets
+.PHONY: default
+default: all
+
+.PHONY: test-pzstd
+test-pzstd: TESTFLAGS=--gtest_filter=-*ExtremelyLarge*
+test-pzstd: clean googletest pzstd tests check
+
+.PHONY: test-pzstd32
+test-pzstd32: clean googletest32 all32 check
+
+.PHONY: test-pzstd-tsan
+test-pzstd-tsan: LDFLAGS=-fuse-ld=gold
+test-pzstd-tsan: TESTFLAGS=--gtest_filter=-*ExtremelyLarge*
+test-pzstd-tsan: clean googletest tsan check
+
+.PHONY: test-pzstd-asan
+test-pzstd-asan: LDFLAGS=-fuse-ld=gold
+test-pzstd-asan: TESTFLAGS=--gtest_filter=-*ExtremelyLarge*
+test-pzstd-asan: clean asan check
+
+.PHONY: check
+check:
+ $(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS)
+
+.PHONY: install
+install: PZSTD_CPPFLAGS += -DNDEBUG
+install: pzstd$(EXT)
+ install -d -m 755 $(BINDIR)/
+ install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT)
+
+.PHONY: uninstall
+uninstall:
+ $(RM) $(BINDIR)/pzstd$(EXT)
+
+# Targets for many different builds
+.PHONY: all
+all: PZSTD_CPPFLAGS += -DNDEBUG
+all: pzstd$(EXT)
+
+.PHONY: debug
+debug: EXTRA_FLAGS += -g
+debug: pzstd$(EXT) tests roundtrip
+
+.PHONY: tsan
+tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC
+tsan: PZSTD_LDFLAGS += -fsanitize=thread
+tsan: debug
+
+.PHONY: asan
+asan: EXTRA_FLAGS += -fsanitize=address
+asan: debug
+
+.PHONY: ubsan
+ubsan: EXTRA_FLAGS += -fsanitize=undefined
+ubsan: debug
+
+.PHONY: all32
+all32: EXTRA_FLAGS += -m32
+all32: all tests roundtrip
+
+.PHONY: debug32
+debug32: EXTRA_FLAGS += -m32
+debug32: debug
+
+.PHONY: asan32
+asan32: EXTRA_FLAGS += -m32
+asan32: asan
+
+.PHONY: tsan32
+tsan32: EXTRA_FLAGS += -m32
+tsan32: tsan
+
+.PHONY: ubsan32
+ubsan32: EXTRA_FLAGS += -m32
+ubsan32: ubsan
+
+# Run long round trip tests
+.PHONY: roundtripcheck
+roundtripcheck: roundtrip check
+ $(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS)
+
+# Build the main binary
+pzstd$(EXT): main.o $(PROGDIR)/util.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
+
+# Target that depends on all the tests
+.PHONY: tests
+tests: EXTRA_FLAGS += -Wno-deprecated-declarations
+tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS)))
+
+# Build the round trip tests
+.PHONY: roundtrip
+roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations
+roundtrip: test/RoundTripTest$(EXT)
+
+# Use the static library that zstd builds for simplicity and
+# so we get the compiler options correct
+$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES)
+ CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)" $(MAKE) -C $(ZSTDDIR) libzstd.a
+
+# Rules to build the tests
+test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o \
+ $(PROGDIR)/util.o Options.o \
+ Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
+
+test/%Test$(EXT): PZSTD_LDFLAGS += $(GTEST_LIB)
+test/%Test$(EXT): LIBS += -lgtest -lgtest_main
+test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o \
+ $(PROGDIR)/util.o Options.o Pzstd.o \
+ SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
+
+utils/test/%Test$(EXT): PZSTD_LDFLAGS += $(GTEST_LIB)
+utils/test/%Test$(EXT): LIBS += -lgtest -lgtest_main
+utils/test/%Test$(EXT): utils/test/%Test.o
+ $(LD_COMMAND)
+
+
+GTEST_CMAKEFLAGS =
+
+# Fetch and build googletest: removes any existing checkout, clones it from GitHub, then builds it with cmake
+.PHONY: googletest
+googletest: PZSTD_CCXXFLAGS += -fPIC
+googletest:
+ @$(RM) -rf googletest
+ @git clone https://github.com/google/googletest
+ @mkdir -p googletest/build
+ @cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE)
+
+.PHONY: googletest32
+googletest32: PZSTD_CCXXFLAGS += -m32
+googletest32: googletest
+
+.PHONY: googletest-mingw64
+googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles"
+googletest-mingw64: googletest
+
+.PHONY: clean
+clean:
+ $(RM) -f *.o pzstd$(EXT) *.Td *.d
+ $(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d
+ $(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d
+ $(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d
+ $(MAKE) -C $(ZSTDDIR) clean
+ @echo Cleaning completed
+
+
+# Cancel make's built-in implicit rules for C and C++ objects so that only the rules below are used
+%.o: %.c
+%.o: %.cpp
+
+# Object file rules
+%.o: %.c
+ $(CC_COMMAND)
+ $(POSTCOMPILE)
+
+$(PROGDIR)/%.o: $(PROGDIR)/%.c
+ $(CC_COMMAND)
+ $(POSTCOMPILE)
+
+%.o: %.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+test/%.o: PZSTD_CPPFLAGS += $(GTEST_INC)
+test/%.o: test/%.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+utils/test/%.o: PZSTD_CPPFLAGS += $(GTEST_INC)
+utils/test/%.o: utils/test/%.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+# Mark the generated dependency (.d) files as precious so that make never deletes them
+.PRECIOUS: %.d test/%.d utils/test/%.d
+
+# Include rules that specify header file dependencies
+-include $(patsubst %,%.d,$(basename $(ALL_SRCS)))
diff --git a/src/zstd/contrib/pzstd/Options.cpp b/src/zstd/contrib/pzstd/Options.cpp
new file mode 100644
index 000000000..37292221b
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Options.cpp
@@ -0,0 +1,424 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "Options.h"
+#include "util.h"
+#include "utils/ScopeGuard.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cstdio>
+#include <cstring>
+#include <iterator>
+#include <thread>
+#include <vector>
+
+
+namespace pzstd {
+
+namespace {
+unsigned defaultNumThreads() {  // PZSTD_NUM_THREADS if defined at build time, else the hardware thread count
+#ifndef PZSTD_NUM_THREADS
+  return std::thread::hardware_concurrency();
+#else
+  return PZSTD_NUM_THREADS;
+#endif
+}
+
+unsigned parseUnsigned(const char **arg) {  // reads the leading digits of *arg and advances *arg past them
+  const char *cursor = *arg;
+  unsigned value = 0;  // stays 0 when *arg does not start with a digit
+  for (; *cursor >= '0' && *cursor <= '9'; ++cursor) {
+    value = value * 10 + static_cast<unsigned>(*cursor - '0');
+  }
+  *arg = cursor;
+  return value;
+}
+
+const char *getArgument(const char *options, const char **argv, int &i,
+                        int argc) {
+  const bool attached = options[1] != 0;  // "-pN" form: the value follows the letter
+  if (attached) {
+    return options + 1;
+  }
+  if (++i != argc) {  // otherwise consume the next argv entry as the value
+    return argv[i];
+  }
+  std::fprintf(stderr, "Option -%c requires an argument, but none provided\n",
+               *options);
+  return nullptr;
+}
+
+const std::string kZstdExtension = ".zst"; // suffix appended when compressing and stripped when decompressing
+constexpr char kStdIn[] = "-"; // input file name that selects standard input
+constexpr char kStdOut[] = "-"; // output file name that selects standard output
+constexpr unsigned kDefaultCompressionLevel = 3;
+constexpr unsigned kMaxNonUltraCompressionLevel = 19; // highest level accepted without --ultra
+
+#ifdef _WIN32
+const char nullOutput[] = "nul"; // output path used in test mode (-t)
+#else
+const char nullOutput[] = "/dev/null"; // output path used in test mode (-t)
+#endif
+
+void notSupported(const char *option) { // prints "Operation not supported: <option>" to stderr
+ std::fprintf(stderr, "Operation not supported: %s\n", option);
+}
+
+// Prints the command-line help to stderr; the unsigned level constants are formatted with %u.
+void usage() {
+  std::fprintf(stderr, "Usage:\n");
+  std::fprintf(stderr, " pzstd [args] [FILE(s)]\n");
+  std::fprintf(stderr, "Parallel ZSTD options:\n");
+  std::fprintf(stderr, " -p, --processes # : number of threads to use for (de)compression (default:<numcpus>)\n");
+  std::fprintf(stderr, "ZSTD options:\n");
+  std::fprintf(stderr, " -# : # compression level (1-%u, default:%u)\n", kMaxNonUltraCompressionLevel, kDefaultCompressionLevel);
+  std::fprintf(stderr, " -d, --decompress : decompression\n");
+  std::fprintf(stderr, " -o file : result stored into `file` (only if 1 input file)\n");
+  std::fprintf(stderr, " -f, --force : overwrite output without prompting, (de)compress links\n");
+  std::fprintf(stderr, " --rm : remove source file(s) after successful (de)compression\n");
+  std::fprintf(stderr, " -k, --keep : preserve source file(s) (default)\n");
+  std::fprintf(stderr, " -h, --help : display help and exit\n");
+  std::fprintf(stderr, " -V, --version : display version number and exit\n");
+  std::fprintf(stderr, " -v, --verbose : verbose mode; specify multiple times to increase log level (default:2)\n");
+  std::fprintf(stderr, " -q, --quiet : suppress warnings; specify twice to suppress errors too\n");
+  std::fprintf(stderr, " -c, --stdout : force write to standard output, even if it is the console\n");
+#ifdef UTIL_HAS_CREATEFILELIST
+  std::fprintf(stderr, " -r : operate recursively on directories\n");
+#endif
+  std::fprintf(stderr, " --ultra : enable levels beyond %u, up to %i (requires more memory)\n", kMaxNonUltraCompressionLevel, ZSTD_maxCLevel());
+  std::fprintf(stderr, " -C, --check : integrity check (default)\n");
+  std::fprintf(stderr, " --no-check : no integrity check\n");
+  std::fprintf(stderr, " -t, --test : test compressed file integrity\n");
+  std::fprintf(stderr, " -- : all arguments after \"--\" are treated as files\n");
+}
+} // anonymous namespace
+
+Options::Options() // defaults that apply until parse() sees an overriding option
+ : numThreads(defaultNumThreads()), maxWindowLog(23),
+ compressionLevel(kDefaultCompressionLevel), decompress(false),
+ overwrite(false), keepSource(true), writeMode(WriteMode::Auto),
+ checksum(true), verbosity(2) {}
+
+// Parses argv into this object. Returns Status::Message if -h/-V printed a
+// message, Status::Failure (after a diagnostic on stderr) if the arguments are
+// invalid or unsupported, and Status::Success otherwise.
+Options::Status Options::parse(int argc, const char **argv) {
+  bool test = false;
+  bool recursive = false;
+  bool ultra = false;
+  bool forceStdout = false;
+  bool followLinks = false;
+  // Local copy of input files, which are pointers into argv.
+  std::vector<const char *> localInputFiles;
+  for (int i = 1; i < argc; ++i) {
+    const char *arg = argv[i];
+    // Protect against empty arguments
+    if (arg[0] == 0) {
+      continue;
+    }
+    // Everything after "--" is an input file
+    if (!std::strcmp(arg, "--")) {
+      ++i;
+      std::copy(argv + i, argv + argc, std::back_inserter(localInputFiles));
+      break;
+    }
+    // Long arguments that don't have a short option
+    {
+      bool isLongOption = true;
+      if (!std::strcmp(arg, "--rm")) {
+        keepSource = false;
+      } else if (!std::strcmp(arg, "--ultra")) {
+        ultra = true;
+        maxWindowLog = 0;
+      } else if (!std::strcmp(arg, "--no-check")) {
+        checksum = false;
+      } else if (!std::strcmp(arg, "--sparse")) {
+        writeMode = WriteMode::Sparse;
+        notSupported("Sparse mode");
+        return Status::Failure;
+      } else if (!std::strcmp(arg, "--no-sparse")) {
+        writeMode = WriteMode::Regular;
+        notSupported("Sparse mode");
+        return Status::Failure;
+      } else if (!std::strcmp(arg, "--dictID")) {
+        notSupported(arg);
+        return Status::Failure;
+      } else if (!std::strcmp(arg, "--no-dictID")) {
+        notSupported(arg);
+        return Status::Failure;
+      } else {
+        isLongOption = false;
+      }
+      if (isLongOption) {
+        continue;
+      }
+    }
+    // Arguments with a short option simply set their short option.
+    const char *options = nullptr;
+    if (!std::strcmp(arg, "--processes")) {
+      options = "p";
+    } else if (!std::strcmp(arg, "--version")) {
+      options = "V";
+    } else if (!std::strcmp(arg, "--help")) {
+      options = "h";
+    } else if (!std::strcmp(arg, "--decompress")) {
+      options = "d";
+    } else if (!std::strcmp(arg, "--force")) {
+      options = "f";
+    } else if (!std::strcmp(arg, "--stdout")) {
+      options = "c";
+    } else if (!std::strcmp(arg, "--keep")) {
+      options = "k";
+    } else if (!std::strcmp(arg, "--verbose")) {
+      options = "v";
+    } else if (!std::strcmp(arg, "--quiet")) {
+      options = "q";
+    } else if (!std::strcmp(arg, "--check")) {
+      options = "C";
+    } else if (!std::strcmp(arg, "--test")) {
+      options = "t";
+    } else if (arg[0] == '-' && arg[1] != 0) {
+      options = arg + 1;
+    } else {
+      localInputFiles.emplace_back(arg);
+      continue;
+    }
+    assert(options != nullptr);
+
+    bool finished = false;
+    while (!finished && *options != 0) {
+      // Parse the compression level
+      if (*options >= '0' && *options <= '9') {
+        compressionLevel = parseUnsigned(&options);
+        continue;
+      }
+
+      switch (*options) {
+      case 'h':
+      case 'H':
+        usage();
+        return Status::Message;
+      case 'V':
+        std::fprintf(stderr, "PZSTD version: %s.\n", ZSTD_VERSION_STRING);
+        return Status::Message;
+      case 'p': {
+        finished = true;
+        const char *optionArgument = getArgument(options, argv, i, argc);
+        if (optionArgument == nullptr) {
+          return Status::Failure;
+        }
+        if (*optionArgument < '0' || *optionArgument > '9') {
+          std::fprintf(stderr, "Option -p expects a number, but %s provided\n",
+                       optionArgument);
+          return Status::Failure;
+        }
+        numThreads = parseUnsigned(&optionArgument);
+        if (*optionArgument != 0) {
+          std::fprintf(stderr,
+                       "Option -p expects a number, but %u%s provided\n",
+                       numThreads, optionArgument);
+          return Status::Failure;
+        }
+        break;
+      }
+      case 'o': {
+        finished = true;
+        const char *optionArgument = getArgument(options, argv, i, argc);
+        if (optionArgument == nullptr) {
+          return Status::Failure;
+        }
+        outputFile = optionArgument;
+        break;
+      }
+      case 'C':
+        checksum = true;
+        break;
+      case 'k':
+        keepSource = true;
+        break;
+      case 'd':
+        decompress = true;
+        break;
+      case 'f':
+        overwrite = true;
+        forceStdout = true;
+        followLinks = true;
+        break;
+      case 't':
+        test = true;
+        decompress = true;
+        break;
+#ifdef UTIL_HAS_CREATEFILELIST
+      case 'r':
+        recursive = true;
+        break;
+#endif
+      case 'c':
+        outputFile = kStdOut;
+        forceStdout = true;
+        break;
+      case 'v':
+        ++verbosity;
+        break;
+      case 'q':
+        --verbosity;
+        break;
+      // Unsupported options from Zstd
+      case 'D':
+      case 's':
+        notSupported("Zstd dictionaries.");
+        return Status::Failure;
+      case 'b':
+      case 'e':
+      case 'i':
+      case 'B':
+        notSupported("Zstd benchmarking options.");
+        return Status::Failure;
+      default:
+        std::fprintf(stderr, "Invalid argument: %s\n", arg);
+        return Status::Failure;
+      }
+      if (!finished) {
+        ++options;
+      }
+    } // while (*options != 0);
+  } // for (int i = 1; i < argc; ++i);
+
+  // Set options for test mode
+  if (test) {
+    outputFile = nullOutput;
+    keepSource = true;
+  }
+
+  // Input file defaults to standard input if not provided.
+  if (localInputFiles.empty()) {
+    localInputFiles.emplace_back(kStdIn);
+  }
+
+  // Check validity of input files
+  if (localInputFiles.size() > 1) {
+    const auto it = std::find(localInputFiles.begin(), localInputFiles.end(),
+                              std::string{kStdIn});
+    if (it != localInputFiles.end()) {
+      std::fprintf(stderr,
+                   "Cannot specify standard input when handling multiple files\n");
+      return Status::Failure;
+    }
+  }
+  if (localInputFiles.size() > 1 || recursive) {
+    if (!outputFile.empty() && outputFile != nullOutput) {
+      std::fprintf(stderr,
+                   "Cannot specify an output file when handling multiple inputs\n");
+      return Status::Failure;
+    }
+  }
+
+  g_utilDisplayLevel = verbosity;
+  // Drop symbolic links unless -f was given. std::remove_if only moves the kept
+  // entries to the front, so erase() is needed to actually shrink the vector.
+  if (!followLinks) {
+    const auto isIgnoredLink = [&](const char *path) {
+      const bool isLink = UTIL_isLink(path);
+      if (isLink && verbosity >= 2) {
+        std::fprintf(stderr, "Warning : %s is symbolic link, ignoring\n", path);
+      }
+      return isLink;
+    };
+    localInputFiles.erase(std::remove_if(localInputFiles.begin(),
+                                         localInputFiles.end(), isIgnoredLink),
+                          localInputFiles.end());
+    // Every input was a link: fail here rather than index an empty list below.
+    if (localInputFiles.empty()) {
+      std::fprintf(stderr, "No input files left after ignoring symbolic links\n");
+      return Status::Failure;
+    }
+  }
+
+  // Translate input files/directories into files to (de)compress
+  if (recursive) {
+    FileNamesTable* const files = UTIL_createExpandedFNT(localInputFiles.data(), localInputFiles.size(), followLinks);
+    if (files == nullptr) {
+      std::fprintf(stderr, "Error traversing directories\n");
+      return Status::Failure;
+    }
+    auto guard = makeScopeGuard([&] { UTIL_freeFileNamesTable(files); });
+    if (files->tableSize == 0) {
+      std::fprintf(stderr, "No files found\n");
+      return Status::Failure;
+    }
+    inputFiles.assign(files->fileNames, files->fileNames + files->tableSize);
+  } else {
+    inputFiles.assign(localInputFiles.begin(), localInputFiles.end());
+  }
+  localInputFiles.clear();
+  assert(!inputFiles.empty());
+
+  // If reading from standard input, default to standard output
+  if (inputFiles[0] == kStdIn && outputFile.empty()) {
+    assert(inputFiles.size() == 1);
+    outputFile = "-";
+  }
+
+  if (inputFiles[0] == kStdIn && IS_CONSOLE(stdin)) {
+    assert(inputFiles.size() == 1);
+    std::fprintf(stderr, "Cannot read input from interactive console\n");
+    return Status::Failure;
+  }
+  if (outputFile == "-" && IS_CONSOLE(stdout) && !(forceStdout && decompress)) {
+    std::fprintf(stderr, "Will not write to console stdout unless -c or -f is "
+                         "specified and decompressing\n");
+    return Status::Failure;
+  }
+
+  // Check compression level
+  {
+    unsigned maxCLevel = ultra ? ZSTD_maxCLevel() : kMaxNonUltraCompressionLevel;
+    if (compressionLevel > maxCLevel || compressionLevel == 0) {
+      std::fprintf(stderr, "Invalid compression level %u.\n", compressionLevel);
+      return Status::Failure;
+    }
+  }
+
+  // Check that numThreads is set
+  if (numThreads == 0) {
+    std::fprintf(stderr, "Invalid arguments: # of threads not specified "
+                         "and unable to determine hardware concurrency.\n");
+    return Status::Failure;
+  }
+
+  // Modify verbosity
+  // If we are piping input and output, turn off interaction
+  if (inputFiles[0] == kStdIn && outputFile == kStdOut && verbosity == 2) {
+    verbosity = 1;
+  }
+  // If we are in multi-file mode, turn off interaction
+  if (inputFiles.size() > 1 && verbosity == 2) {
+    verbosity = 1;
+  }
+
+  return Status::Success;
+}
+
+// Explicit outputFile wins; otherwise append ".zst" when compressing or strip
+// it when decompressing. Returns "" unless inputFile is "<non-empty stem>.zst".
+std::string Options::getOutputFile(const std::string &inputFile) const {
+  if (!outputFile.empty()) {
+    return outputFile;
+  }
+  if (!decompress) {
+    return inputFile + kZstdExtension;
+  }
+  // Check the length before subtracting: size() is unsigned, so the difference
+  // would wrap around for names shorter than the extension.
+  const auto extSize = kZstdExtension.size();
+  const bool hasExt = inputFile.size() > extSize &&
+      inputFile.compare(inputFile.size() - extSize, extSize, kZstdExtension) == 0;
+  return hasExt ? inputFile.substr(0, inputFile.size() - extSize) : std::string();
+}
+}
diff --git a/src/zstd/contrib/pzstd/Options.h b/src/zstd/contrib/pzstd/Options.h
new file mode 100644
index 000000000..f4f2aaa49
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Options.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#define ZSTD_STATIC_LINKING_ONLY
+#include "zstd.h"
+#undef ZSTD_STATIC_LINKING_ONLY
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+namespace pzstd {
+
+struct Options { // settings for one pzstd invocation; filled in by parse() or by the full constructor
+ enum class WriteMode { Regular, Auto, Sparse };
+
+ unsigned numThreads; // number of threads to use for (de)compression
+ unsigned maxWindowLog; // upper bound applied to windowLog in determineParameters(); 0 disables the bound
+ unsigned compressionLevel; // level passed to ZSTD_getParams()
+ bool decompress; // true to decompress rather than compress
+ std::vector<std::string> inputFiles;
+ std::string outputFile; // when non-empty, getOutputFile() returns it for every input
+ bool overwrite;
+ bool keepSource;
+ WriteMode writeMode;
+ bool checksum; // copied into the frame parameters' checksumFlag
+ int verbosity;
+
+ enum class Status {
+ Success, // Successfully parsed options
+ Failure, // Failure to parse options
+ Message // Options specified to print a message (e.g. "-h")
+ };
+
+ Options();
+ Options(unsigned numThreads, unsigned maxWindowLog, unsigned compressionLevel,
+ bool decompress, std::vector<std::string> inputFiles,
+ std::string outputFile, bool overwrite, bool keepSource,
+ WriteMode writeMode, bool checksum, int verbosity)
+ : numThreads(numThreads), maxWindowLog(maxWindowLog),
+ compressionLevel(compressionLevel), decompress(decompress),
+ inputFiles(std::move(inputFiles)), outputFile(std::move(outputFile)),
+ overwrite(overwrite), keepSource(keepSource), writeMode(writeMode),
+ checksum(checksum), verbosity(verbosity) {}
+
+ Status parse(int argc, const char **argv); // fills this object from the command line
+
+ ZSTD_parameters determineParameters() const { // builds the zstd parameters for compressionLevel, checksum and maxWindowLog
+ ZSTD_parameters params = ZSTD_getParams(compressionLevel, 0, 0);
+ params.fParams.contentSizeFlag = 0;
+ params.fParams.checksumFlag = checksum;
+ if (maxWindowLog != 0 && params.cParams.windowLog > maxWindowLog) { // clamp the window only when a bound is set
+ params.cParams.windowLog = maxWindowLog;
+ params.cParams = ZSTD_adjustCParams(params.cParams, 0, 0);
+ }
+ return params;
+ }
+
+ std::string getOutputFile(const std::string &inputFile) const; // output path to use for inputFile
+};
+}
diff --git a/src/zstd/contrib/pzstd/Pzstd.cpp b/src/zstd/contrib/pzstd/Pzstd.cpp
new file mode 100644
index 000000000..652187c3b
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Pzstd.cpp
@@ -0,0 +1,611 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "platform.h" /* Large Files support, SET_BINARY_MODE */
+#include "Pzstd.h"
+#include "SkippableFrame.h"
+#include "utils/FileSystem.h"
+#include "utils/Range.h"
+#include "utils/ScopeGuard.h"
+#include "utils/ThreadPool.h"
+#include "utils/WorkQueue.h"
+
+#include <chrono>
+#include <cinttypes>
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <string>
+
+
+namespace pzstd {
+
+namespace {
+// Path of the platform's null device; openOutputFile() never asks before
+// overwriting this path.
+#if defined(_WIN32)
+const std::string nullOutput{"nul"};
+#else
+const std::string nullOutput{"/dev/null"};
+#endif
+} // anonymous namespace
+
+using std::size_t;
+
+/// Size of `file` in bytes, or 0 when `file` is "-" (stdin) or the size
+/// lookup reports an error.
+static std::uintmax_t fileSizeOrZero(const std::string &file) {
+  const bool isStdin = (file == "-");
+  if (!isStdin) {
+    std::error_code lookupError;
+    const auto bytes = file_size(file, lookupError);
+    if (!lookupError) {
+      return bytes;
+    }
+  }
+  return 0;
+}
+
+/**
+ * (De)compresses one already opened input into one already opened output.
+ *
+ * A reader job is started on its own single-thread pool, the (de)compression
+ * jobs run on a pool of `options.numThreads` threads, and the calling thread
+ * writes the results. When no error was recorded, a summary line is logged at
+ * INFO level.
+ *
+ * @param options    The pzstd options to use for (de)compression
+ * @param inputFile  Name of the input, "-" for stdin (used for sizing/logging)
+ * @param inputFd    Open stream to read from
+ * @param outputFile Name of the output, "-" for stdout (used for logging)
+ * @param outputFd   Open stream to write to
+ * @param state      The shared state; errors are reported through it
+ * @returns The number of bytes written to `outputFd`
+ */
+static std::uint64_t handleOneInput(const Options &options,
+                                    const std::string &inputFile,
+                                    FILE* inputFd,
+                                    const std::string &outputFile,
+                                    FILE* outputFd,
+                                    SharedState& state) {
+  auto inputSize = fileSizeOrZero(inputFile);
+  // WorkQueue outlives ThreadPool so in the case of error we are certain
+  // we don't accidentally try to call push() on it after it is destroyed
+  WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
+  // Both counters start at zero so they never hold an indeterminate value.
+  std::uint64_t bytesRead = 0;
+  std::uint64_t bytesWritten = 0;
+  {
+    // Initialize the (de)compression thread pool with numThreads
+    ThreadPool executor(options.numThreads);
+    // Run the reader thread on an extra thread
+    ThreadPool readExecutor(1);
+    if (!options.decompress) {
+      // Add a job that reads the input and starts all the compression jobs
+      readExecutor.add(
+          [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
+            bytesRead = asyncCompressChunks(
+                state,
+                outs,
+                executor,
+                inputFd,
+                inputSize,
+                options.numThreads,
+                options.determineParameters());
+          });
+    } else {
+      // Add a job that reads the input and starts all the decompression jobs
+      readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
+        bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
+      });
+    }
+    // Start writing; this is the same call for both modes.
+    bytesWritten = writeFile(state, outs, outputFd, options.decompress);
+  }
+  if (!state.errorHolder.hasError()) {
+    std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
+    std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
+    if (!options.decompress) {
+      // `bytesRead + !bytesRead` turns a zero divisor into one.
+      double ratio = static_cast<double>(bytesWritten) /
+                     static_cast<double>(bytesRead + !bytesRead);
+      state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
+                      " bytes, %s)\n",
+                inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
+                outputFileName.c_str());
+    } else {
+      state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
+                inputFileName.c_str(),bytesWritten);
+    }
+  }
+  return bytesWritten;
+}
+
+/**
+ * Opens `inputFile` for binary reading.
+ *
+ * "-" selects stdin, which is switched to binary mode. Directories are
+ * rejected. On failure an error is stored in `errorHolder`.
+ *
+ * @param inputFile   Path of the input, or "-" for stdin
+ * @param errorHolder Receives the error message on failure
+ * @returns The opened stream, or nullptr on failure
+ */
+static FILE *openInputFile(const std::string &inputFile,
+                           ErrorHolder &errorHolder) {
+  if (inputFile == "-") {
+    SET_BINARY_MODE(stdin);
+    return stdin;
+  }
+  // Check if input file is a directory
+  {
+    std::error_code ec;
+    if (is_directory(inputFile, ec)) {
+      // The rejected path is the input, so the message must say so.
+      errorHolder.setError("Input file is a directory -- ignored");
+      return nullptr;
+    }
+  }
+  auto inputFd = std::fopen(inputFile.c_str(), "rb");
+  if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
+    return nullptr;
+  }
+  return inputFd;
+}
+
+/**
+ * Opens `outputFile` for binary writing.
+ *
+ * "-" selects stdout, which is switched to binary mode. Unless overwriting
+ * is enabled or the target is the null device, an existing file is only
+ * replaced after the user confirms with 'y'/'Y'; when INFO logging is off
+ * the prompt cannot be shown and the call fails instead.
+ *
+ * @returns The opened stream, or nullptr with an error stored in
+ *          `state.errorHolder`
+ */
+static FILE *openOutputFile(const Options &options,
+                            const std::string &outputFile,
+                            SharedState& state) {
+  if (outputFile == "-") {
+    SET_BINARY_MODE(stdout);
+    return stdout;
+  }
+  const bool mustProbe = !options.overwrite && outputFile != nullOutput;
+  if (mustProbe) {
+    // The file exists if it can be opened for reading.
+    FILE *existing = std::fopen(outputFile.c_str(), "rb");
+    if (existing != nullptr) {
+      std::fclose(existing);
+      const bool canPrompt = state.log.logsAt(INFO);
+      if (!canPrompt) {
+        state.errorHolder.setError("Output file exists");
+        return nullptr;
+      }
+      state.log(
+          INFO,
+          "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
+          outputFile.c_str());
+      const int answer = getchar();
+      const bool confirmed = (answer == 'y' || answer == 'Y');
+      if (!confirmed) {
+        state.errorHolder.setError("Not overwritten");
+        return nullptr;
+      }
+    }
+  }
+  FILE *handle = std::fopen(outputFile.c_str(), "wb");
+  const bool opened =
+      state.errorHolder.check(handle != nullptr, "Failed to open output file");
+  return opened ? handle : nullptr;
+}
+
+/**
+ * Processes every file in `options.inputFiles` in turn. A failure on one
+ * file is logged and does not prevent the remaining files from being
+ * processed.
+ *
+ * @param options The pzstd options to use for (de)compression
+ * @returns 0 if every file was handled successfully, 1 otherwise
+ */
+int pzstdMain(const Options &options) {
+  int returnCode = 0;
+  SharedState state(options);
+  for (const auto& input : options.inputFiles) {
+    // Declared first so it runs last: report whatever error this iteration
+    // recorded, after the streams have been closed.
+    auto printErrorGuard = makeScopeGuard([&] {
+      if (state.errorHolder.hasError()) {
+        returnCode = 1;
+        state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
+                  state.errorHolder.getError().c_str());
+      }
+    });
+    // Open the input file
+    auto inputFd = openInputFile(input, state.errorHolder);
+    if (inputFd == nullptr) {
+      continue;
+    }
+    auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
+    // Open the output file
+    auto outputFile = options.getOutputFile(input);
+    if (!state.errorHolder.check(outputFile != "",
+                                 "Input file does not have extension .zst")) {
+      continue;
+    }
+    auto outputFd = openOutputFile(options, outputFile, state);
+    if (outputFd == nullptr) {
+      continue;
+    }
+    auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
+    // (de)compress the file
+    handleOneInput(options, input, inputFd, outputFile, outputFd, state);
+    if (state.errorHolder.hasError()) {
+      continue;
+    }
+    // Delete the input file if necessary
+    if (!options.keepSource) {
+      // Be sure that we are done and have written everything before we delete.
+      // Each guard is dismissed *before* the explicit fclose(): the stream
+      // must not be touched again after fclose() returns, even when it
+      // reports failure, so the guard must never close it a second time.
+      closeInputGuard.dismiss();
+      if (!state.errorHolder.check(std::fclose(inputFd) == 0,
+                                   "Failed to close input file")) {
+        continue;
+      }
+      closeOutputGuard.dismiss();
+      if (!state.errorHolder.check(std::fclose(outputFd) == 0,
+                                   "Failed to close output file")) {
+        continue;
+      }
+      if (std::remove(input.c_str()) != 0) {
+        state.errorHolder.setError("Failed to remove input file");
+        continue;
+      }
+    }
+  }
+  // Returns 1 if any of the files failed to (de)compress.
+  return returnCode;
+}
+
+/// Build a `ZSTD_inBuffer` view over all of `buffer`, with the read position
+/// at the start.
+static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
+  ZSTD_inBuffer view;
+  view.src = buffer.data();
+  view.size = buffer.size();
+  view.pos = 0;
+  return view;
+}
+
+/**
+ * Drop the bytes zstd has consumed, i.e. the first `inBuffer.pos` bytes, from
+ * both `buffer` and `inBuffer`, and reset `inBuffer.pos` to 0.
+ */
+void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
+  const auto consumed = inBuffer.pos;
+  inBuffer.pos = 0;
+  inBuffer.size -= consumed;
+  inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + consumed;
+  buffer.advance(consumed);
+}
+
+/// Build a `ZSTD_outBuffer` view over all of `buffer`, with the write
+/// position at the start.
+static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
+  ZSTD_outBuffer view;
+  view.dst = buffer.data();
+  view.size = buffer.size();
+  view.pos = 0;
+  return view;
+}
+
+/**
+ * Cut the bytes zstd has produced, i.e. the first `outBuffer.pos` bytes, off
+ * the front of `buffer` and return them. `outBuffer` is moved past those
+ * bytes and its `pos` is reset to 0.
+ */
+Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
+  const auto produced = outBuffer.pos;
+  outBuffer.pos = 0;
+  outBuffer.size -= produced;
+  outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + produced;
+  return buffer.splitAt(produced);
+}
+
+/**
+ * Stream chunks of input from `in`, compress it, and stream it out to `out`.
+ *
+ * Everything popped from `in` goes through one ZSTD_CStream that is reset
+ * once at the start and closed with ZSTD_endStream() at the end. Failures are
+ * recorded in `state.errorHolder` and end the function early; `out` is
+ * finished on every exit path.
+ *
+ * @param state The shared state
+ * @param in Queue that we `pop()` input buffers from
+ * @param out Queue that we `push()` compressed output buffers to
+ * @param maxInputSize An upper bound on the size of the input; the output
+ *                     buffer is sized with ZSTD_compressBound(maxInputSize)
+ */
+static void compress(
+ SharedState& state,
+ std::shared_ptr<BufferWorkQueue> in,
+ std::shared_ptr<BufferWorkQueue> out,
+ size_t maxInputSize) {
+ auto& errorHolder = state.errorHolder;
+ auto guard = makeScopeGuard([&] { out->finish(); }); // runs on every return below
+ // Initialize the CCtx (taken from the shared stream pool)
+ auto ctx = state.cStreamPool->get();
+ if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
+ return;
+ }
+ {
+ auto err = ZSTD_resetCStream(ctx.get(), 0);
+ if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
+ return;
+ }
+ }
+
+ // Allocate space for the result once; split() hands out pieces of it below
+ auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
+ auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
+ {
+ Buffer inBuffer;
+ // Read a buffer in from the input queue
+ while (in->pop(inBuffer) && !errorHolder.hasError()) {
+ auto zstdInBuffer = makeZstdInBuffer(inBuffer);
+ // Compress the whole buffer and send it to the output queue
+ while (!inBuffer.empty() && !errorHolder.hasError()) {
+ if (!errorHolder.check(
+ !outBuffer.empty(), "ZSTD_compressBound() was too small")) { // outBuffer is never regrown
+ return;
+ }
+ // Compress
+ auto err =
+ ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
+ if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
+ return;
+ }
+ // Split the compressed data off outBuffer and pass to the output queue
+ out->push(split(outBuffer, zstdOutBuffer));
+ // Forget about the data we already compressed
+ advance(inBuffer, zstdInBuffer);
+ }
+ }
+ }
+ // Write the epilog; repeat until ZSTD_endStream() reports nothing left
+ size_t bytesLeft;
+ do {
+ if (!errorHolder.check(
+ !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
+ return;
+ }
+ bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
+ if (!errorHolder.check(
+ !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
+ return;
+ }
+ out->push(split(outBuffer, zstdOutBuffer));
+ } while (bytesLeft != 0 && !errorHolder.hasError());
+}
+
+/**
+ * Chooses the size of each independently compressed frame.
+ *
+ * The result is four times the window size, i.e. 2^(windowLog + 2) bytes.
+ * `size` and `numThreads` are accepted but do not influence the result.
+ *
+ * @param size       The size of the source if known, 0 otherwise
+ * @param numThreads The number of threads available to run compression jobs on
+ * @param params     The zstd parameters to be used for compression
+ */
+static size_t calculateStep(
+    std::uintmax_t size,
+    size_t numThreads,
+    const ZSTD_parameters &params) {
+  static_cast<void>(size);
+  static_cast<void>(numThreads);
+  const auto shift = params.cParams.windowLog + 2;
+  return size_t{1} << shift;
+}
+
+namespace {
+/// Result of inspecting a stream after a read.
+enum class FileStatus { Continue, Done, Error };
+
+/// Classifies `fd`: Done once end-of-file is flagged, otherwise Error when
+/// the error indicator is set, otherwise Continue. EOF takes precedence.
+FileStatus fileStatus(FILE* fd) {
+  if (std::feof(fd)) {
+    return FileStatus::Done;
+  }
+  return std::ferror(fd) ? FileStatus::Error : FileStatus::Continue;
+}
+} // anonymous namespace
+
+/**
+ * Reads up to `size` bytes from `fd`, at most `chunkSize` bytes per read, and
+ * pushes each piece that was read onto `queue`.
+ * Stops early when the stream reports EOF or an error.
+ * `*totalBytesRead` is increased by the number of bytes read.
+ * Returns the status of the file after the last read.
+ */
+static FileStatus
+readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
+         std::uint64_t *totalBytesRead) {
+  Buffer remaining(size);
+  auto status = FileStatus::Continue;
+  while (status == FileStatus::Continue && !remaining.empty()) {
+    const auto wanted = std::min(chunkSize, remaining.size());
+    const auto got = std::fread(remaining.data(), 1, wanted, fd);
+    *totalBytesRead += got;
+    // Hand the bytes that were just filled to the consumer.
+    queue.push(remaining.splitAt(got));
+    status = fileStatus(fd);
+  }
+  return status;
+}
+
+std::uint64_t asyncCompressChunks(
+ SharedState& state,
+ WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
+ ThreadPool& executor,
+ FILE* fd,
+ std::uintmax_t size,
+ size_t numThreads,
+ ZSTD_parameters params) {
+ auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); // finish `chunks` on every exit
+ std::uint64_t bytesRead = 0;
+
+ // Break the input up into chunks of size `step` and compress each chunk
+ // independently, each chunk on its own pair of input/output queues.
+ size_t step = calculateStep(size, numThreads, params);
+ state.log(DEBUG, "Chosen frame size: %zu\n", step);
+ auto status = FileStatus::Continue;
+ while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
+ // Make a new input queue that we will put the chunk's input data into.
+ auto in = std::make_shared<BufferWorkQueue>();
+ auto inGuard = makeScopeGuard([&] { in->finish(); }); // finish `in` when this iteration ends
+ // Make a new output queue that compress will put the compressed data into.
+ auto out = std::make_shared<BufferWorkQueue>();
+ // Start compression in the thread pool
+ executor.add([&state, in, out, step] { // the job holds its own shared_ptr copies of both queues
+ return compress(
+ state, std::move(in), std::move(out), step);
+ });
+ // Pass the output queue to the writer thread.
+ chunks.push(std::move(out));
+ state.log(VERBOSE, "%s\n", "Starting a new frame");
+ // Fill the input queue for the compression job we just started
+ status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); // at most `step` bytes per frame
+ }
+ state.errorHolder.check(status != FileStatus::Error, "Error reading input");
+ return bytesRead;
+}
+
+/**
+ * Decompress a frame, whose data is streamed into `in`, and stream the output
+ * to `out`.
+ *
+ * Failures are recorded in `state.errorHolder` and end the function early;
+ * `out` is finished on every exit path.
+ *
+ * @param state The shared state
+ * @param in Queue that we `pop()` input buffers from. It contains
+ * exactly one compressed frame.
+ * @param out Queue that we `push()` decompressed output buffers to
+ */
+static void decompress(
+ SharedState& state,
+ std::shared_ptr<BufferWorkQueue> in,
+ std::shared_ptr<BufferWorkQueue> out) {
+ auto& errorHolder = state.errorHolder;
+ auto guard = makeScopeGuard([&] { out->finish(); }); // runs on every return below
+ // Initialize the DCtx (taken from the shared stream pool)
+ auto ctx = state.dStreamPool->get();
+ if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
+ return;
+ }
+ {
+ auto err = ZSTD_resetDStream(ctx.get());
+ if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
+ return;
+ }
+ }
+
+ const size_t outSize = ZSTD_DStreamOutSize();
+ Buffer inBuffer;
+ size_t returnCode = 0; // result of the most recent ZSTD_decompressStream() call
+ // Read a buffer in from the input queue
+ while (in->pop(inBuffer) && !errorHolder.hasError()) {
+ auto zstdInBuffer = makeZstdInBuffer(inBuffer);
+ // Decompress the whole buffer and send it to the output queue
+ while (!inBuffer.empty() && !errorHolder.hasError()) {
+ // Allocate a buffer with at least outSize bytes.
+ Buffer outBuffer(outSize);
+ auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
+ // Decompress
+ returnCode =
+ ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
+ if (!errorHolder.check(
+ !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
+ return;
+ }
+ // Pass the buffer with the decompressed data to the output queue
+ out->push(split(outBuffer, zstdOutBuffer));
+ // Advance past the input we already read
+ advance(inBuffer, zstdInBuffer);
+ if (returnCode == 0) {
+ // The frame is over, prepare to (maybe) start a new frame
+ ZSTD_initDStream(ctx.get()); // NOTE(review): the return value is not checked here
+ }
+ }
+ }
+ if (!errorHolder.check(returnCode <= 1, "Incomplete block")) { // input ran out while more than 1 was still reported
+ return;
+ }
+ // We've given ZSTD_decompressStream all of our data, but there may still
+ // be data to read.
+ while (returnCode == 1) {
+ // Allocate a buffer with at least outSize bytes.
+ Buffer outBuffer(outSize);
+ auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
+ // Pass in no input.
+ ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
+ // Decompress
+ returnCode =
+ ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
+ if (!errorHolder.check(
+ !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
+ return;
+ }
+ // Pass the buffer with the decompressed data to the output queue
+ out->push(split(outBuffer, zstdOutBuffer));
+ }
+}
+
+/**
+ * Reads `fd`, splits it into frames where pzstd's skippable-frame headers
+ * allow it, and starts one decompression job per frame on `executor`.
+ * Each job's output queue is pushed onto `frames` in input order.
+ * If a header cannot be decoded, the rest of the input is handed to a single
+ * decompression job.
+ *
+ * @param state    The shared state; read errors are reported through it
+ * @param frames   Receives one output queue per decompression job; finished
+ *                 on every exit path
+ * @param executor The thread pool to run decompression jobs in
+ * @param fd       The input stream
+ * @returns The number of bytes read from `fd`
+ */
+std::uint64_t asyncDecompressFrames(
+    SharedState& state,
+    WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
+    ThreadPool& executor,
+    FILE* fd) {
+  auto framesGuard = makeScopeGuard([&] { frames.finish(); });
+  std::uint64_t totalBytesRead = 0;
+
+  // Split the source up into its component frames.
+  // If we find our recognized skippable frame we know the next frame's size,
+  // which means that we can decompress each standard frame independently.
+  // Otherwise, we will decompress using only one decompression task.
+  const size_t chunkSize = ZSTD_DStreamInSize();
+  auto status = FileStatus::Continue;
+  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
+    // Make a new input queue that we will put the frame's bytes into.
+    auto in = std::make_shared<BufferWorkQueue>();
+    auto inGuard = makeScopeGuard([&] { in->finish(); });
+    // Make an output queue that decompress will put the decompressed data into
+    auto out = std::make_shared<BufferWorkQueue>();
+
+    size_t frameSize;
+    {
+      // Calculate the size of the next frame.
+      // frameSize is 0 if the frame info can't be decoded.
+      Buffer buffer(SkippableFrame::kSize);
+      auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+      totalBytesRead += bytesRead;
+      status = fileStatus(fd);
+      if (bytesRead == 0 && status != FileStatus::Continue) {
+        break;
+      }
+      // Trim the buffer down to the bytes that were actually read.
+      buffer.subtract(buffer.size() - bytesRead);
+      frameSize = SkippableFrame::tryRead(buffer.range());
+      // The header bytes are part of the stream and go to the job as well.
+      in->push(std::move(buffer));
+    }
+    if (frameSize == 0) {
+      // We hit a non SkippableFrame, so this will be the last job.
+      // Make sure that we don't use too much memory
+      in->setMaxSize(64);
+      out->setMaxSize(64);
+    }
+    // Start decompression in the thread pool
+    executor.add([&state, in, out] {
+      return decompress(state, std::move(in), std::move(out));
+    });
+    // Pass the output queue to the writer thread
+    frames.push(std::move(out));
+    if (frameSize == 0) {
+      // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
+      // Pass the rest of the source to this decompression task
+      state.log(VERBOSE, "%s\n",
+          "Input not in pzstd format, falling back to serial decompression");
+      while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
+        status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
+      }
+      break;
+    }
+    // Terminate the line like every other log message in this file does.
+    state.log(VERBOSE, "Decompressing a frame of size %zu\n", frameSize);
+    // Fill the input queue for the decompression job we just started
+    status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
+  }
+  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
+  return totalBytesRead;
+}
+
+/// Writes all of `data` to `fd`, retrying after short writes.
+/// Returns false as soon as the stream's error indicator is set, true once
+/// everything was written.
+static bool writeData(ByteRange data, FILE* fd) {
+  for (; !data.empty();) {
+    const auto written = std::fwrite(data.begin(), 1, data.size(), fd);
+    data.advance(written);
+    const bool failed = std::ferror(fd) != 0;
+    if (failed) {
+      return false;
+    }
+  }
+  return true;
+}
+
+/**
+ * Pops the per-job output queues from `outs` in order and writes their
+ * contents to `outputFd`. When compressing, a skippable frame holding the
+ * size of the job's output is written before each job's data.
+ * Write failures are recorded in `state.errorHolder`.
+ *
+ * @returns The number of bytes written, skippable frames included
+ */
+std::uint64_t writeFile(
+    SharedState& state,
+    WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
+    FILE* outputFd,
+    bool decompress) {
+  auto& errors = state.errorHolder;
+  // Clear the progress line whenever we leave this function.
+  auto lineClearGuard = makeScopeGuard([&state] { state.log.clear(INFO); });
+  std::uint64_t total = 0;
+  std::shared_ptr<BufferWorkQueue> jobOutput;
+  // Grab the output queue of each job (in order).
+  while (outs.pop(jobOutput)) {
+    if (errors.hasError()) {
+      // Keep popping `outs` until it is finished, but write nothing more.
+      continue;
+    }
+    const bool compressing = !decompress;
+    if (compressing) {
+      // The skippable frame carries the compressed size, so we cannot start
+      // writing before compression of this frame is done.
+      // Wait for the compressed size to be available and write the frame.
+      SkippableFrame header(jobOutput->size());
+      if (!writeData(header.data(), outputFd)) {
+        errors.setError("Failed to write output");
+        return total;
+      }
+      total += header.kSize;
+    }
+    // Pop each piece of the frame from the queue and write it.
+    Buffer piece;
+    while (jobOutput->pop(piece) && !errors.hasError()) {
+      if (!writeData(piece.range(), outputFd)) {
+        errors.setError("Failed to write output");
+        return total;
+      }
+      total += piece.size();
+      const auto megabytes = static_cast<std::uint32_t>(total >> 20);
+      state.log.update(INFO, "Written: %u MB ", megabytes);
+    }
+  }
+  return total;
+}
+}
diff --git a/src/zstd/contrib/pzstd/Pzstd.h b/src/zstd/contrib/pzstd/Pzstd.h
new file mode 100644
index 000000000..79d1fcca2
--- /dev/null
+++ b/src/zstd/contrib/pzstd/Pzstd.h
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "ErrorHolder.h"
+#include "Logging.h"
+#include "Options.h"
+#include "utils/Buffer.h"
+#include "utils/Range.h"
+#include "utils/ResourcePool.h"
+#include "utils/ThreadPool.h"
+#include "utils/WorkQueue.h"
+#define ZSTD_STATIC_LINKING_ONLY
+#include "zstd.h"
+#undef ZSTD_STATIC_LINKING_ONLY
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+namespace pzstd {
+/**
+ * Runs pzstd with `options`: every file in `options.inputFiles` is
+ * (de)compressed in turn. A failure on one file is logged and does not stop
+ * the remaining files from being processed.
+ *
+ * @param options The pzstd options to use for (de)compression
+ * @returns 0 upon success and non-zero on failure.
+ */
+int pzstdMain(const Options& options);
+
+class SharedState { // State shared by all jobs of a run: logger, error holder and a zstd stream pool
+ public:
+ SharedState(const Options& options) : log(options.verbosity) {
+ if (!options.decompress) { // compressing: only cStreamPool is created
+ auto parameters = options.determineParameters();
+ cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
+ [this, parameters]() -> ZSTD_CStream* { // factory: create and initialize a stream, nullptr on failure
+ this->log(VERBOSE, "%s\n", "Creating new ZSTD_CStream");
+ auto zcs = ZSTD_createCStream();
+ if (zcs) {
+ auto err = ZSTD_initCStream_advanced(
+ zcs, nullptr, 0, parameters, 0);
+ if (ZSTD_isError(err)) {
+ ZSTD_freeCStream(zcs);
+ return nullptr;
+ }
+ }
+ return zcs;
+ },
+ [](ZSTD_CStream *zcs) { // deleter
+ ZSTD_freeCStream(zcs);
+ }});
+ } else { // decompressing: only dStreamPool is created
+ dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
+ [this]() -> ZSTD_DStream* { // factory: create and initialize a stream, nullptr on failure
+ this->log(VERBOSE, "%s\n", "Creating new ZSTD_DStream");
+ auto zds = ZSTD_createDStream();
+ if (zds) {
+ auto err = ZSTD_initDStream(zds);
+ if (ZSTD_isError(err)) {
+ ZSTD_freeDStream(zds);
+ return nullptr;
+ }
+ }
+ return zds;
+ },
+ [](ZSTD_DStream *zds) { // deleter
+ ZSTD_freeDStream(zds);
+ }});
+ }
+ }
+
+ ~SharedState() {
+ // The resource pools' factories capture `this`, so destroy the pools first.
+ cStreamPool.reset();
+ dStreamPool.reset();
+ }
+
+ Logger log;
+ ErrorHolder errorHolder;
+ std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; // set only when compressing
+ std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; // set only when decompressing
+};
+
+/**
+ * Streams input from `fd`, breaks input up into chunks, and compresses each
+ * chunk independently. Output of each chunk gets streamed to a queue, and
+ * the output queues get put into `chunks` in order.
+ *
+ * @param state The shared state
+ * @param chunks Each compression job's output queue gets `push()`ed here
+ * as soon as it is available
+ * @param executor The thread pool to run compression jobs in
+ * @param fd The input file descriptor
+ * @param size The size of the input file if known, 0 otherwise
+ * @param numThreads The number of threads in the thread pool
+ * @param parameters The zstd parameters to use for compression
+ * @returns The number of bytes read from the file
+ */
+std::uint64_t asyncCompressChunks(
+ SharedState& state,
+ WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
+ ThreadPool& executor,
+ FILE* fd,
+ std::uintmax_t size,
+ std::size_t numThreads,
+ ZSTD_parameters parameters);
+
+/**
+ * Streams input from `fd`. If pzstd headers are available it breaks the input
+ * up into independent frames. It sends each frame to an independent
+ * decompression job. Output of each frame gets streamed to a queue, and
+ * the output queues get put into `frames` in order.
+ *
+ * @param state The shared state
+ * @param frames Each decompression job's output queue gets `push()`ed here
+ * as soon as it is available
+ * @param executor The thread pool to run decompression jobs in
+ * @param fd The input file descriptor
+ * @returns The number of bytes read from the file
+ */
+std::uint64_t asyncDecompressFrames(
+ SharedState& state,
+ WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
+ ThreadPool& executor,
+ FILE* fd);
+
+/**
+ * Streams input in from each queue in `outs` in order, and writes the data to
+ * `outputFd`. When compressing, a skippable frame is written before the data
+ * of each queue.
+ *
+ * @param state The shared state
+ * @param outs A queue of output queues, one for each
+ * (de)compression job.
+ * @param outputFd The file descriptor to write to
+ * @param decompress Are we decompressing?
+ * @returns The number of bytes written
+ */
+std::uint64_t writeFile(
+ SharedState& state,
+ WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
+ FILE* outputFd,
+ bool decompress);
+}
diff --git a/src/zstd/contrib/pzstd/README.md b/src/zstd/contrib/pzstd/README.md
new file mode 100644
index 000000000..84d945815
--- /dev/null
+++ b/src/zstd/contrib/pzstd/README.md
@@ -0,0 +1,56 @@
+# Parallel Zstandard (PZstandard)
+
+Parallel Zstandard is a Pigz-like tool for Zstandard.
+It provides Zstandard format compatible compression and decompression that is able to utilize multiple cores.
+It breaks the input up into equal sized chunks and compresses each chunk independently into a Zstandard frame.
+It then concatenates the frames together to produce the final compressed output.
+PZstandard writes a 12-byte header before each frame; the header is a skippable frame in the Zstandard format and tells PZstandard the size of the next compressed frame.
+PZstandard supports parallel decompression of files compressed with PZstandard.
+When decompressing files compressed with Zstandard, PZstandard does IO in one thread, and decompression in another.
+
+## Usage
+
+PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads.
+Dictionary mode is not currently supported.
+
+Basic usage
+
+ pzstd input-file -o output-file -p num-threads -# # Compression
+ pzstd -d input-file -o output-file -p num-threads # Decompression
+
+PZstandard also supports piping and fifo pipes
+
+ cat input-file | pzstd -p num-threads -# -c > /dev/null
+
+For more options
+
+ pzstd --help
+
+PZstandard tries to pick a smart default number of threads if not specified (displayed in `pzstd --help`).
+If this number is not suitable, during compilation you can define `PZSTD_NUM_THREADS` to the number of threads you prefer.
+
+## Benchmarks
+
+As a reference, PZstandard and Pigz were compared on an Intel Core i7 @ 3.1 GHz, each using 4 threads, with the [Silesia compression corpus](http://sun.aei.polsl.pl/~sdeor/index.php?page=silesia).
+
+Compression Speed vs Ratio with 4 Threads | Decompression Speed with 4 Threads
+------------------------------------------|-----------------------------------
+![Compression Speed vs Ratio](images/Cspeed.png "Compression Speed vs Ratio") | ![Decompression Speed](images/Dspeed.png "Decompression Speed")
+
+The test procedure was to run each of the following commands 2 times for each compression level, and take the minimum time.
+
+ time pzstd -# -p 4 -c silesia.tar > silesia.tar.zst
+ time pzstd -d -p 4 -c silesia.tar.zst > /dev/null
+
+ time pigz -# -p 4 -k -c silesia.tar > silesia.tar.gz
+ time pigz -d -p 4 -k -c silesia.tar.gz > /dev/null
+
+PZstandard was tested using compression levels 1-19, and Pigz was tested using compression levels 1-9.
+Pigz cannot do parallel decompression; it simply runs reading, decompression, and writing on separate threads.
+
+## Tests
+
+Tests require that you have [gtest](https://github.com/google/googletest) installed.
+Set `GTEST_INC` and `GTEST_LIB` in `Makefile` to specify the location of the gtest headers and libraries.
+Alternatively, run `make googletest`, which will clone googletest and build it.
+Run `make tests && make check` to run tests.
diff --git a/src/zstd/contrib/pzstd/SkippableFrame.cpp b/src/zstd/contrib/pzstd/SkippableFrame.cpp
new file mode 100644
index 000000000..769866dfc
--- /dev/null
+++ b/src/zstd/contrib/pzstd/SkippableFrame.cpp
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "SkippableFrame.h"
+#include "mem.h"
+#include "utils/Range.h"
+
+#include <cstdio>
+
+using namespace pzstd;
+
+// Serializes the 12-byte header into data_ as three little-endian 32-bit
+// words: magic number, size of the frame contents, size of the next frame.
+SkippableFrame::SkippableFrame(std::uint32_t size) : frameSize_(size) {
+  const auto header = data_.data();
+  MEM_writeLE32(header, kSkippableFrameMagicNumber);
+  MEM_writeLE32(header + 4, kFrameContentsSize);
+  MEM_writeLE32(header + 8, frameSize_);
+}
+
+// Returns the next frame's size stored in `bytes`, or 0 when `bytes` is too
+// short or does not start with pzstd's skippable frame header.
+/* static */ std::size_t SkippableFrame::tryRead(ByteRange bytes) {
+  if (bytes.size() < SkippableFrame::kSize) {
+    return 0;
+  }
+  const auto header = bytes.begin();
+  if (MEM_readLE32(header) != kSkippableFrameMagicNumber) {
+    return 0;
+  }
+  if (MEM_readLE32(header + 4) != kFrameContentsSize) {
+    return 0;
+  }
+  return MEM_readLE32(header + 8);
+}
diff --git a/src/zstd/contrib/pzstd/SkippableFrame.h b/src/zstd/contrib/pzstd/SkippableFrame.h
new file mode 100644
index 000000000..60deed040
--- /dev/null
+++ b/src/zstd/contrib/pzstd/SkippableFrame.h
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/Range.h"
+
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+
+namespace pzstd {
+/**
+ * We put a skippable frame before each frame.
+ * It contains a skippable frame magic number, the size of the skippable
+ * frame's contents (always 4), and the size of the next frame.
+ * Each skippable frame is exactly 12 bytes in little endian format.
+ * The first 8 bytes are for compatibility with the ZSTD format.
+ * For an input that is split into N frames, the output will look like
+ *
+ * [0x184D2A50|4|size1] [frame1 of size size1]
+ * [0x184D2A50|4|size2] [frame2 of size size2]
+ * ...
+ * [0x184D2A50|4|sizeN] [frameN of size sizeN]
+ *
+ * Each sizeX is 4 bytes.
+ *
+ * These skippable frames should allow us to skip through the compressed file
+ * and only load at most N pages.
+ */
+class SkippableFrame {
+ public:
+ static constexpr std::size_t kSize = 12; // total size of the serialized header in bytes
+
+ private:
+ std::uint32_t frameSize_;
+ std::array<std::uint8_t, kSize> data_; // the serialized header, filled in by the constructor
+ static constexpr std::uint32_t kSkippableFrameMagicNumber = 0x184D2A50;
+ // Could be improved if the size fits in less bytes
+ static constexpr std::uint32_t kFrameContentsSize = kSize - 8;
+
+ public:
+ // Write the skippable frame to data_ in LE format.
+ explicit SkippableFrame(std::uint32_t size);
+
+ // Read the skippable frame from bytes in LE format.
+ // Returns the size of the next frame, or 0 if bytes is not such a frame.
+ static std::size_t tryRead(ByteRange bytes);
+
+ ByteRange data() const { // view of the serialized header bytes
+ return {data_.data(), data_.size()};
+ }
+
+ // Size of the next frame.
+ std::size_t frameSize() const {
+ return frameSize_;
+ }
+};
+}
diff --git a/src/zstd/contrib/pzstd/images/Cspeed.png b/src/zstd/contrib/pzstd/images/Cspeed.png
new file mode 100644
index 000000000..aca4f663e
--- /dev/null
+++ b/src/zstd/contrib/pzstd/images/Cspeed.png
Binary files differ
diff --git a/src/zstd/contrib/pzstd/images/Dspeed.png b/src/zstd/contrib/pzstd/images/Dspeed.png
new file mode 100644
index 000000000..e48881bcd
--- /dev/null
+++ b/src/zstd/contrib/pzstd/images/Dspeed.png
Binary files differ
diff --git a/src/zstd/contrib/pzstd/main.cpp b/src/zstd/contrib/pzstd/main.cpp
new file mode 100644
index 000000000..b93f043b1
--- /dev/null
+++ b/src/zstd/contrib/pzstd/main.cpp
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "ErrorHolder.h"
+#include "Options.h"
+#include "Pzstd.h"
+
+using namespace pzstd;
+
+// Entry point: parse the command line, then run pzstd unless parsing failed
+// (exit code 1) or only a message was requested (exit code 0).
+int main(int argc, const char** argv) {
+  Options options;
+  const auto parsed = options.parse(argc, argv);
+  if (parsed == Options::Status::Failure) {
+    return 1;
+  }
+  if (parsed == Options::Status::Message) {
+    return 0;
+  }
+  return pzstdMain(options);
+}
diff --git a/src/zstd/contrib/pzstd/test/BUCK b/src/zstd/contrib/pzstd/test/BUCK
new file mode 100644
index 000000000..6d3fdd3c2
--- /dev/null
+++ b/src/zstd/contrib/pzstd/test/BUCK
@@ -0,0 +1,37 @@
+# Unit test for the command line option parser (OptionsTest.cpp).
+cxx_test(
+ name='options_test',
+ srcs=['OptionsTest.cpp'],
+ deps=['//contrib/pzstd:options'],
+)
+
+# Round-trip tests of the library (PzstdTest.cpp); inputs come from
+# //programs:datagen and the shared ':round_trip' helper below.
+cxx_test(
+ name='pzstd_test',
+ srcs=['PzstdTest.cpp'],
+ deps=[
+ ':round_trip',
+ '//contrib/pzstd:libpzstd',
+ '//contrib/pzstd/utils:scope_guard',
+ '//programs:datagen',
+ ],
+)
+
+# Declared as a cxx_binary rather than a cxx_test: RoundTripTest.cpp provides
+# its own main().
+cxx_binary(
+ name='round_trip_test',
+ srcs=['RoundTripTest.cpp'],
+ deps=[
+ ':round_trip',
+ '//contrib/pzstd/utils:scope_guard',
+ '//programs:datagen',
+ ]
+)
+
+# Header-only helper (RoundTrip.h), exported under the 'test/' include prefix
+# and shared by the two targets above.
+cxx_library(
+ name='round_trip',
+ header_namespace='test',
+ exported_headers=['RoundTrip.h'],
+ deps=[
+ '//contrib/pzstd:libpzstd',
+ '//contrib/pzstd:options',
+ '//contrib/pzstd/utils:scope_guard',
+ ]
+)
diff --git a/src/zstd/contrib/pzstd/test/OptionsTest.cpp b/src/zstd/contrib/pzstd/test/OptionsTest.cpp
new file mode 100644
index 000000000..e60114825
--- /dev/null
+++ b/src/zstd/contrib/pzstd/test/OptionsTest.cpp
@@ -0,0 +1,536 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "Options.h"
+
+#include <array>
+#include <gtest/gtest.h>
+
+using namespace pzstd;
+
+namespace pzstd {
+/// Member-wise equality for `Options`, so tests can compare whole option sets
+/// with `EXPECT_EQ`. Fields are compared in the order listed; the first
+/// mismatch decides the result.
+bool operator==(const Options &lhs, const Options &rhs) {
+  if (lhs.numThreads != rhs.numThreads) {
+    return false;
+  }
+  if (lhs.maxWindowLog != rhs.maxWindowLog) {
+    return false;
+  }
+  if (lhs.compressionLevel != rhs.compressionLevel) {
+    return false;
+  }
+  if (lhs.decompress != rhs.decompress) {
+    return false;
+  }
+  if (lhs.inputFiles != rhs.inputFiles) {
+    return false;
+  }
+  if (lhs.outputFile != rhs.outputFile) {
+    return false;
+  }
+  if (lhs.overwrite != rhs.overwrite) {
+    return false;
+  }
+  if (lhs.keepSource != rhs.keepSource) {
+    return false;
+  }
+  if (lhs.writeMode != rhs.writeMode) {
+    return false;
+  }
+  if (lhs.checksum != rhs.checksum) {
+    return false;
+  }
+  return lhs.verbosity == rhs.verbosity;
+}
+
+/// Pretty-printer for `Options`, used by gtest when an `EXPECT_EQ` on two
+/// option sets fails. Emits one "name: value" pair per line inside braces;
+/// `inputFiles` is printed as a nested, comma-separated list and `writeMode`
+/// as its integer value.
+std::ostream &operator<<(std::ostream &out, const Options &opt) {
+  out << "{";
+  out << "\n\t" << "numThreads: " << opt.numThreads;
+  out << ",\n\t" << "maxWindowLog: " << opt.maxWindowLog;
+  out << ",\n\t" << "compressionLevel: " << opt.compressionLevel;
+  out << ",\n\t" << "decompress: " << opt.decompress;
+  out << ",\n\t" << "inputFiles: {";
+  // Separator is empty before the first file and "," before every later one.
+  const char *separator = "";
+  for (const auto &name : opt.inputFiles) {
+    out << separator << "\n\t\t" << name;
+    separator = ",";
+  }
+  out << "\n\t}";
+  out << ",\n\t" << "outputFile: " << opt.outputFile;
+  out << ",\n\t" << "overwrite: " << opt.overwrite;
+  out << ",\n\t" << "keepSource: " << opt.keepSource;
+  out << ",\n\t" << "writeMode: " << static_cast<int>(opt.writeMode);
+  out << ",\n\t" << "checksum: " << opt.checksum;
+  out << ",\n\t" << "verbosity: " << opt.verbosity;
+  out << "\n}";
+  return out;
+}
+}
+
+namespace {
+// Output path the tests use as a discard target: "nul" when building for
+// Windows, "/dev/null" everywhere else.
+#if defined(_WIN32)
+constexpr char nullOutput[] = "nul";
+#else
+constexpr char nullOutput[] = "/dev/null";
+#endif
+
+// Shorthand for the write mode that appears in the expected option sets.
+constexpr Options::WriteMode autoMode = Options::WriteMode::Auto;
+} // anonymous namespace
+
+// Shorthands asserting (non-fatally) that an expression, typically a call to
+// Options::parse(), evaluates to the given Options::Status value.
+#define EXPECT_SUCCESS(...) EXPECT_EQ(Options::Status::Success, __VA_ARGS__)
+#define EXPECT_FAILURE(...) EXPECT_EQ(Options::Status::Failure, __VA_ARGS__)
+#define EXPECT_MESSAGE(...) EXPECT_EQ(Options::Status::Message, __VA_ARGS__)
+
+/// Builds an argv-style array from the given arguments.
+///
+/// Slot 0 (where a real argv holds the program name) is filled with
+/// `nullptr`; the supplied arguments follow in order, so the result has
+/// `sizeof...(Args) + 1` entries.
+template <typename... Args>
+std::array<const char *, sizeof...(Args) + 1> makeArray(Args... args) {
+  std::array<const char *, sizeof...(Args) + 1> argv = {{nullptr, args...}};
+  return argv;
+}
+
+// Command lines that must parse successfully, together with the complete
+// option set each one is expected to produce.
+// NOTE(review): the `expected` objects use positional aggregate
+// initialization; the field order presumably matches the one used by
+// operator== above (numThreads, maxWindowLog, compressionLevel, decompress,
+// inputFiles, outputFile, overwrite, keepSource, writeMode, checksum,
+// verbosity) -- confirm against Options.h.
+TEST(Options, ValidInputs) {
+  {
+    Options opts;
+    auto argv = makeArray("--processes", "5", "-o", "x", "y", "-f");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {5, 23, 3, false, {"y"}, "x",
+                        true, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv = makeArray("-p", "1", "input", "-19");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {1, 23, 19, false, {"input"}, "",
+                        false, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv =
+        makeArray("--ultra", "-22", "-p", "1", "-o", "x", "-d", "x.zst", "-f");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {1, 0, 22, true, {"x.zst"}, "x",
+                        true, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv = makeArray("--processes", "100", "hello.zst", "--decompress",
+                          "--force");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {100, 23, 3, true, {"hello.zst"}, "", true,
+                        true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    // Short form "-c" ...
+    Options opts;
+    auto argv = makeArray("x", "-dp", "1", "-c");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {1, 23, 3, true, {"x"}, "-",
+                        false, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    // ... and long form "--stdout" are expected to give the same result.
+    Options opts;
+    auto argv = makeArray("x", "-dp", "1", "--stdout");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {1, 23, 3, true, {"x"}, "-",
+                        false, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv = makeArray("-p", "1", "x", "-5", "-fo", "-", "--ultra", "-d");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {1, 0, 5, true, {"x"}, "-",
+                        true, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv = makeArray("silesia.tar", "-o", "silesia.tar.pzstd", "-p", "2");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {2, 23, 3, false, {"silesia.tar"}, "silesia.tar.pzstd",
+                        false, true, autoMode, true, 2};
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    Options opts;
+    auto argv = makeArray("x", "-p", "1");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Same command line as the previous case, kept as in the original test.
+    Options opts;
+    auto argv = makeArray("x", "-p", "1");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+  }
+}
+
+// Checks the output file name that getOutputFile() reports for the first
+// input file under various command lines.
+TEST(Options, GetOutputFile) {
+  {
+    // Compressing "x" without -o: expected output is "x.zst".
+    Options opts;
+    auto argv = makeArray("x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ("x.zst", opts.getOutputFile(opts.inputFiles[0]));
+  }
+  {
+    // An explicit -o wins, even with several inputs.
+    Options opts;
+    auto argv = makeArray("x", "y", "-o", nullOutput);
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(nullOutput, opts.getOutputFile(opts.inputFiles[0]));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("x.zst", "-do", nullOutput);
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(nullOutput, opts.getOutputFile(opts.inputFiles[0]));
+  }
+  {
+    // Decompressing "x.zst" without -o: expected output is "x".
+    Options opts;
+    auto argv = makeArray("x.zst", "-d");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ("x", opts.getOutputFile(opts.inputFiles[0]));
+  }
+  {
+    // Decompressing "xzst" without -o: an empty output name is expected.
+    Options opts;
+    auto argv = makeArray("xzst", "-d");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ("", opts.getOutputFile(opts.inputFiles[0]));
+  }
+  {
+    // "-doxx": the output name "xx" is attached directly to the flags.
+    Options opts;
+    auto argv = makeArray("xzst", "-doxx");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ("xx", opts.getOutputFile(opts.inputFiles[0]));
+  }
+}
+
+// Command lines naming more than one input file.
+TEST(Options, MultipleFiles) {
+  {
+    // Three inputs, no -o: defaults everywhere except verbosity, which is
+    // expected to be 1.
+    Options opts;
+    auto argv = makeArray("x", "y", "z");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected;
+    expected.inputFiles = {"x", "y", "z"};
+    expected.verbosity = 1;
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    // Several inputs may share the discard target as output.
+    Options opts;
+    auto argv = makeArray("x", "y", "z", "-o", nullOutput);
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected;
+    expected.inputFiles = {"x", "y", "z"};
+    expected.outputFile = nullOutput;
+    expected.verbosity = 1;
+    EXPECT_EQ(expected, opts);
+  }
+  {
+    // Several inputs with "-" as the single output must be rejected.
+    Options opts;
+    auto argv = makeArray("x", "y", "-o-");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Several inputs with one regular output file must be rejected.
+    Options opts;
+    auto argv = makeArray("x", "y", "-o", "file");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Everything after "--" is expected to be treated as an input file.
+    Options opts;
+    auto argv = makeArray("-qqvd12qp4", "-f", "x", "--", "--rm", "-c");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    Options expected = {4, 23, 12, true, {"x", "--rm", "-c"},
+                        "", true, true, autoMode, true,
+                        0};
+    EXPECT_EQ(expected, opts);
+  }
+}
+
+// Handling of the thread count option (-p).
+TEST(Options, NumThreads) {
+  {
+    // No -p at all is accepted.
+    Options opts;
+    auto argv = makeArray("x", "-dfo", "-");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // An explicit thread count of 0 is rejected.
+    Options opts;
+    auto argv = makeArray("x", "-p", "0", "-fo", "-");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // -p followed by another flag instead of a number is rejected.
+    Options opts;
+    auto argv = makeArray("-f", "-p", "-o", "-");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+}
+
+// Compression levels the parser must refuse.
+TEST(Options, BadCompressionLevel) {
+  {
+    // Level 20 without --ultra.
+    Options opts;
+    auto argv = makeArray("x", "-20");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Level 23, even with --ultra.
+    Options opts;
+    auto argv = makeArray("x", "--ultra", "-23");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("x", "--1"); // negative 1?
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+}
+
+// An unknown short flag ("-x") makes parsing fail.
+TEST(Options, InvalidOption) {
+  Options opts;
+  auto argv = makeArray("x", "-x");
+  EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+}
+
+// Decompressing "notzst" without -o parses fine, but getOutputFile() is
+// expected to return an empty name for it.
+TEST(Options, BadOutputFile) {
+  Options opts;
+  auto argv = makeArray("notzst", "-d", "-p", "1");
+  EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+  EXPECT_EQ("", opts.getOutputFile(opts.inputFiles.front()));
+}
+
+// Flags that take an argument (-p, -o) given a bad or missing argument.
+TEST(Options, BadOptionsWithArguments) {
+  {
+    // "f" where -p expects a number.
+    Options opts;
+    auto argv = makeArray("x", "-pf");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Trailing garbage after the number.
+    Options opts;
+    auto argv = makeArray("x", "-p", "10f");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // -p as the last argument.
+    Options opts;
+    auto argv = makeArray("x", "-p");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // -o as the last argument.
+    Options opts;
+    auto argv = makeArray("x", "-o");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Same command line as the previous case, kept as in the original test.
+    Options opts;
+    auto argv = makeArray("x", "-o");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+}
+
+// Interaction of --rm with -k/--keep and the default for keepSource.
+TEST(Options, KeepSource) {
+  {
+    // A later -k overrides --rm.
+    Options opts;
+    auto argv = makeArray("x", "--rm", "-k");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.keepSource);
+  }
+  {
+    // Same with the long form --keep.
+    Options opts;
+    auto argv = makeArray("x", "--rm", "--keep");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.keepSource);
+  }
+  {
+    // Default: the source is kept.
+    Options opts;
+    auto argv = makeArray("x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.keepSource);
+  }
+  {
+    // --rm alone turns keepSource off.
+    Options opts;
+    auto argv = makeArray("x", "--rm");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(false, opts.keepSource);
+  }
+}
+
+// Expected verbosity for combinations of -q/--quiet, -v and the number of
+// input files.
+TEST(Options, Verbosity) {
+  {
+    // One input, no flags: 2.
+    Options opts;
+    auto argv = makeArray("x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(2, opts.verbosity);
+  }
+  {
+    // Three quiet flags in total: -1.
+    Options opts;
+    auto argv = makeArray("--quiet", "-qq", "x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(-1, opts.verbosity);
+  }
+  {
+    // Two inputs, no flags: 1.
+    Options opts;
+    auto argv = makeArray("x", "y");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(1, opts.verbosity);
+  }
+  {
+    // Same with the inputs placed after "--".
+    Options opts;
+    auto argv = makeArray("--", "x", "y");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(1, opts.verbosity);
+  }
+  {
+    // One -q and one -v with two inputs: 1.
+    Options opts;
+    auto argv = makeArray("-qv", "x", "y");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(1, opts.verbosity);
+  }
+  {
+    // -v with two inputs: 3.
+    Options opts;
+    auto argv = makeArray("-v", "x", "y");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(3, opts.verbosity);
+  }
+  {
+    // -v with one input: 3.
+    Options opts;
+    auto argv = makeArray("-v", "x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(3, opts.verbosity);
+  }
+}
+
+// -t/--test is expected to force decompression into the discard target while
+// keeping the source, regardless of --rm or -o on the same command line.
+TEST(Options, TestMode) {
+  {
+    Options opts;
+    auto argv = makeArray("x", "-t");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.keepSource);
+    EXPECT_EQ(true, opts.decompress);
+    EXPECT_EQ(nullOutput, opts.outputFile);
+  }
+  {
+    // --rm and -ohello do not change the outcome.
+    Options opts;
+    auto argv = makeArray("x", "--test", "--rm", "-ohello");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.keepSource);
+    EXPECT_EQ(true, opts.decompress);
+    EXPECT_EQ(nullOutput, opts.outputFile);
+  }
+}
+
+// Interaction of --no-check with -C/--check and the default for checksum.
+TEST(Options, Checksum) {
+  {
+    // A later -C re-enables the checksum.
+    Options opts;
+    auto argv = makeArray("x.zst", "--no-check", "-Cd");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.checksum);
+  }
+  {
+    // Default: checksum enabled.
+    Options opts;
+    auto argv = makeArray("x");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.checksum);
+  }
+  {
+    // A later --check re-enables the checksum.
+    Options opts;
+    auto argv = makeArray("x", "--no-check", "--check");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(true, opts.checksum);
+  }
+  {
+    // --no-check alone disables it.
+    Options opts;
+    auto argv = makeArray("x", "--no-check");
+    EXPECT_SUCCESS(opts.parse(argv.size(), argv.data()));
+    EXPECT_EQ(false, opts.checksum);
+  }
+}
+
+// With no input file on the command line, "-" is expected to be used for both
+// the single input and the output; mixing "-" with a named input is rejected.
+//
+// The parse() status is deliberately not checked in the first three cases.
+// NOTE(review): presumably because it depends on whether the standard streams
+// are terminals when the test runs -- confirm against Options::parse().
+TEST(Options, InputFiles) {
+  {
+    Options options;
+    auto args = makeArray("-cd");
+    options.parse(args.size(), args.data());
+    // Fatal assertion: inputFiles[0] below must not be evaluated on an empty
+    // container. The unsigned literal avoids a signed/unsigned comparison.
+    ASSERT_EQ(1u, options.inputFiles.size());
+    EXPECT_EQ("-", options.inputFiles[0]);
+    EXPECT_EQ("-", options.outputFile);
+  }
+  {
+    Options options;
+    auto args = makeArray();
+    options.parse(args.size(), args.data());
+    ASSERT_EQ(1u, options.inputFiles.size());
+    EXPECT_EQ("-", options.inputFiles[0]);
+    EXPECT_EQ("-", options.outputFile);
+  }
+  {
+    Options options;
+    auto args = makeArray("-d");
+    options.parse(args.size(), args.data());
+    ASSERT_EQ(1u, options.inputFiles.size());
+    EXPECT_EQ("-", options.inputFiles[0]);
+    EXPECT_EQ("-", options.outputFile);
+  }
+  {
+    Options options;
+    auto args = makeArray("x", "-");
+    EXPECT_FAILURE(options.parse(args.size(), args.data()));
+  }
+}
+
+// Malformed or unsupported flags that must make parsing fail.
+TEST(Options, InvalidOptions) {
+  {
+    Options opts;
+    auto argv = makeArray("-ibasdf");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // A dash followed by a space.
+    Options opts;
+    auto argv = makeArray("- ");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("-n15");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    // Compression level 0.
+    Options opts;
+    auto argv = makeArray("-0", "x");
+    EXPECT_FAILURE(opts.parse(argv.size(), argv.data()));
+  }
+}
+
+// Help and version flags: parse() is expected to report Status::Message for
+// each of them.
+TEST(Options, Extras) {
+  {
+    Options opts;
+    auto argv = makeArray("-h");
+    EXPECT_MESSAGE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("-H");
+    EXPECT_MESSAGE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("-V");
+    EXPECT_MESSAGE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("--help");
+    EXPECT_MESSAGE(opts.parse(argv.size(), argv.data()));
+  }
+  {
+    Options opts;
+    auto argv = makeArray("--version");
+    EXPECT_MESSAGE(opts.parse(argv.size(), argv.data()));
+  }
+}
diff --git a/src/zstd/contrib/pzstd/test/PzstdTest.cpp b/src/zstd/contrib/pzstd/test/PzstdTest.cpp
new file mode 100644
index 000000000..5c7d66310
--- /dev/null
+++ b/src/zstd/contrib/pzstd/test/PzstdTest.cpp
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "Pzstd.h"
+extern "C" {
+#include "datagen.h"
+}
+#include "test/RoundTrip.h"
+#include "utils/ScopeGuard.h"
+
+#include <cstddef>
+#include <cstdio>
+#include <gtest/gtest.h>
+#include <memory>
+#include <random>
+
+using namespace std;
+using namespace pzstd;
+
+// Round-trips generated inputs of every length in [1, 255] with 1 and 2
+// threads at compression levels 1 and 4. The seed is printed up front so a
+// failing run can be reproduced.
+TEST(Pzstd, SmallSizes) {
+  unsigned seed = std::random_device{}();
+  std::fprintf(stderr, "Pzstd.SmallSizes seed: %u\n", seed);
+  std::mt19937 gen(seed);
+
+  for (unsigned len = 1; len < 256; ++len) {
+    if (len % 16 == 0) {
+      std::fprintf(stderr, "%u / 16\n", len / 16);
+    }
+    std::string inputFile = std::tmpnam(nullptr);
+    auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); });
+    {
+      static uint8_t buf[256];
+      RDG_genBuffer(buf, len, 0.5, 0.0, gen());
+      auto fd = std::fopen(inputFile.c_str(), "wb");
+      // Fail the test cleanly instead of handing a null stream to
+      // fwrite()/fclose() when the temporary file cannot be created.
+      ASSERT_TRUE(fd != nullptr);
+      auto written = std::fwrite(buf, 1, len, fd);
+      std::fclose(fd);
+      ASSERT_EQ(written, len);
+    }
+    for (unsigned numThreads = 1; numThreads <= 2; ++numThreads) {
+      for (unsigned level = 1; level <= 4; level *= 4) {
+        // Prints the failing configuration unless dismissed below; ASSERT_*
+        // returns from the test body, which runs the guard.
+        auto errorGuard = makeScopeGuard([&] {
+          std::fprintf(stderr, "# threads: %u\n", numThreads);
+          std::fprintf(stderr, "compression level: %u\n", level);
+        });
+        Options options;
+        options.overwrite = true;
+        options.inputFiles = {inputFile};
+        options.numThreads = numThreads;
+        options.compressionLevel = level;
+        options.verbosity = 1;
+        ASSERT_TRUE(roundTrip(options));
+        errorGuard.dismiss();
+      }
+    }
+  }
+}
+
+// Round-trips generated inputs of 1 MiB up to 16 MiB (doubling each step)
+// with up to 1, 4 and 16 threads at compression levels 1 and 4. The seed is
+// printed up front so a failing run can be reproduced.
+TEST(Pzstd, LargeSizes) {
+  unsigned seed = std::random_device{}();
+  std::fprintf(stderr, "Pzstd.LargeSizes seed: %u\n", seed);
+  std::mt19937 gen(seed);
+
+  for (unsigned len = 1 << 20; len <= (1 << 24); len *= 2) {
+    std::string inputFile = std::tmpnam(nullptr);
+    auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); });
+    {
+      std::unique_ptr<uint8_t[]> buf(new uint8_t[len]);
+      RDG_genBuffer(buf.get(), len, 0.5, 0.0, gen());
+      auto fd = std::fopen(inputFile.c_str(), "wb");
+      // Fail the test cleanly instead of handing a null stream to
+      // fwrite()/fclose() when the temporary file cannot be created.
+      ASSERT_TRUE(fd != nullptr);
+      auto written = std::fwrite(buf.get(), 1, len, fd);
+      std::fclose(fd);
+      ASSERT_EQ(written, len);
+    }
+    for (unsigned numThreads = 1; numThreads <= 16; numThreads *= 4) {
+      for (unsigned level = 1; level <= 4; level *= 4) {
+        // Prints the failing configuration unless dismissed below.
+        auto errorGuard = makeScopeGuard([&] {
+          std::fprintf(stderr, "# threads: %u\n", numThreads);
+          std::fprintf(stderr, "compression level: %u\n", level);
+        });
+        Options options;
+        options.overwrite = true;
+        options.inputFiles = {inputFile};
+        // Cap the requested thread count at the default, but never go below
+        // one thread: the default can be 0 (ExtremelyLargeSize in this file
+        // guards against exactly that), and taking the plain minimum would
+        // then request 0 threads.
+        const unsigned maxThreads =
+            options.numThreads == 0 ? 1 : options.numThreads;
+        options.numThreads = numThreads < maxThreads ? numThreads : maxThreads;
+        options.compressionLevel = level;
+        options.verbosity = 1;
+        ASSERT_TRUE(roundTrip(options));
+        errorGuard.dismiss();
+      }
+    }
+  }
+}
+
+// Round-trips a file of 65 chunks of 64 MiB each (4 GiB + 64 MiB) at
+// compression level 1. Disabled by default via the DISABLED_ prefix. If the
+// input file cannot be created or fully written the test returns early
+// without failing.
+TEST(Pzstd, DISABLED_ExtremelyLargeSize) {
+  unsigned seed = std::random_device{}();
+  std::fprintf(stderr, "Pzstd.ExtremelyLargeSize seed: %u\n", seed);
+  std::mt19937 gen(seed);
+
+  std::string inputFile = std::tmpnam(nullptr);
+  auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); });
+
+  {
+    // Write 4GB + 64 MB
+    constexpr size_t kLength = 1 << 26;
+    constexpr size_t kNumChunks = (1 << 6) + 1;
+    std::unique_ptr<uint8_t[]> buf(new uint8_t[kLength]);
+    auto fd = std::fopen(inputFile.c_str(), "wb");
+    // Check before arming the guard: fclose(nullptr) and fwrite() on a null
+    // stream are both undefined behaviour.
+    if (fd == nullptr) {
+      std::fprintf(stderr, "Failed to open file, skipping test\n");
+      return;
+    }
+    auto closeGuard = makeScopeGuard([&] { std::fclose(fd); });
+    for (size_t i = 0; i < kNumChunks; ++i) {
+      RDG_genBuffer(buf.get(), kLength, 0.5, 0.0, gen());
+      auto written = std::fwrite(buf.get(), 1, kLength, fd);
+      if (written != kLength) {
+        std::fprintf(stderr, "Failed to write file, skipping test\n");
+        return;
+      }
+    }
+  }
+
+  Options options;
+  options.overwrite = true;
+  options.inputFiles = {inputFile};
+  options.compressionLevel = 1;
+  // The default thread count may be 0; use at least one thread.
+  if (options.numThreads == 0) {
+    options.numThreads = 1;
+  }
+  ASSERT_TRUE(roundTrip(options));
+}
+
+// Round-trips a 10000-byte file consisting solely of the byte 'a', using one
+// thread at compression level 1.
+TEST(Pzstd, ExtremelyCompressible) {
+  std::string inputFile = std::tmpnam(nullptr);
+  auto guard = makeScopeGuard([&] { std::remove(inputFile.c_str()); });
+  {
+    // A std::string of repeated 'a' replaces the manual buffer + memset, so
+    // the test no longer relies on <cstring> being pulled in transitively.
+    const std::string data(10000, 'a');
+    auto fd = std::fopen(inputFile.c_str(), "wb");
+    // Fail the test cleanly instead of handing a null stream to
+    // fwrite()/fclose() when the temporary file cannot be created.
+    ASSERT_TRUE(fd != nullptr);
+    auto written = std::fwrite(data.data(), 1, data.size(), fd);
+    std::fclose(fd);
+    // Both operands are size_t, avoiding a signed/unsigned comparison.
+    ASSERT_EQ(written, data.size());
+  }
+  Options options;
+  options.overwrite = true;
+  options.inputFiles = {inputFile};
+  options.numThreads = 1;
+  options.compressionLevel = 1;
+  ASSERT_TRUE(roundTrip(options));
+}
diff --git a/src/zstd/contrib/pzstd/test/RoundTrip.h b/src/zstd/contrib/pzstd/test/RoundTrip.h
new file mode 100644
index 000000000..c6364ecb4
--- /dev/null
+++ b/src/zstd/contrib/pzstd/test/RoundTrip.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "Options.h"
+#include "Pzstd.h"
+#include "utils/ScopeGuard.h"
+
+#include <cstdio>
+#include <string>
+#include <cstdint>
+#include <memory>
+
+namespace pzstd {
+
+/**
+ * Compares the contents of two files byte by byte.
+ *
+ * @param source        Path of the reference file.
+ * @param decompressed  Path of the file to compare against it.
+ * @returns true iff both files could be opened and read without error and
+ *          hold exactly the same bytes; false otherwise (including when
+ *          either file cannot be opened).
+ */
+inline bool check(std::string source, std::string decompressed) {
+  constexpr std::size_t kChunkSize = 1024;
+
+  // Closes the stream when the owning pointer goes out of scope. A
+  // unique_ptr never invokes its deleter on a null pointer, so a failed
+  // fopen() is safe to hold.
+  struct FileCloser {
+    void operator()(std::FILE* file) const {
+      std::fclose(file);
+    }
+  };
+  using FilePtr = std::unique_ptr<std::FILE, FileCloser>;
+
+  const FilePtr sFd(std::fopen(source.c_str(), "rb"));
+  const FilePtr dFd(std::fopen(decompressed.c_str(), "rb"));
+  // A file that cannot be opened can never match; reading from or closing a
+  // null stream would be undefined behaviour.
+  if (!sFd || !dFd) {
+    return false;
+  }
+
+  std::unique_ptr<std::uint8_t[]> sBuf(new std::uint8_t[kChunkSize]);
+  std::unique_ptr<std::uint8_t[]> dBuf(new std::uint8_t[kChunkSize]);
+
+  std::size_t sRead;
+  do {
+    sRead = std::fread(sBuf.get(), 1, kChunkSize, sFd.get());
+    const std::size_t dRead = std::fread(dBuf.get(), 1, kChunkSize, dFd.get());
+    if (std::ferror(sFd.get()) || std::ferror(dFd.get())) {
+      return false;
+    }
+    if (sRead != dRead) {
+      return false;
+    }
+
+    for (std::size_t i = 0; i < sRead; ++i) {
+      if (sBuf[i] != dBuf[i]) {
+        return false;
+      }
+    }
+    // A short read ends the loop; a full chunk means there may be more.
+  } while (sRead == kChunkSize);
+
+  // Both streams must have reached end-of-file.
+  return std::feof(sFd.get()) && std::feof(dFd.get());
+}
+
+/**
+ * Compresses the single input file named in `options` to a temporary file,
+ * decompresses that to a second temporary file, and compares the result with
+ * the original input. Both temporary files are removed on return.
+ *
+ * `options` is modified along the way: on return `decompress`, `outputFile`
+ * and (once compression succeeded) `inputFiles.front()` reflect the last
+ * stage that was started.
+ *
+ * @returns true iff exactly one input file was given, both `pzstdMain()` runs
+ *          returned 0, and `check()` found the files identical.
+ */
+inline bool roundTrip(Options& options) {
+  if (options.inputFiles.size() != 1) {
+    return false;
+  }
+  const std::string source = options.inputFiles.front();
+  const std::string compressedFile = std::tmpnam(nullptr);
+  const std::string decompressedFile = std::tmpnam(nullptr);
+  auto guard = makeScopeGuard([&] {
+    std::remove(compressedFile.c_str());
+    std::remove(decompressedFile.c_str());
+  });
+
+  // Stage 1: source -> compressedFile.
+  options.outputFile = compressedFile;
+  options.decompress = false;
+  if (pzstdMain(options) != 0) {
+    return false;
+  }
+
+  // Stage 2: compressedFile -> decompressedFile.
+  options.decompress = true;
+  options.inputFiles.front() = compressedFile;
+  options.outputFile = decompressedFile;
+  if (pzstdMain(options) != 0) {
+    return false;
+  }
+
+  return check(source, decompressedFile);
+}
+}
diff --git a/src/zstd/contrib/pzstd/test/RoundTripTest.cpp b/src/zstd/contrib/pzstd/test/RoundTripTest.cpp
new file mode 100644
index 000000000..36af0673a
--- /dev/null
+++ b/src/zstd/contrib/pzstd/test/RoundTripTest.cpp
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+extern "C" {
+#include "datagen.h"
+}
+#include "Options.h"
+#include "test/RoundTrip.h"
+#include "utils/ScopeGuard.h"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <random>
+
+using namespace std;
+using namespace pzstd;
+
+namespace {
+/**
+ * Generates `size` bytes with RDG_genBuffer() and writes them to a freshly
+ * named temporary file.
+ *
+ * Aborts the process if the file cannot be opened or not all bytes could be
+ * written.
+ *
+ * @param size        Number of bytes to generate and write.
+ * @param matchProba  Forwarded to RDG_genBuffer().
+ * @param litProba    Forwarded to RDG_genBuffer().
+ * @param seed        Forwarded to RDG_genBuffer().
+ * @returns The path of the file that was written.
+ */
+string
+writeData(size_t size, double matchProba, double litProba, unsigned seed) {
+  std::unique_ptr<uint8_t[]> buf(new uint8_t[size]);
+  RDG_genBuffer(buf.get(), size, matchProba, litProba, seed);
+  string file = tmpnam(nullptr);
+  auto fd = std::fopen(file.c_str(), "wb");
+  // Treat a failed open like a short write: fwrite()/fclose() on a null
+  // stream would be undefined behaviour.
+  if (fd == nullptr) {
+    std::abort();
+  }
+  auto guard = makeScopeGuard([&] { std::fclose(fd); });
+  auto bytesWritten = std::fwrite(buf.get(), 1, size, fd);
+  if (bytesWritten != size) {
+    std::abort();
+  }
+  return file;
+}
+
+/**
+ * Writes a randomly sized and randomly parameterized input file and returns
+ * its path.
+ *
+ * The four random draws are made in separate statements: the evaluation
+ * order of function arguments is unspecified, so drawing them inside the
+ * call to writeData() would make the generated file depend on the compiler
+ * for one and the same generator state.
+ */
+template <typename Generator>
+string generateInputFile(Generator& gen) {
+  // Use inputs ranging from 1 Byte to 2^16 Bytes
+  std::uniform_int_distribution<size_t> size{1, 1 << 16};
+  std::uniform_real_distribution<> prob{0, 1};
+  const auto dataSize = size(gen);
+  const auto matchProba = prob(gen);
+  const auto litProba = prob(gen);
+  const auto seed = gen();
+  return writeData(dataSize, matchProba, litProba, seed);
+}
+
+/**
+ * Builds an option set for `inputFile` with overwriting enabled, a random
+ * thread count in [1, 32] and a random compression level in [1, 10]. The
+ * thread count is drawn before the compression level.
+ */
+template <typename Generator>
+Options generateOptions(Generator& gen, const string& inputFile) {
+  std::uniform_int_distribution<unsigned> threadDist{1, 32};
+  std::uniform_int_distribution<unsigned> levelDist{1, 10};
+
+  Options result;
+  result.inputFiles = {inputFile};
+  result.overwrite = true;
+  result.numThreads = threadDist(gen);
+  result.compressionLevel = levelDist(gen);
+  return result;
+}
+}
+
+/// Randomized round-trip driver: generates 10000 input files and round-trips
+/// each of them with 10 random option sets. On the first failure the
+/// offending configuration is printed and the process exits with status 1.
+int main() {
+  std::mt19937 gen(std::random_device{}());
+
+  // Terminates the "\r"-style progress line on every exit path.
+  auto newlineGuard = makeScopeGuard([] { std::fprintf(stderr, "\n"); });
+  for (unsigned fileIdx = 0; fileIdx < 10000; ++fileIdx) {
+    if (fileIdx % 100 == 0) {
+      std::fprintf(stderr, "Progress: %u%%\r", fileIdx / 100);
+    }
+    auto inputFile = generateInputFile(gen);
+    auto inputGuard = makeScopeGuard([&] { std::remove(inputFile.c_str()); });
+    for (unsigned attempt = 0; attempt < 10; ++attempt) {
+      auto options = generateOptions(gen, inputFile);
+      if (roundTrip(options)) {
+        continue;
+      }
+      std::fprintf(stderr, "numThreads: %u\n", options.numThreads);
+      std::fprintf(stderr, "level: %u\n", options.compressionLevel);
+      std::fprintf(stderr, "decompress? %u\n", (unsigned)options.decompress);
+      std::fprintf(stderr, "file: %s\n", inputFile.c_str());
+      return 1;
+    }
+  }
+  return 0;
+}
diff --git a/src/zstd/contrib/pzstd/utils/BUCK b/src/zstd/contrib/pzstd/utils/BUCK
new file mode 100644
index 000000000..e757f4120
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/BUCK
@@ -0,0 +1,75 @@
+# Header-only utility libraries. Each rule exports one header under the
+# 'utils/' include prefix; 'deps' lists the sibling rules it depends on.
+
+# Buffer.h
+cxx_library(
+ name='buffer',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['Buffer.h'],
+ deps=[':range'],
+)
+
+# FileSystem.h
+cxx_library(
+ name='file_system',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['FileSystem.h'],
+ deps=[':range'],
+)
+
+# Likely.h
+cxx_library(
+ name='likely',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['Likely.h'],
+)
+
+# Range.h
+cxx_library(
+ name='range',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['Range.h'],
+ deps=[':likely'],
+)
+
+# ResourcePool.h
+cxx_library(
+ name='resource_pool',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['ResourcePool.h'],
+)
+
+# ScopeGuard.h
+cxx_library(
+ name='scope_guard',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['ScopeGuard.h'],
+)
+
+# ThreadPool.h
+cxx_library(
+ name='thread_pool',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['ThreadPool.h'],
+ deps=[':work_queue'],
+)
+
+# WorkQueue.h
+cxx_library(
+ name='work_queue',
+ visibility=['PUBLIC'],
+ header_namespace='utils',
+ exported_headers=['WorkQueue.h'],
+ deps=[':buffer'],
+)
+
+# Umbrella target that depends on every utility library above.
+cxx_library(
+ name='utils',
+ visibility=['PUBLIC'],
+ deps=[
+ ':buffer',
+ ':file_system',
+ ':likely',
+ ':range',
+ ':resource_pool',
+ ':scope_guard',
+ ':thread_pool',
+ ':work_queue',
+ ],
+)
diff --git a/src/zstd/contrib/pzstd/utils/Buffer.h b/src/zstd/contrib/pzstd/utils/Buffer.h
new file mode 100644
index 000000000..f69c3b4d9
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/Buffer.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/Range.h"
+
+#include <array>
+#include <cstddef>
+#include <memory>
+
+namespace pzstd {
+
+/**
+ * A `Buffer` has a pointer to a shared buffer, and a range of the buffer that
+ * it owns.
+ * The idea is that you can allocate one buffer, and write chunks into it
+ * and break off those chunks.
+ * The underlying buffer is reference counted, and will be destroyed when all
+ * `Buffer`s that reference it are destroyed.
+ */
+class Buffer {
+  // Shared ownership of the underlying allocation.
+  std::shared_ptr<unsigned char> buffer_;
+  // The part of the allocation this `Buffer` refers to.
+  MutableByteRange range_;
+
+  // Deleter for `buffer_`: the storage comes from `new unsigned char[]`.
+  static void delete_buffer(unsigned char* buffer) {
+    delete[] buffer;
+  }
+
+ public:
+  /// Creates a `Buffer` without any underlying storage.
+  explicit Buffer() {}
+
+  /// Allocates `size` bytes and creates a `Buffer` covering all of them.
+  explicit Buffer(std::size_t size)
+      : buffer_(new unsigned char[size], delete_buffer),
+        range_(buffer_.get(), buffer_.get() + size) {}
+
+  /// Creates a `Buffer` that shares `buffer` and refers to `data`.
+  explicit Buffer(std::shared_ptr<unsigned char> buffer, MutableByteRange data)
+      : buffer_(buffer), range_(data) {}
+
+  /// Move-only: declaring the move operations suppresses the implicit copies.
+  Buffer(Buffer&&) = default;
+  Buffer& operator=(Buffer&&) & = default;
+
+  /**
+   * Cuts the first `n` bytes off this `Buffer`.
+   *
+   * Afterwards this object refers to [begin + n, end) while the returned
+   * object refers to [begin, begin + n); both share the same underlying
+   * allocation.
+   *
+   * @param n The offset to split at.
+   * @returns A buffer that owns the data [begin, begin + n).
+   */
+  Buffer splitAt(std::size_t n) {
+    Buffer head(buffer_, range_.subpiece(0, n));
+    range_.advance(n);
+    return head;
+  }
+
+  /// Drops the first `n` bytes: the buffer becomes [begin + n, end).
+  void advance(std::size_t n) {
+    range_.advance(n);
+  }
+
+  /// Drops the last `n` bytes: the buffer becomes [begin, end - n).
+  void subtract(std::size_t n) {
+    range_.subtract(n);
+  }
+
+  /// Read-only view of the bytes this `Buffer` refers to.
+  ByteRange range() const {
+    return range_;
+  }
+  /// Mutable view of the bytes this `Buffer` refers to.
+  MutableByteRange range() {
+    return range_;
+  }
+
+  /// Pointer to the first byte (read-only).
+  const unsigned char* data() const {
+    return range_.data();
+  }
+
+  /// Pointer to the first byte (mutable).
+  unsigned char* data() {
+    return range_.data();
+  }
+
+  /// Number of bytes this `Buffer` refers to.
+  std::size_t size() const {
+    return range_.size();
+  }
+
+  /// Whether this `Buffer` refers to no bytes at all.
+  bool empty() const {
+    return range_.empty();
+  }
+};
+}
diff --git a/src/zstd/contrib/pzstd/utils/FileSystem.h b/src/zstd/contrib/pzstd/utils/FileSystem.h
new file mode 100644
index 000000000..3cfbe86e5
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/FileSystem.h
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/Range.h"
+
+#include <sys/stat.h>
+#include <cerrno>
+#include <cstdint>
+#include <system_error>
+
+// A small subset of `std::filesystem`.
+// `std::filesystem` should be a drop in replacement.
+// See http://en.cppreference.com/w/cpp/filesystem for documentation.
+
+namespace pzstd {
+
+// Result type of status(): the platform's raw stat structure (`_stat64` when
+// building with MSVC, `stat` otherwise).
+// using file_status = ... causes gcc to emit a false positive warning
+#if defined(_MSC_VER)
+typedef struct ::_stat64 file_status;
+#else
+typedef struct ::stat file_status;
+#endif
+
+/// http://en.cppreference.com/w/cpp/filesystem/status
+/// Queries the file system for information about `path`.
+///
+/// On success `ec` is cleared; on failure it is set to `errno` in the generic
+/// category and the returned structure holds whatever the system call left
+/// in it.
+/// NOTE(review): `path.data()` is handed to the system call as a C string, so
+/// this assumes the piece is NUL-terminated -- confirm at the call sites.
+inline file_status status(StringPiece path, std::error_code& ec) noexcept {
+  file_status result;
+#if defined(_MSC_VER)
+  const bool failed = ::_stat64(path.data(), &result) != 0;
+#else
+  const bool failed = ::stat(path.data(), &result) != 0;
+#endif
+  if (!failed) {
+    ec.clear();
+  } else {
+    ec.assign(errno, std::generic_category());
+  }
+  return result;
+}
+
+/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file
+/// Whether `status` describes a regular file.
+/// Uses `S_ISREG` where available and falls back to masking `st_mode` with
+/// `S_IFMT`; compilation fails if neither is provided.
+inline bool is_regular_file(file_status status) noexcept {
+#if defined(S_ISREG)
+  return S_ISREG(status.st_mode);
+#elif defined(S_IFMT) && defined(S_IFREG)
+  return (status.st_mode & S_IFMT) == S_IFREG;
+#else
+  static_assert(false, "No POSIX stat() support.");
+#endif
+}
+
+/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file
+/// Whether `path` names a regular file; `ec` is set as by `status()`.
+inline bool is_regular_file(StringPiece path, std::error_code& ec) noexcept {
+  const file_status info = status(path, ec);
+  return is_regular_file(info);
+}
+
+/// http://en.cppreference.com/w/cpp/filesystem/is_directory
+/// Whether `status` describes a directory.
+/// Uses `S_ISDIR` where available and falls back to masking `st_mode` with
+/// `S_IFMT`; compilation fails if neither is provided.
+inline bool is_directory(file_status status) noexcept {
+#if defined(S_ISDIR)
+  return S_ISDIR(status.st_mode);
+#elif defined(S_IFMT) && defined(S_IFDIR)
+  return (status.st_mode & S_IFMT) == S_IFDIR;
+#else
+  static_assert(false, "NO POSIX stat() support.");
+#endif
+}
+
+/// http://en.cppreference.com/w/cpp/filesystem/is_directory
+/// Whether `path` names a directory; `ec` is set as by `status()`.
+inline bool is_directory(StringPiece path, std::error_code& ec) noexcept {
+  const file_status info = status(path, ec);
+  return is_directory(info);
+}
+
+/// http://en.cppreference.com/w/cpp/filesystem/file_size
+/// Size in bytes of the regular file at `path`.
+///
+/// Returns `static_cast<std::uintmax_t>(-1)` with `ec` set when the status
+/// query fails, or with `ec` set to `ENOTSUP` when `path` is not a regular
+/// file. On success `ec` is cleared.
+inline std::uintmax_t file_size(
+    StringPiece path,
+    std::error_code& ec) noexcept {
+  const auto info = status(path, ec);
+  if (ec) {
+    return static_cast<std::uintmax_t>(-1);
+  }
+  if (!is_regular_file(info)) {
+    ec.assign(ENOTSUP, std::generic_category());
+    return static_cast<std::uintmax_t>(-1);
+  }
+  ec.clear();
+  return static_cast<std::uintmax_t>(info.st_size);
+}
+}
diff --git a/src/zstd/contrib/pzstd/utils/Likely.h b/src/zstd/contrib/pzstd/utils/Likely.h
new file mode 100644
index 000000000..7cea8da27
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/Likely.h
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+
+/**
+ * Compiler hints to indicate the fast path of an "if" branch: whether
+ * the if condition is likely to be true or false.
+ *
+ * @author Tudor Bosman (tudorb@fb.com)
+ */
+
+#pragma once
+
+// Start from a clean slate in case an earlier header already defined these.
+#undef LIKELY
+#undef UNLIKELY
+
+#if defined(__GNUC__) && __GNUC__ >= 4
+// GCC-compatible compilers: hand the expected truth value of the condition to
+// `__builtin_expect()`.
+#define LIKELY(x) (__builtin_expect((x), 1))
+#define UNLIKELY(x) (__builtin_expect((x), 0))
+#else
+// Other compilers: the macros expand to the bare condition.
+#define LIKELY(x) (x)
+#define UNLIKELY(x) (x)
+#endif
diff --git a/src/zstd/contrib/pzstd/utils/Range.h b/src/zstd/contrib/pzstd/utils/Range.h
new file mode 100644
index 000000000..fedb5d786
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/Range.h
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+
+/**
+ * A subset of `folly/Range.h`.
+ * All code copied verbatim modulo formatting
+ */
+#pragma once
+
+#include "utils/Likely.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstring>
+#include <iterator>
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+
+namespace pzstd {
+
+namespace detail {
+/*
+ * SFINAE helper that only exposes member types for character pointers.
+ * Use IsCharPointer<T>::type to enable const char* or char*.
+ * Use IsCharPointer<T>::const_type to enable only const char*.
+ */
+template <class T>
+struct IsCharPointer {};
+
+template <>
+struct IsCharPointer<char*> {
+  using type = int;
+};
+
+template <>
+struct IsCharPointer<const char*> {
+  using const_type = int;
+  using type = int;
+};
+
+} // namespace detail
+
+/**
+ * A non-owning pair of iterators `[b_, e_)` with a few helpers for shrinking
+ * the pair from either side.
+ * The range only stores the iterators, so whatever they point into has to be
+ * kept alive by the caller.
+ */
+template <typename Iter>
+class Range {
+ Iter b_;
+ Iter e_;
+
+ public:
+ using size_type = std::size_t;
+ using iterator = Iter;
+ using const_iterator = Iter;
+ using value_type = typename std::remove_reference<
+ typename std::iterator_traits<Iter>::reference>::type;
+ using reference = typename std::iterator_traits<Iter>::reference;
+
+ /// Value-initializes both iterators.
+ constexpr Range() : b_(), e_() {}
+ /// Covers `[begin, end)`.
+ constexpr Range(Iter begin, Iter end) : b_(begin), e_(end) {}
+
+ /// Covers the `size` elements starting at `begin`.
+ constexpr Range(Iter begin, size_type size) : b_(begin), e_(begin + size) {}
+
+ // Only available when Iter is `char*` or `const char*`: the end of the range
+ // is found with `std::strlen()`.
+ template <class T = Iter, typename detail::IsCharPointer<T>::type = 0>
+ /* implicit */ Range(Iter str) : b_(str), e_(str + std::strlen(str)) {}
+
+ // Only available when Iter is `const char*`: views the characters of `str`.
+ template <class T = Iter, typename detail::IsCharPointer<T>::const_type = 0>
+ /* implicit */ Range(const std::string& str)
+ : b_(str.data()), e_(b_ + str.size()) {}
+
+ // Allow implicit conversion from Range<From> to Range<To> if From is
+ // implicitly convertible to To.
+ template <
+ class OtherIter,
+ typename std::enable_if<
+ (!std::is_same<Iter, OtherIter>::value &&
+ std::is_convertible<OtherIter, Iter>::value),
+ int>::type = 0>
+ constexpr /* implicit */ Range(const Range<OtherIter>& other)
+ : b_(other.begin()), e_(other.end()) {}
+
+ Range(const Range&) = default;
+ Range(Range&&) = default;
+
+ // The `&` qualifier restricts assignment to lvalue ranges.
+ Range& operator=(const Range&) & = default;
+ Range& operator=(Range&&) & = default;
+
+ /// Distance between the two iterators.
+ constexpr size_type size() const {
+ return e_ - b_;
+ }
+ bool empty() const {
+ return b_ == e_;
+ }
+ /// Same iterator as `begin()`.
+ Iter data() const {
+ return b_;
+ }
+ Iter begin() const {
+ return b_;
+ }
+ Iter end() const {
+ return e_;
+ }
+
+ /// Drops the first `n` elements. Throws `std::out_of_range` if `n > size()`.
+ void advance(size_type n) {
+ if (UNLIKELY(n > size())) {
+ throw std::out_of_range("index out of range");
+ }
+ b_ += n;
+ }
+
+ /// Drops the last `n` elements. Throws `std::out_of_range` if `n > size()`.
+ void subtract(size_type n) {
+ if (UNLIKELY(n > size())) {
+ throw std::out_of_range("index out of range");
+ }
+ e_ -= n;
+ }
+
+ /**
+ * Returns the sub-range that starts `first` elements in and holds at most
+ * `length` elements; `length` is clamped to what is left after `first`.
+ * Throws `std::out_of_range` if `first > size()`.
+ */
+ Range subpiece(size_type first, size_type length = std::string::npos) const {
+ if (UNLIKELY(first > size())) {
+ throw std::out_of_range("index out of range");
+ }
+
+ return Range(b_ + first, std::min(length, size() - first));
+ }
+};
+
+/// Range over `const unsigned char`.
+using ByteRange = Range<const unsigned char*>;
+/// Range over `unsigned char` reached through non-const pointers.
+using MutableByteRange = Range<unsigned char*>;
+/// Range over `const char`.
+using StringPiece = Range<const char*>;
+}
diff --git a/src/zstd/contrib/pzstd/utils/ResourcePool.h b/src/zstd/contrib/pzstd/utils/ResourcePool.h
new file mode 100644
index 000000000..8dfcdd765
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/ResourcePool.h
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include <cassert>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace pzstd {
+
+/**
+ * An unbounded pool of resources.
+ * A `ResourcePool<T>` is built from a factory function that allocates a `T*`
+ * and a free function that releases a `T*`.
+ * `ResourcePool::get()` hands out a `ResourcePool::UniquePtr` to a `T`; when
+ * that pointer goes out of scope the resource is put back into the pool
+ * instead of being freed.
+ * The `ResourcePool<T>` *must* outlive every resource it hands out.
+ * Resources are mutable and are reused as-is, so clean a resource up before or
+ * after every use.
+ */
+template <typename T>
+class ResourcePool {
+ public:
+  class Deleter;
+  using Factory = std::function<T*()>;
+  using Free = std::function<void(T*)>;
+  using UniquePtr = std::unique_ptr<T, Deleter>;
+
+ private:
+  std::mutex mutex_;
+  Factory factory_;
+  Free free_;
+  std::vector<T*> resources_;
+  unsigned inUse_;
+
+ public:
+  /**
+   * Creates a `ResourcePool`.
+   *
+   * @param factory The function used to create new resources.
+   * @param free    The function used to free resources made by `factory`.
+   */
+  ResourcePool(Factory factory, Free free)
+      : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {}
+
+  /**
+   * @returns A unique pointer to a resource. The most recently returned idle
+   *          resource is reused if there is one; otherwise `factory()` is
+   *          called. The pointer is null iff there are no idle resources and
+   *          `factory()` returns null.
+   */
+  UniquePtr get() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    T* raw = nullptr;
+    if (resources_.empty()) {
+      raw = factory_();
+    } else {
+      raw = resources_.back();
+      resources_.pop_back();
+    }
+    ++inUse_;
+    return UniquePtr{raw, Deleter{*this}};
+  }
+
+  /// Frees every idle resource. All handed-out resources must be back by now.
+  ~ResourcePool() noexcept {
+    assert(inUse_ == 0);
+    for (T* const idle : resources_) {
+      free_(idle);
+    }
+  }
+
+  /// Deleter for `UniquePtr`: returns the resource to its pool.
+  class Deleter {
+    ResourcePool* pool_;
+
+   public:
+    explicit Deleter(ResourcePool& pool) : pool_(&pool) {}
+
+    void operator()(T* resource) {
+      std::lock_guard<std::mutex> guard(pool_->mutex_);
+      // Null resources are counted as returned but never stored.
+      if (resource != nullptr) {
+        pool_->resources_.push_back(resource);
+      }
+      assert(pool_->inUse_ > 0);
+      --pool_->inUse_;
+    }
+  };
+};
+
+}
diff --git a/src/zstd/contrib/pzstd/utils/ScopeGuard.h b/src/zstd/contrib/pzstd/utils/ScopeGuard.h
new file mode 100644
index 000000000..31768f43d
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/ScopeGuard.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include <utility>
+
+namespace pzstd {
+
+/**
+ * Dismissable scope guard.
+ * `Function` must be callable and take no parameters.
+ * Unless `dismiss()` is called, the callable is executed upon destruction of
+ * `ScopeGuard`.
+ *
+ * A guard can be moved but not copied. Moving hands the duty of running the
+ * callable over to the new guard and dismisses the old one, so the callable
+ * still runs exactly once when a move out of `makeScopeGuard()` is not elided.
+ *
+ * Example:
+ *
+ *   auto guard = makeScopeGuard([&] { cleanup(); });
+ */
+template <typename Function>
+class ScopeGuard {
+  Function function;
+  bool dismissed;
+
+ public:
+  explicit ScopeGuard(Function&& function)
+      : function(std::move(function)), dismissed(false) {}
+
+  /// Takes over `other`'s callable and dismisses `other`.
+  ScopeGuard(ScopeGuard&& other)
+      : function(std::move(other.function)), dismissed(other.dismissed) {
+    other.dismissed = true;
+  }
+
+  // A copy would run the callable a second time.
+  ScopeGuard(const ScopeGuard&) = delete;
+  ScopeGuard& operator=(const ScopeGuard&) = delete;
+  ScopeGuard& operator=(ScopeGuard&&) = delete;
+
+  /// Prevents the callable from running when the guard is destroyed.
+  void dismiss() {
+    dismissed = true;
+  }
+
+  ~ScopeGuard() noexcept {
+    if (!dismissed) {
+      function();
+    }
+  }
+};
+
+/**
+ * Builds a `ScopeGuard` around `function`.
+ *
+ * @param function The callable the guard runs when it is destroyed.
+ * @returns The guard; keep it in a local variable for as long as the cleanup
+ *          should stay armed.
+ */
+template <typename Function>
+ScopeGuard<Function> makeScopeGuard(Function&& function) {
+  using Guard = ScopeGuard<Function>;
+  return Guard(std::forward<Function>(function));
+}
+}
diff --git a/src/zstd/contrib/pzstd/utils/ThreadPool.h b/src/zstd/contrib/pzstd/utils/ThreadPool.h
new file mode 100644
index 000000000..8ece8e0da
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/ThreadPool.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/WorkQueue.h"
+
+#include <cstddef>
+#include <functional>
+#include <thread>
+#include <vector>
+
+namespace pzstd {
+/// A simple thread pool whose workers take tasks off a shared queue in FIFO
+/// order.
+class ThreadPool {
+  std::vector<std::thread> threads_;
+
+  WorkQueue<std::function<void()>> tasks_;
+
+ public:
+  /// Starts `numThreads` worker threads.
+  explicit ThreadPool(std::size_t numThreads) {
+    threads_.reserve(numThreads);
+    // Each worker runs tasks until pop() reports that there is nothing left.
+    const auto worker = [this] {
+      std::function<void()> job;
+      while (tasks_.pop(job)) {
+        job();
+      }
+    };
+    for (std::size_t started = 0; started != numThreads; ++started) {
+      threads_.emplace_back(worker);
+    }
+  }
+
+  /// Finishes the task queue and then joins every worker thread.
+  ~ThreadPool() {
+    tasks_.finish();
+    for (std::thread& worker : threads_) {
+      worker.join();
+    }
+  }
+
+  /**
+   * Queues `task` for execution. Because `task` is a `std::function<>`, it
+   * cannot be a move only type, so a lambda passed here must not capture move
+   * only types (like `std::unique_ptr`).
+   *
+   * @param task The task to execute.
+   */
+  void add(std::function<void()> task) {
+    tasks_.push(std::move(task));
+  }
+};
+}
diff --git a/src/zstd/contrib/pzstd/utils/WorkQueue.h b/src/zstd/contrib/pzstd/utils/WorkQueue.h
new file mode 100644
index 000000000..1d14d922c
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/WorkQueue.h
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/Buffer.h"
+
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+#include <condition_variable>
+#include <cstddef>
+#include <functional>
+#include <mutex>
+#include <queue>
+
+namespace pzstd {
+
+/// Thread-safe FIFO work queue with an optional bound on its size.
+template <typename T>
+class WorkQueue {
+ // Protects all member variable access
+ std::mutex mutex_;
+ // Signalled by push() and finish(); pop() waits on it.
+ std::condition_variable readerCv_;
+ // Signalled by pop(), setMaxSize() and finish(); push() waits on it.
+ std::condition_variable writerCv_;
+ // Signalled by finish(); waitUntilFinished() waits on it.
+ std::condition_variable finishCv_;
+
+ std::queue<T> queue_;
+ bool done_;
+ std::size_t maxSize_;
+
+ // Must have lock to call this function
+ bool full() const {
+ if (maxSize_ == 0) {
+ return false;
+ }
+ return queue_.size() >= maxSize_;
+ }
+
+ public:
+ /**
+ * Constructs an empty work queue with an optional max size.
+ * If `maxSize == 0` the queue size is unbounded.
+ *
+ * @param maxSize The maximum allowed size of the work queue.
+ */
+ WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
+
+ /**
+ * Push an item onto the work queue. Notify a single thread that work is
+ * available. If `finish()` has been called, do nothing and return false.
+ * If `push()` returns false, then `item` has not been moved from.
+ * Blocks while the queue is full and `finish()` has not been called.
+ *
+ * @param item Item to push onto the queue.
+ * @returns True upon success, false if `finish()` has been called. An
+ * item was pushed iff `push()` returns true.
+ */
+ bool push(T&& item) {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (full() && !done_) {
+ writerCv_.wait(lock);
+ }
+ if (done_) {
+ return false;
+ }
+ queue_.push(std::move(item));
+ }
+ // The lock has been released before the reader is notified.
+ readerCv_.notify_one();
+ return true;
+ }
+
+ /**
+ * Attempts to pop an item off the work queue. It will block until data is
+ * available or `finish()` has been called.
+ *
+ * @param[out] item If `pop` returns `true`, it contains the popped item.
+ * If `pop` returns `false`, it is unmodified.
+ * @returns True upon success. False if the queue is empty and
+ * `finish()` has been called.
+ */
+ bool pop(T& item) {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (queue_.empty() && !done_) {
+ readerCv_.wait(lock);
+ }
+ if (queue_.empty()) {
+ assert(done_);
+ return false;
+ }
+ item = std::move(queue_.front());
+ queue_.pop();
+ }
+ // A slot was freed, so let one waiting writer retry.
+ writerCv_.notify_one();
+ return true;
+ }
+
+ /**
+ * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
+ * Wakes every waiting writer so that it re-checks the new limit.
+ *
+ * @param maxSize The new maximum queue size.
+ */
+ void setMaxSize(std::size_t maxSize) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ maxSize_ = maxSize;
+ }
+ writerCv_.notify_all();
+ }
+
+ /**
+ * Promise that `push()` won't be called again, so once the queue is empty
+ * there will never be any more work.
+ * Must be called at most once (checked with an `assert`).
+ */
+ void finish() {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ assert(!done_);
+ done_ = true;
+ }
+ readerCv_.notify_all();
+ writerCv_.notify_all();
+ finishCv_.notify_all();
+ }
+
+ /// Blocks until `finish()` has been called (but the queue may not be empty).
+ void waitUntilFinished() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (!done_) {
+ finishCv_.wait(lock);
+ }
+ }
+};
+
+/// Work queue for `Buffer`s that knows the total number of bytes in the queue.
+class BufferWorkQueue {
+  WorkQueue<Buffer> queue_;
+  std::atomic<std::size_t> size_;
+
+ public:
+  /**
+   * @param maxSize Forwarded to the underlying `WorkQueue`; `0` means
+   *                unbounded.
+   */
+  BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
+
+  /**
+   * Pushes `buffer` onto the queue and adds its size to the byte count.
+   * If the queue has already been finished the buffer is dropped and the byte
+   * count is left as it was.
+   */
+  void push(Buffer buffer) {
+    const std::size_t bytes = buffer.size();
+    // Count the bytes before the buffer becomes visible to pop(), so that a
+    // concurrent pop() can never subtract them before they were added.
+    size_.fetch_add(bytes);
+    if (!queue_.push(std::move(buffer))) {
+      // The push was rejected: the buffer never entered the queue.
+      size_.fetch_sub(bytes);
+    }
+  }
+
+  /**
+   * Pops the next buffer, see `WorkQueue::pop()`.
+   *
+   * @param[out] buffer Receives the popped buffer when `true` is returned.
+   * @returns True iff a buffer was popped; its size is then removed from the
+   *          byte count.
+   */
+  bool pop(Buffer& buffer) {
+    bool result = queue_.pop(buffer);
+    if (result) {
+      size_.fetch_sub(buffer.size());
+    }
+    return result;
+  }
+
+  /// Forwards to `WorkQueue::setMaxSize()`.
+  void setMaxSize(std::size_t maxSize) {
+    queue_.setMaxSize(maxSize);
+  }
+
+  /// Forwards to `WorkQueue::finish()`.
+  void finish() {
+    queue_.finish();
+  }
+
+  /**
+   * Blocks until `finish()` has been called.
+   *
+   * @returns The total number of bytes of all the `Buffer`s currently in the
+   *          queue.
+   */
+  std::size_t size() {
+    queue_.waitUntilFinished();
+    return size_.load();
+  }
+};
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/BUCK b/src/zstd/contrib/pzstd/utils/test/BUCK
new file mode 100644
index 000000000..a5113cab6
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/BUCK
@@ -0,0 +1,35 @@
+# One test target per utility header; each target builds the matching
+# *Test.cpp file.
+cxx_test(
+    name='buffer_test',
+    srcs=['BufferTest.cpp'],
+    deps=['//contrib/pzstd/utils:buffer'],
+)
+
+cxx_test(
+    name='range_test',
+    srcs=['RangeTest.cpp'],
+    deps=['//contrib/pzstd/utils:range'],
+)
+
+cxx_test(
+    name='resource_pool_test',
+    srcs=['ResourcePoolTest.cpp'],
+    deps=['//contrib/pzstd/utils:resource_pool'],
+)
+
+cxx_test(
+    name='scope_guard_test',
+    srcs=['ScopeGuardTest.cpp'],
+    deps=['//contrib/pzstd/utils:scope_guard'],
+)
+
+cxx_test(
+    name='thread_pool_test',
+    srcs=['ThreadPoolTest.cpp'],
+    deps=['//contrib/pzstd/utils:thread_pool'],
+)
+
+cxx_test(
+    name='work_queue_test',
+    srcs=['WorkQueueTest.cpp'],
+    deps=['//contrib/pzstd/utils:work_queue'],
+)
diff --git a/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp
new file mode 100644
index 000000000..fbba74e82
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/Buffer.h"
+#include "utils/Range.h"
+
+#include <gtest/gtest.h>
+#include <memory>
+
+using namespace pzstd;
+
+namespace {
+/// Releases an array allocated with `new unsigned char[]`.
+void deleter(const unsigned char* bytes) {
+  delete[] bytes;
+}
+} // namespace
+
+// Covers default construction, sized construction, move construction and move
+// assignment of `Buffer`.
+TEST(Buffer, Constructors) {
+  Buffer empty;
+  EXPECT_TRUE(empty.empty());
+  EXPECT_EQ(0, empty.size());
+
+  Buffer sized(5);
+  EXPECT_FALSE(sized.empty());
+  EXPECT_EQ(5, sized.size());
+
+  // Check the destination of each move rather than the moved-from source.
+  Buffer moved(std::move(sized));
+  EXPECT_FALSE(moved.empty());
+  EXPECT_EQ(5, moved.size());
+
+  Buffer assigned;
+  assigned = std::move(moved);
+  EXPECT_FALSE(assigned.empty());
+  EXPECT_EQ(5, assigned.size());
+}
+
+// Follows the reference count of a shared allocation while `Buffer`s that use
+// it are constructed, moved, split, resized and finally destroyed.
+TEST(Buffer, BufferManagement) {
+ std::shared_ptr<unsigned char> buf(new unsigned char[10], deleter);
+ {
+ Buffer acquired(buf, MutableByteRange(buf.get(), buf.get() + 10));
+ EXPECT_EQ(2, buf.use_count());
+ // Moving is expected to leave the count unchanged.
+ Buffer moved(std::move(acquired));
+ EXPECT_EQ(2, buf.use_count());
+ Buffer assigned;
+ assigned = std::move(moved);
+ EXPECT_EQ(2, buf.use_count());
+
+ // Splitting is expected to add one more owner.
+ Buffer split = assigned.splitAt(5);
+ EXPECT_EQ(3, buf.use_count());
+
+ // Resizing either piece is expected to leave the count unchanged.
+ split.advance(1);
+ assigned.subtract(1);
+ EXPECT_EQ(3, buf.use_count());
+ }
+ // Only `buf` itself is left once every Buffer is gone.
+ EXPECT_EQ(1, buf.use_count());
+}
+
+// Fills a buffer with 0..9 and checks what `splitAt()`, `advance()` and
+// `subtract()` leave visible.
+TEST(Buffer, Modifiers) {
+  Buffer buf(10);
+  {
+    unsigned char next = 0;
+    for (auto& byte : buf.range()) {
+      byte = next;
+      ++next;
+    }
+  }
+
+  auto head = buf.splitAt(2);
+
+  ASSERT_EQ(2, head.size());
+  EXPECT_EQ(0, *head.data());
+
+  ASSERT_EQ(8, buf.size());
+  EXPECT_EQ(2, *buf.data());
+
+  buf.advance(2);
+  EXPECT_EQ(4, *buf.data());
+
+  EXPECT_EQ(9, *(buf.range().end() - 1));
+
+  buf.subtract(2);
+  EXPECT_EQ(7, *(buf.range().end() - 1));
+
+  EXPECT_EQ(4, buf.size());
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp
new file mode 100644
index 000000000..755b50fa6
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/Range.h"
+
+#include <gtest/gtest.h>
+#include <string>
+
+using namespace pzstd;
+
+// Range is directly copied from folly.
+// Just some sanity tests to make sure everything seems to work.
+
+// Builds ranges over "hello" through every constructor and checks that each
+// one sees the same five characters.
+TEST(Range, Constructors) {
+  StringPiece empty;
+  EXPECT_TRUE(empty.empty());
+  EXPECT_EQ(0, empty.size());
+
+  std::string text = "hello";
+  {
+    // Iterator pair.
+    Range<std::string::const_iterator> fromIterators(text.begin(), text.end());
+    EXPECT_EQ(5, fromIterators.size());
+    EXPECT_EQ('h', *fromIterators.data());
+    EXPECT_EQ('o', *(fromIterators.end() - 1));
+  }
+
+  {
+    // Pointer and length.
+    StringPiece fromPointerAndSize(text.data(), text.size());
+    EXPECT_EQ(5, fromPointerAndSize.size());
+    EXPECT_EQ('h', *fromPointerAndSize.data());
+    EXPECT_EQ('o', *(fromPointerAndSize.end() - 1));
+  }
+
+  {
+    // std::string.
+    StringPiece fromString(text);
+    EXPECT_EQ(5, fromString.size());
+    EXPECT_EQ('h', *fromString.data());
+    EXPECT_EQ('o', *(fromString.end() - 1));
+  }
+
+  {
+    // C string.
+    StringPiece fromCString(text.c_str());
+    EXPECT_EQ(5, fromCString.size());
+    EXPECT_EQ('h', *fromCString.data());
+    EXPECT_EQ('o', *(fromCString.end() - 1));
+  }
+}
+
+// Checks `subpiece()`, `subtract()` and `advance()` on "hello world" and that
+// the source range is left untouched by operations on its copies.
+TEST(Range, Modifiers) {
+  StringPiece range("hello world");
+  ASSERT_EQ(11, range.size());
+
+  {
+    auto prefix = range.subpiece(0, 5);
+    EXPECT_EQ(5, prefix.size());
+    EXPECT_EQ('h', *prefix.data());
+    EXPECT_EQ('o', *(prefix.end() - 1));
+  }
+  {
+    auto trimmed = range;
+    trimmed.subtract(6);
+    EXPECT_EQ(5, trimmed.size());
+    EXPECT_EQ('h', *trimmed.data());
+    EXPECT_EQ('o', *(trimmed.end() - 1));
+  }
+  {
+    auto suffix = range;
+    suffix.advance(6);
+    EXPECT_EQ(5, suffix.size());
+    EXPECT_EQ('w', *suffix.data());
+    EXPECT_EQ('d', *(suffix.end() - 1));
+  }
+
+  std::string expected = "hello world";
+  EXPECT_EQ(expected, std::string(range.begin(), range.end()));
+  EXPECT_EQ(expected, std::string(range.data(), range.size()));
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp
new file mode 100644
index 000000000..6fe145180
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/ResourcePool.h"
+
+#include <gtest/gtest.h>
+#include <atomic>
+#include <thread>
+
+using namespace pzstd;
+
+// Single-threaded walk through the pool: resources are reused with the value
+// they were returned with, and everything created is eventually freed.
+TEST(ResourcePool, FullTest) {
+  unsigned created = 0;
+  unsigned deleted = 0;
+  {
+    ResourcePool<int> pool(
+        [&created] {
+          ++created;
+          return new int{5};
+        },
+        [&deleted](int* ptr) {
+          ++deleted;
+          delete ptr;
+        });
+
+    {
+      auto first = pool.get();
+      EXPECT_EQ(5, *first);
+      *first = 6;
+    }
+    {
+      auto first = pool.get();
+      EXPECT_EQ(6, *first);
+      auto second = pool.get();
+      EXPECT_EQ(5, *second);
+      *second = 7;
+    }
+    {
+      auto first = pool.get();
+      EXPECT_EQ(6, *first);
+      auto second = pool.get();
+      EXPECT_EQ(7, *second);
+    }
+  }
+  EXPECT_EQ(2, created);
+  EXPECT_EQ(created, deleted);
+}
+
+// Two threads each borrow a resource 100 times and bump it once per borrow;
+// the increments must add up to 200 across at most two resources.
+TEST(ResourcePool, ThreadSafe) {
+  std::atomic<unsigned> created{0};
+  std::atomic<unsigned> deleted{0};
+  {
+    ResourcePool<int> pool(
+        [&created] {
+          ++created;
+          return new int{0};
+        },
+        [&deleted](int* ptr) {
+          ++deleted;
+          delete ptr;
+        });
+    auto bump = [&pool] {
+      for (int round = 0; round < 100; ++round) {
+        auto borrowed = pool.get();
+        ++*borrowed;
+      }
+    };
+    std::thread worker1{bump};
+    std::thread worker2{bump};
+    worker1.join();
+    worker2.join();
+
+    auto x = pool.get();
+    auto y = pool.get();
+    EXPECT_EQ(200, *x + *y);
+  }
+  EXPECT_GE(2, created);
+  EXPECT_EQ(created, deleted);
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp
new file mode 100644
index 000000000..7bc624da7
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/ScopeGuard.h"
+
+#include <gtest/gtest.h>
+
+using namespace pzstd;
+
+// A dismissed guard must not run its callable.
+TEST(ScopeGuard, Dismiss) {
+  {
+    auto failIfRun = makeScopeGuard([&] { EXPECT_TRUE(false); });
+    failIfRun.dismiss();
+  }
+}
+
+// A guard that is not dismissed runs its callable when it leaves scope.
+TEST(ScopeGuard, Executes) {
+  bool ran = false;
+  {
+    auto setFlag = makeScopeGuard([&] { ran = true; });
+  }
+  EXPECT_TRUE(ran);
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp
new file mode 100644
index 000000000..703fd4c9c
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/ThreadPool.h"
+
+#include <gtest/gtest.h>
+#include <atomic>
+#include <iostream>
+#include <thread>
+#include <vector>
+
+using namespace pzstd;
+
+// With a single worker, tasks must run in the order they were added.
+TEST(ThreadPool, Ordering) {
+  std::vector<int> results;
+
+  {
+    ThreadPool executor(1);
+    for (int job = 0; job < 10; ++job) {
+      executor.add([&results, job] { results.push_back(job); });
+    }
+  }
+
+  for (int index = 0; index < 10; ++index) {
+    EXPECT_EQ(index, results[index]);
+  }
+}
+
+// Destroying the pool must wait for every queued job, including the ones that
+// were still waiting in the queue when the destructor started.
+TEST(ThreadPool, AllJobsFinished) {
+  std::atomic<unsigned> finished{0};
+  std::atomic<bool> go{false};
+  {
+    std::cerr << "Creating executor" << std::endl;
+    ThreadPool executor(5);
+    const auto job = [&finished, &go] {
+      // Spin until the main thread releases the jobs.
+      while (!go.load()) {
+        std::this_thread::yield();
+      }
+      ++finished;
+    };
+    for (int added = 0; added < 10; ++added) {
+      executor.add(job);
+    }
+    std::cerr << "Starting" << std::endl;
+    go.store(true);
+    std::cerr << "Finishing" << std::endl;
+  }
+  EXPECT_EQ(10, finished.load());
+}
+
+// A job that is added while the pool is being destroyed must never run.
+// NOTE(review): `std::chrono` is used below but this file does not include
+// <chrono> directly.
+TEST(ThreadPool, AddJobWhileJoining) {
+ std::atomic<bool> done{false};
+ {
+ ThreadPool executor(1);
+ executor.add([&executor, &done] {
+ // Hold the only worker until the main thread is about to leave the scope.
+ while (!done.load()) {
+ std::this_thread::yield();
+ }
+ // Sleep for a second to be sure that we are joining
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ // This job must not run; the test fails if it does.
+ executor.add([] {
+ EXPECT_TRUE(false);
+ });
+ });
+ done.store(true);
+ }
+}
diff --git a/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp
new file mode 100644
index 000000000..14cf77304
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp
@@ -0,0 +1,282 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#include "utils/Buffer.h"
+#include "utils/WorkQueue.h"
+
+#include <gtest/gtest.h>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+using namespace pzstd;
+
+namespace {
+/// Thread body that drains `queue` and records every popped value `v` as
+/// `results[v] = v`, taking `mutex` around each write.
+struct Popper {
+  WorkQueue<int>* queue;
+  int* results;
+  std::mutex* mutex;
+
+  void operator()() {
+    int value;
+    while (queue->pop(value)) {
+      std::lock_guard<std::mutex> guard(*mutex);
+      results[value] = value;
+    }
+  }
+};
+} // namespace
+
+// FIFO order on one thread, before and after `finish()`.
+TEST(WorkQueue, SingleThreaded) {
+  WorkQueue<int> queue;
+  int popped;
+
+  queue.push(5);
+  EXPECT_TRUE(queue.pop(popped));
+  EXPECT_EQ(5, popped);
+
+  queue.push(1);
+  queue.push(2);
+  EXPECT_TRUE(queue.pop(popped));
+  EXPECT_EQ(1, popped);
+  EXPECT_TRUE(queue.pop(popped));
+  EXPECT_EQ(2, popped);
+
+  // Items queued before finish() can still be popped afterwards.
+  queue.push(1);
+  queue.push(2);
+  queue.finish();
+  EXPECT_TRUE(queue.pop(popped));
+  EXPECT_EQ(1, popped);
+  EXPECT_TRUE(queue.pop(popped));
+  EXPECT_EQ(2, popped);
+  EXPECT_FALSE(queue.pop(popped));
+
+  queue.waitUntilFinished();
+}
+
+// Single producer, single consumer: the consumer must see 0..max-1 in order.
+TEST(WorkQueue, SPSC) {
+  WorkQueue<int> queue;
+  const int max = 100;
+
+  for (int value = 0; value < 10; ++value) {
+    queue.push(int{value});
+  }
+
+  std::thread consumer([&queue, max] {
+    int value;
+    int expected = 0;
+    while (queue.pop(value)) {
+      EXPECT_EQ(expected, value);
+      ++expected;
+    }
+    // pop() only fails once the queue is finished and drained.
+    EXPECT_EQ(expected, max);
+  });
+
+  std::this_thread::yield();
+  for (int value = 10; value < max; ++value) {
+    queue.push(int{value});
+  }
+  queue.finish();
+
+  consumer.join();
+}
+
+// Single producer, five consumers: every value must be popped.
+TEST(WorkQueue, SPMC) {
+  WorkQueue<int> queue;
+  std::vector<int> results(50, -1);
+  std::mutex mutex;
+  std::vector<std::thread> poppers;
+  for (int started = 0; started < 5; ++started) {
+    poppers.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  for (int value = 0; value < 50; ++value) {
+    queue.push(int{value});
+  }
+  queue.finish();
+
+  for (std::thread& popper : poppers) {
+    popper.join();
+  }
+
+  for (int index = 0; index < 50; ++index) {
+    EXPECT_EQ(index, results[index]);
+  }
+}
+
+// Two producers (50 values each) and four consumers: every value must be
+// popped.
+TEST(WorkQueue, MPMC) {
+  WorkQueue<int> queue;
+  std::vector<int> results(100, -1);
+  std::mutex mutex;
+  std::vector<std::thread> popperThreads;
+  for (int started = 0; started < 4; ++started) {
+    popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  std::vector<std::thread> pusherThreads;
+  for (int producer = 0; producer < 2; ++producer) {
+    const int first = producer * 50;
+    const int last = (producer + 1) * 50;
+    pusherThreads.emplace_back([&queue, first, last] {
+      for (int value = first; value < last; ++value) {
+        queue.push(int{value});
+      }
+    });
+  }
+
+  // finish() may only be called once every producer is done.
+  for (std::thread& pusher : pusherThreads) {
+    pusher.join();
+  }
+  queue.finish();
+
+  for (std::thread& popper : popperThreads) {
+    popper.join();
+  }
+
+  for (int index = 0; index < 100; ++index) {
+    EXPECT_EQ(index, results[index]);
+  }
+}
+
+// A queue bounded to one element keeps working as long as every push is
+// followed by a pop.
+TEST(WorkQueue, BoundedSizeWorks) {
+  WorkQueue<int> queue(1);
+  int popped;
+  queue.push(5);
+  queue.pop(popped);
+  queue.push(5);
+  queue.pop(popped);
+  queue.push(5);
+  queue.finish();
+  queue.pop(popped);
+  EXPECT_EQ(5, popped);
+}
+
+// A push that is blocked on a full queue must be dropped, not delivered, when
+// `finish()` is called.
+// NOTE(review): `std::chrono` is used below but this file does not include
+// <chrono> directly.
+TEST(WorkQueue, BoundedSizePushAfterFinish) {
+ WorkQueue<int> queue(1);
+ int result;
+ queue.push(5);
+ // The queue is already full, so this push has to wait.
+ std::thread pusher([&queue] {
+ queue.push(6);
+ });
+ // Dirtily try and make sure that pusher has run.
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ queue.finish();
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(5, result);
+ // The 6 must never have made it into the queue.
+ EXPECT_FALSE(queue.pop(result));
+
+ pusher.join();
+}
+
+// After the limit is lowered below the current size, new pushes must block and
+// then be dropped by `finish()`; the items already queued stay available.
+// NOTE(review): `std::chrono` is used below but this file does not include
+// <chrono> directly.
+TEST(WorkQueue, SetMaxSize) {
+ WorkQueue<int> queue(2);
+ int result;
+ queue.push(5);
+ queue.push(6);
+ queue.setMaxSize(1);
+ // Two items are queued against a limit of one, so this push has to wait.
+ std::thread pusher([&queue] {
+ queue.push(7);
+ });
+ // Dirtily try and make sure that pusher has run.
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ queue.finish();
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(5, result);
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(6, result);
+ // The 7 must never have made it into the queue.
+ EXPECT_FALSE(queue.pop(result));
+
+ pusher.join();
+}
+
+// Two producers (100 values each) and four consumers on a queue bounded to ten
+// elements: every value must still be popped.
+TEST(WorkQueue, BoundedSizeMPMC) {
+  WorkQueue<int> queue(10);
+  std::vector<int> results(200, -1);
+  std::mutex mutex;
+  std::cerr << "Creating popperThreads" << std::endl;
+  std::vector<std::thread> popperThreads;
+  for (int started = 0; started < 4; ++started) {
+    popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
+  }
+
+  std::cerr << "Creating pusherThreads" << std::endl;
+  std::vector<std::thread> pusherThreads;
+  for (int producer = 0; producer < 2; ++producer) {
+    const int first = producer * 100;
+    const int last = (producer + 1) * 100;
+    pusherThreads.emplace_back([&queue, first, last] {
+      for (int value = first; value < last; ++value) {
+        queue.push(int{value});
+      }
+    });
+  }
+
+  std::cerr << "Joining pusherThreads" << std::endl;
+  for (std::thread& pusher : pusherThreads) {
+    pusher.join();
+  }
+  std::cerr << "Finishing queue" << std::endl;
+  queue.finish();
+
+  std::cerr << "Joining popperThreads" << std::endl;
+  for (std::thread& popper : popperThreads) {
+    popper.join();
+  }
+
+  std::cerr << "Inspecting results" << std::endl;
+  for (int index = 0; index < 200; ++index) {
+    EXPECT_EQ(index, results[index]);
+  }
+}
+
+// `push()` must leave its argument untouched when it fails.
+TEST(WorkQueue, FailedPush) {
+ WorkQueue<std::unique_ptr<int>> queue;
+ std::unique_ptr<int> x(new int{5});
+ // A successful push takes the pointer out of `x`.
+ EXPECT_TRUE(queue.push(std::move(x)));
+ EXPECT_EQ(nullptr, x);
+ queue.finish();
+ x.reset(new int{6});
+ // After finish() the push is rejected and `x` is deliberately inspected
+ // after the std::move to prove that it still owns its value.
+ EXPECT_FALSE(queue.push(std::move(x)));
+ EXPECT_NE(nullptr, x);
+ EXPECT_EQ(6, *x);
+}
+
+// `size()` must report the bytes of the buffers that are still queued.
+TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
+  {
+    // Nothing queued.
+    BufferWorkQueue buffers;
+    buffers.finish();
+    EXPECT_EQ(0, buffers.size());
+  }
+  {
+    // One buffer.
+    BufferWorkQueue buffers;
+    buffers.push(Buffer(10));
+    buffers.finish();
+    EXPECT_EQ(10, buffers.size());
+  }
+  {
+    // Two buffers add up.
+    BufferWorkQueue buffers;
+    buffers.push(Buffer(10));
+    buffers.push(Buffer(5));
+    buffers.finish();
+    EXPECT_EQ(15, buffers.size());
+  }
+  {
+    // Popping the first buffer removes its bytes from the total.
+    BufferWorkQueue buffers;
+    buffers.push(Buffer(10));
+    buffers.push(Buffer(5));
+    buffers.finish();
+    Buffer popped;
+    buffers.pop(popped);
+    EXPECT_EQ(5, buffers.size());
+  }
+}