summaryrefslogtreecommitdiffstats
path: root/src/compressor/QatAccel.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/compressor/QatAccel.cc141
1 files changed, 141 insertions, 0 deletions
diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc
new file mode 100644
index 00000000..7836243b
--- /dev/null
+++ b/src/compressor/QatAccel.cc
@@ -0,0 +1,141 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "QatAccel.h"
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
+
+QatAccel::~QatAccel() {
+ if (NULL != session.internal) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ }
+}
+
+bool QatAccel::init(const std::string &alg) {
+ QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+ int rc;
+
+ rc = qzGetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+ params.direction = QZ_DIR_BOTH;
+ if (alg == "snappy")
+ params.comp_algorithm = QZ_SNAPPY;
+ else if (alg == "zlib")
+ params.comp_algorithm = QZ_DEFLATE;
+ else if (alg == "lz4")
+ params.comp_algorithm = QZ_LZ4;
+ else
+ return false;
+
+ rc = qzSetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+
+ rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
+ return false;
+
+ rc = qzSetupSession(&session, &params);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ return false;
+ }
+
+ return true;
+}
+
+int QatAccel::compress(const bufferlist &in, bufferlist &out) {
+ for (auto &i : in.buffers()) {
+ const unsigned char* c_in = (unsigned char*) i.c_str();
+ unsigned int len = i.length();
+ unsigned int out_len = qzMaxCompressedLength(len);
+
+ bufferptr ptr = buffer::create_small_page_aligned(out_len);
+ int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
+ if (rc != QZ_OK)
+ return -1;
+ out.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}
+
+int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
+ auto i = in.begin();
+ return decompress(i, in.length(), out);
+}
+
+int QatAccel::decompress(bufferlist::const_iterator &p,
+ size_t compressed_len,
+ bufferlist &dst) {
+ unsigned int ratio_idx = 0;
+ bool read_more = false;
+ bool joint = false;
+ int rc = 0;
+ bufferlist tmp;
+ size_t remaining = MIN(p.get_remaining(), compressed_len);
+
+ while (remaining) {
+ if (p.end()) {
+ return -1;
+ }
+
+ bufferptr cur_ptr = p.get_current_ptr();
+ unsigned int len = cur_ptr.length();
+ if (joint) {
+ if (read_more)
+ tmp.append(cur_ptr.c_str(), len);
+ len = tmp.length();
+ }
+ unsigned int out_len = len * expansion_ratio[ratio_idx];
+ bufferptr ptr = buffer::create_small_page_aligned(out_len);
+
+ if (joint)
+ rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ else
+ rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ if (rc == QZ_DATA_ERROR) {
+ if (!joint) {
+ tmp.append(cur_ptr.c_str(), cur_ptr.length());
+ p.advance(remaining);
+ joint = true;
+ }
+ read_more = true;
+ continue;
+ } else if (rc == QZ_BUF_ERROR) {
+ if (ratio_idx == std::size(expansion_ratio))
+ return -1;
+ if (joint)
+ read_more = false;
+ ratio_idx++;
+ continue;
+ } else if (rc != QZ_OK) {
+ return -1;
+ } else {
+ ratio_idx = 0;
+ joint = false;
+ read_more = false;
+ }
+
+ p.advance(remaining);
+ remaining -= len;
+ dst.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}