diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/util/compression.cc | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/src/rocksdb/util/compression.cc b/src/rocksdb/util/compression.cc new file mode 100644 index 000000000..8e2f01b12 --- /dev/null +++ b/src/rocksdb/util/compression.cc @@ -0,0 +1,122 @@ +// Copyright (c) 2022-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +StreamingCompress* StreamingCompress::Create(CompressionType compression_type, + const CompressionOptions& opts, + uint32_t compress_format_version, + size_t max_output_len) { + switch (compression_type) { + case kZSTD: { + if (!ZSTD_Streaming_Supported()) { + return nullptr; + } + return new ZSTDStreamingCompress(opts, compress_format_version, + max_output_len); + } + default: + return nullptr; + } +} + +StreamingUncompress* StreamingUncompress::Create( + CompressionType compression_type, uint32_t compress_format_version, + size_t max_output_len) { + switch (compression_type) { + case kZSTD: { + if (!ZSTD_Streaming_Supported()) { + return nullptr; + } + return new ZSTDStreamingUncompress(compress_format_version, + max_output_len); + } + default: + return nullptr; + } +} + +int ZSTDStreamingCompress::Compress(const char* input, size_t input_size, + char* output, size_t* output_pos) { + assert(input != nullptr && output != nullptr && output_pos != nullptr); + *output_pos = 0; + // Don't need to compress an empty input + if (input_size == 0) { + return 0; + } +#ifndef ZSTD_STREAMING + (void)input; + (void)input_size; + (void)output; + return -1; +#else + if (input_buffer_.src == nullptr || input_buffer_.src != input) { + // New input + // Catch errors where the previous input was not fully decompressed. + assert(input_buffer_.pos == input_buffer_.size); + input_buffer_ = {input, input_size, /*pos=*/0}; + } else if (input_buffer_.src == input) { + // Same input, not fully compressed. + } + ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0}; + const size_t remaining = + ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end); + if (ZSTD_isError(remaining)) { + // Failure + Reset(); + return -1; + } + // Success + *output_pos = output_buffer.pos; + return (int)remaining; +#endif +} + +void ZSTDStreamingCompress::Reset() { +#ifdef ZSTD_STREAMING + ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif +} + +int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, + char* output, size_t* output_pos) { + assert(input != nullptr && output != nullptr && output_pos != nullptr); + *output_pos = 0; + // Don't need to uncompress an empty input + if (input_size == 0) { + return 0; + } +#ifdef ZSTD_STREAMING + if (input_buffer_.src != input) { + // New input + input_buffer_ = {input, input_size, /*pos=*/0}; + } + ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0}; + size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_); + if (ZSTD_isError(ret)) { + Reset(); + return -1; + } + *output_pos = output_buffer.pos; + return (int)(input_buffer_.size - input_buffer_.pos); +#else + (void)input; + (void)input_size; + (void)output; + return -1; +#endif +} + +void ZSTDStreamingUncompress::Reset() { +#ifdef ZSTD_STREAMING + ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only); + input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0}; +#endif +} + +} // namespace ROCKSDB_NAMESPACE |