diff options
Diffstat (limited to 'src/compressor/QatAccel.cc')
-rw-r--r-- | src/compressor/QatAccel.cc | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc new file mode 100644 index 00000000..7836243b --- /dev/null +++ b/src/compressor/QatAccel.cc @@ -0,0 +1,141 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Intel Corporation + * + * Author: Qiaowei Ren <qiaowei.ren@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "QatAccel.h" + +/* Estimate data expansion after decompression */ +static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200}; + +QatAccel::~QatAccel() { + if (NULL != session.internal) { + qzTeardownSession(&session); + qzClose(&session); + } +} + +bool QatAccel::init(const std::string &alg) { + QzSessionParams_T params = {(QzHuffmanHdr_T)0,}; + int rc; + + rc = qzGetDefaults(¶ms); + if (rc != QZ_OK) + return false; + params.direction = QZ_DIR_BOTH; + if (alg == "snappy") + params.comp_algorithm = QZ_SNAPPY; + else if (alg == "zlib") + params.comp_algorithm = QZ_DEFLATE; + else if (alg == "lz4") + params.comp_algorithm = QZ_LZ4; + else + return false; + + rc = qzSetDefaults(¶ms); + if (rc != QZ_OK) + return false; + + rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT); + if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW) + return false; + + rc = qzSetupSession(&session, ¶ms); + if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) { + qzTeardownSession(&session); + qzClose(&session); + return false; + } + + return true; +} + +int QatAccel::compress(const bufferlist &in, bufferlist &out) { + for (auto &i : in.buffers()) { + const unsigned char* c_in = (unsigned char*) i.c_str(); + unsigned int len = i.length(); + unsigned int out_len = qzMaxCompressedLength(len); + + bufferptr ptr = buffer::create_small_page_aligned(out_len); + int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1); + if (rc != QZ_OK) + return -1; + out.append(ptr, 0, out_len); + } + + return 0; +} + +int QatAccel::decompress(const bufferlist &in, bufferlist &out) { + auto i = in.begin(); + return decompress(i, in.length(), out); +} + +int QatAccel::decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst) { + unsigned int ratio_idx = 0; + bool read_more = false; + bool joint = false; + int rc = 0; + bufferlist tmp; + size_t remaining = MIN(p.get_remaining(), compressed_len); + + while (remaining) { + if (p.end()) { + return -1; + } + + bufferptr cur_ptr = p.get_current_ptr(); + unsigned int len = cur_ptr.length(); + if (joint) { + if (read_more) + tmp.append(cur_ptr.c_str(), len); + len = tmp.length(); + } + unsigned int out_len = len * expansion_ratio[ratio_idx]; + bufferptr ptr = buffer::create_small_page_aligned(out_len); + + if (joint) + rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len); + else + rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len); + if (rc == QZ_DATA_ERROR) { + if (!joint) { + tmp.append(cur_ptr.c_str(), cur_ptr.length()); + p.advance(remaining); + joint = true; + } + read_more = true; + continue; + } else if (rc == QZ_BUF_ERROR) { + if (ratio_idx == std::size(expansion_ratio)) + return -1; + if (joint) + read_more = false; + ratio_idx++; + continue; + } else if (rc != QZ_OK) { + return -1; + } else { + ratio_idx = 0; + joint = false; + read_more = false; + } + + p.advance(remaining); + remaining -= len; + dst.append(ptr, 0, out_len); + } + + return 0; +} |