summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_compression.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_compression.cc')
-rw-r--r--src/rgw/rgw_compression.cc236
1 files changed, 236 insertions, 0 deletions
diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc
new file mode 100644
index 000000000..8306e766a
--- /dev/null
+++ b/src/rgw/rgw_compression.cc
@@ -0,0 +1,236 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_compression.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+int rgw_compression_info_from_attr(const bufferlist& attr,
+ bool& need_decompress,
+ RGWCompressionInfo& cs_info)
+{
+ auto bliter = attr.cbegin();
+ try {
+ decode(cs_info, bliter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ if (cs_info.blocks.size() == 0) {
+ return -EIO;
+ }
+ if (cs_info.compression_type != "none")
+ need_decompress = true;
+ else
+ need_decompress = false;
+ return 0;
+}
+
+int rgw_compression_info_from_attrset(const map<string, bufferlist>& attrs,
+ bool& need_decompress,
+ RGWCompressionInfo& cs_info)
+{
+ auto value = attrs.find(RGW_ATTR_COMPRESSION);
+ if (value == attrs.end()) {
+ need_decompress = false;
+ return 0;
+ }
+ return rgw_compression_info_from_attr(value->second, need_decompress, cs_info);
+}
+
+//------------RGWPutObj_Compress---------------
+
+int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
+{
+ bufferlist out;
+ compressed_ofs = logical_offset;
+
+ if (in.length() > 0) {
+ // compression stuff
+ if ((logical_offset > 0 && compressed) || // if previous part was compressed
+ (logical_offset == 0)) { // or it's the first part
+ ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
+ int cr = compressor->compress(in, out, compressor_message);
+ if (cr < 0) {
+ if (logical_offset > 0) {
+ lderr(cct) << "Compression failed with exit code " << cr
+ << " for next part, compression process failed" << dendl;
+ return -EIO;
+ }
+ compressed = false;
+ ldout(cct, 5) << "Compression failed with exit code " << cr
+ << " for first part, storing uncompressed" << dendl;
+ out = std::move(in);
+ } else {
+ compressed = true;
+
+ compression_block newbl;
+ size_t bs = blocks.size();
+ newbl.old_ofs = logical_offset;
+ newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+ newbl.len = out.length();
+ blocks.push_back(newbl);
+
+ compressed_ofs = newbl.new_ofs;
+ }
+ } else {
+ compressed = false;
+ out = std::move(in);
+ }
+ // end of compression stuff
+ } else {
+ size_t bs = blocks.size();
+ compressed_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : logical_offset;
+ }
+
+ return Pipe::process(std::move(out), compressed_ofs);
+}
+
+//----------------RGWGetObj_Decompress---------------------
+RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
+ RGWCompressionInfo* cs_info_,
+ bool partial_content_,
+ RGWGetObj_Filter* next): RGWGetObj_Filter(next),
+ cct(cct_),
+ cs_info(cs_info_),
+ partial_content(partial_content_),
+ q_ofs(0),
+ q_len(0),
+ cur_ofs(0)
+{
+ compressor = Compressor::create(cct, cs_info->compression_type);
+ if (!compressor.get())
+ lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
+}
+
+int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+ ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
+ << "bl_ofs=" << bl_ofs << ", bl_len=" << bl_len << dendl;
+
+ if (!compressor.get()) {
+ // if compressor isn't available - error, because cannot return decompressed data?
+ lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
+ return -EIO;
+ }
+ bufferlist out_bl, in_bl, temp_in_bl;
+ bl.begin(bl_ofs).copy(bl_len, temp_in_bl);
+ bl_ofs = 0;
+ int r = 0;
+ if (waiting.length() != 0) {
+ in_bl.append(waiting);
+ in_bl.append(temp_in_bl);
+ waiting.clear();
+ } else {
+ in_bl = std::move(temp_in_bl);
+ }
+ bl_len = in_bl.length();
+
+ auto iter_in_bl = in_bl.cbegin();
+ while (first_block <= last_block) {
+ bufferlist tmp;
+ off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
+ if (ofs_in_bl + (off_t)first_block->len > bl_len) {
+ // not complete block, put it to waiting
+ unsigned tail = bl_len - ofs_in_bl;
+ if (iter_in_bl.get_off() != ofs_in_bl) {
+ iter_in_bl.seek(ofs_in_bl);
+ }
+ iter_in_bl.copy(tail, waiting);
+ cur_ofs -= tail;
+ break;
+ }
+ if (iter_in_bl.get_off() != ofs_in_bl) {
+ iter_in_bl.seek(ofs_in_bl);
+ }
+ iter_in_bl.copy(first_block->len, tmp);
+ int cr = compressor->decompress(tmp, out_bl, cs_info->compressor_message);
+ if (cr < 0) {
+ lderr(cct) << "Decompression failed with exit code " << cr << dendl;
+ return cr;
+ }
+ ++first_block;
+ while (out_bl.length() - q_ofs >=
+ static_cast<off_t>(cct->_conf->rgw_max_chunk_size)) {
+ off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
+ q_len -= ch_len;
+ r = next->handle_data(out_bl, q_ofs, ch_len);
+ if (r < 0) {
+ lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
+ return r;
+ }
+ out_bl.splice(0, q_ofs + ch_len);
+ q_ofs = 0;
+ }
+ }
+
+ cur_ofs += bl_len;
+ off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
+ if (ch_len > 0) {
+ r = next->handle_data(out_bl, q_ofs, ch_len);
+ if (r < 0) {
+ lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
+ return r;
+ }
+ out_bl.splice(0, q_ofs + ch_len);
+ q_len -= ch_len;
+ q_ofs = 0;
+ }
+ return r;
+}
+
+int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
+{
+ if (partial_content) {
+ // if user set range, we need to calculate it in decompressed data
+ first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
+ if (cs_info->blocks.size() > 1) {
+ vector<compression_block>::iterator fb, lb;
+ // not bad to use auto for lambda, I think
+ auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
+ auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
+ fb = upper_bound(cs_info->blocks.begin()+1,
+ cs_info->blocks.end(),
+ ofs,
+ cmp_u);
+ first_block = fb - 1;
+ lb = lower_bound(fb,
+ cs_info->blocks.end(),
+ end,
+ cmp_l);
+ last_block = lb - 1;
+ }
+ } else {
+ first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
+ }
+
+ q_ofs = ofs - first_block->old_ofs;
+ q_len = end + 1 - ofs;
+
+ ofs = first_block->new_ofs;
+ end = last_block->new_ofs + last_block->len - 1;
+
+ cur_ofs = ofs;
+ waiting.clear();
+
+ return next->fixup_range(ofs, end);
+}
+
+void compression_block::dump(Formatter *f) const
+{
+ f->dump_unsigned("old_ofs", old_ofs);
+ f->dump_unsigned("new_ofs", new_ofs);
+ f->dump_unsigned("len", len);
+}
+
+void RGWCompressionInfo::dump(Formatter *f) const
+{
+ f->dump_string("compression_type", compression_type);
+ f->dump_unsigned("orig_size", orig_size);
+ if (compressor_message) {
+ f->dump_int("compressor_message", *compressor_message);
+ }
+ ::encode_json("blocks", blocks, f);
+}
+