summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_tools.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_tools.cc')
-rw-r--r--src/rgw/rgw_tools.cc596
1 files changed, 596 insertions, 0 deletions
diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc
new file mode 100644
index 000000000..5a73d7e36
--- /dev/null
+++ b/src/rgw/rgw_tools.cc
@@ -0,0 +1,596 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <errno.h>
+
+#include "common/errno.h"
+#include "common/safe_io.h"
+#include "librados/librados_asio.h"
+#include "common/async/yield_context.h"
+
+#include "include/types.h"
+#include "include/stringify.h"
+
+#include "librados/AioCompletionImpl.h"
+
+#include "rgw_common.h"
+#include "rgw_tools.h"
+#include "rgw_acl_s3.h"
+#include "rgw_op.h"
+#include "rgw_putobj_processor.h"
+#include "rgw_aio_throttle.h"
+#include "rgw_compression.h"
+#include "rgw_zone.h"
+#include "rgw_sal_rados.h"
+#include "osd/osd_types.h"
+
+#include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
+#include "services/svc_zone_utils.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+#define READ_CHUNK_LEN (512 * 1024)
+
+static std::map<std::string, std::string>* ext_mime_map;
+
+int rgw_init_ioctx(const DoutPrefixProvider *dpp,
+ librados::Rados *rados, const rgw_pool& pool,
+ librados::IoCtx& ioctx, bool create,
+ bool mostly_omap)
+{
+ int r = rados->ioctx_create(pool.name.c_str(), ioctx);
+ if (r == -ENOENT && create) {
+ r = rados->pool_create(pool.name.c_str());
+ if (r == -ERANGE) {
+ ldpp_dout(dpp, 0)
+ << __func__
+ << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
+ << " (this can be due to a pool or placement group misconfiguration, e.g."
+ << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
+ << dendl;
+ }
+ if (r < 0 && r != -EEXIST) {
+ return r;
+ }
+
+ r = rados->ioctx_create(pool.name.c_str(), ioctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
+ if (r < 0 && r != -EOPNOTSUPP) {
+ return r;
+ }
+
+ if (mostly_omap) {
+ // set pg_autoscale_bias
+ bufferlist inbl;
+ float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
+ int r = rados->mon_command(
+ "{\"prefix\": \"osd pool set\", \"pool\": \"" +
+ pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
+ stringify(bias) + "\"}",
+ inbl, NULL, NULL);
+ if (r < 0) {
+ ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_autoscale_bias on "
+ << pool.name << dendl;
+ }
+ // set recovery_priority
+ int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
+ r = rados->mon_command(
+ "{\"prefix\": \"osd pool set\", \"pool\": \"" +
+ pool.name + "\", \"var\": \"recovery_priority\": \"" +
+ stringify(p) + "\"}",
+ inbl, NULL, NULL);
+ if (r < 0) {
+ ldpp_dout(dpp, 10) << __func__ << " warning: failed to set recovery_priority on "
+ << pool.name << dendl;
+ }
+ }
+ } else if (r < 0) {
+ return r;
+ }
+ if (!pool.ns.empty()) {
+ ioctx.set_namespace(pool.ns);
+ }
+ return 0;
+}
+
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ char buf[16];
+ if (shard_id) {
+ *shard_id = val % max_shards;
+ }
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ val ^= ceph_str_hash_linux(section.c_str(), section.size());
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
+{
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", shard_id);
+ name = prefix + buf;
+}
+
+int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
+ const string& str, uint32_t *perm)
+{
+ list<string> strs;
+ get_str_list(str, strs);
+ list<string>::iterator iter;
+ uint32_t v = 0;
+ for (iter = strs.begin(); iter != strs.end(); ++iter) {
+ string& s = *iter;
+ for (int i = 0; mapping[i].type_name; i++) {
+ if (s.compare(mapping[i].type_name) == 0)
+ v |= mapping[i].flag;
+ }
+ }
+
+ *perm = v;
+ return 0;
+}
+
+int rgw_put_system_obj(const DoutPrefixProvider *dpp,
+ RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
+{
+ map<string,bufferlist> no_attrs;
+ if (!pattrs) {
+ pattrs = &no_attrs;
+ }
+
+ rgw_raw_obj obj(pool, oid);
+
+ auto sysobj = obj_ctx.get_obj(obj);
+ int ret = sysobj.wop()
+ .set_objv_tracker(objv_tracker)
+ .set_exclusive(exclusive)
+ .set_mtime(set_mtime)
+ .set_attrs(*pattrs)
+ .write(dpp, data, y);
+
+ return ret;
+}
+
+int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
+ RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version, bool raw_attrs)
+{
+ bufferlist::iterator iter;
+ int request_len = READ_CHUNK_LEN;
+ rgw_raw_obj obj(pool, key);
+
+ obj_version original_readv;
+ if (objv_tracker && !objv_tracker->read_version.empty()) {
+ original_readv = objv_tracker->read_version;
+ }
+
+ do {
+ auto sysobj = obj_ctx.get_obj(obj);
+ auto rop = sysobj.rop();
+
+ int ret = rop.set_attrs(pattrs)
+ .set_last_mod(pmtime)
+ .set_objv_tracker(objv_tracker)
+ .set_raw_attrs(raw_attrs)
+ .stat(y, dpp);
+ if (ret < 0)
+ return ret;
+
+ ret = rop.set_cache_info(cache_info)
+ .set_refresh_version(refresh_version)
+ .read(dpp, &bl, y);
+ if (ret == -ECANCELED) {
+ /* raced, restart */
+ if (!original_readv.empty()) {
+ /* we were asked to read a specific obj_version, failed */
+ return ret;
+ }
+ if (objv_tracker) {
+ objv_tracker->read_version.clear();
+ }
+ sysobj.invalidate();
+ continue;
+ }
+ if (ret < 0)
+ return ret;
+
+ if (ret < request_len)
+ break;
+ bl.clear();
+ request_len *= 2;
+ } while (true);
+
+ return 0;
+}
+
+int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
+ RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
+ RGWObjVersionTracker *objv_tracker, optional_yield y)
+{
+ auto obj_ctx = sysobj_svc->init_obj_ctx();
+ auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
+ rgw_raw_obj obj(pool, oid);
+ return sysobj.wop()
+ .set_objv_tracker(objv_tracker)
+ .remove(dpp, y);
+}
+
+thread_local bool is_asio_thread = false;
+
+int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
+ librados::ObjectReadOperation *op, bufferlist* pbl,
+ optional_yield y, int flags)
+{
+ // given a yield_context, call async_operate() to yield the coroutine instead
+ // of blocking
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ auto bl = librados::async_operate(
+ context, ioctx, oid, op, flags, yield[ec]);
+ if (pbl) {
+ *pbl = std::move(bl);
+ }
+ return -ec.value();
+ }
+ // work on asio threads should be asynchronous, so warn when they block
+ if (is_asio_thread) {
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
+ }
+ return ioctx.operate(oid, op, nullptr, flags);
+}
+
+int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
+ librados::ObjectWriteOperation *op, optional_yield y,
+ int flags)
+{
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
+ return -ec.value();
+ }
+ if (is_asio_thread) {
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
+ }
+ return ioctx.operate(oid, op, flags);
+}
+
+int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
+ optional_yield y)
+{
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ auto reply = librados::async_notify(context, ioctx, oid,
+ bl, timeout_ms, yield[ec]);
+ if (pbl) {
+ *pbl = std::move(reply);
+ }
+ return -ec.value();
+ }
+ if (is_asio_thread) {
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
+ }
+ return ioctx.notify2(oid, bl, timeout_ms, pbl);
+}
+
+void parse_mime_map_line(const char *start, const char *end)
+{
+ char line[end - start + 1];
+ strncpy(line, start, end - start);
+ line[end - start] = '\0';
+ char *l = line;
+#define DELIMS " \t\n\r"
+
+ while (isspace(*l))
+ l++;
+
+ char *mime = strsep(&l, DELIMS);
+ if (!mime)
+ return;
+
+ char *ext;
+ do {
+ ext = strsep(&l, DELIMS);
+ if (ext && *ext) {
+ (*ext_mime_map)[ext] = mime;
+ }
+ } while (ext);
+}
+
+
+void parse_mime_map(const char *buf)
+{
+ const char *start = buf, *end = buf;
+ while (*end) {
+ while (*end && *end != '\n') {
+ end++;
+ }
+ parse_mime_map_line(start, end);
+ end++;
+ start = end;
+ }
+}
+
+static int ext_mime_map_init(CephContext *cct, const char *ext_map)
+{
+ int fd = open(ext_map, O_RDONLY);
+ char *buf = NULL;
+ int ret;
+ if (fd < 0) {
+ ret = -errno;
+ ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
+ << " : " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ struct stat st;
+ ret = fstat(fd, &st);
+ if (ret < 0) {
+ ret = -errno;
+ ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
+ << " : " << cpp_strerror(-ret) << dendl;
+ goto done;
+ }
+
+ buf = (char *)malloc(st.st_size + 1);
+ if (!buf) {
+ ret = -ENOMEM;
+ ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
+ goto done;
+ }
+
+ ret = safe_read(fd, buf, st.st_size + 1);
+ if (ret != st.st_size) {
+ // huh? file size has changed?
+ ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
+ free(buf);
+ close(fd);
+ return ext_mime_map_init(cct, ext_map);
+ }
+ buf[st.st_size] = '\0';
+
+ parse_mime_map(buf);
+ ret = 0;
+done:
+ free(buf);
+ close(fd);
+ return ret;
+}
+
+const char *rgw_find_mime_by_ext(string& ext)
+{
+ map<string, string>::iterator iter = ext_mime_map->find(ext);
+ if (iter == ext_mime_map->end())
+ return NULL;
+
+ return iter->second.c_str();
+}
+
+void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
+ map<string, bufferlist> *attrset)
+{
+ attrset->clear();
+ map<string, bufferlist>::iterator iter;
+ for (iter = unfiltered_attrset.lower_bound(check_prefix);
+ iter != unfiltered_attrset.end(); ++iter) {
+ if (!boost::algorithm::starts_with(iter->first, check_prefix))
+ break;
+ (*attrset)[iter->first] = iter->second;
+ }
+}
+
+RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
+{
+ sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
+}
+
+
+int RGWDataAccess::Bucket::finish_init()
+{
+ auto iter = attrs.find(RGW_ATTR_ACL);
+ if (iter == attrs.end()) {
+ return 0;
+ }
+
+ bufferlist::const_iterator bliter = iter->second.begin();
+ try {
+ policy.decode(bliter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
+{
+ int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
+ tenant, name,
+ bucket_info,
+ &mtime,
+ y,
+ dpp,
+ &attrs);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return finish_init();
+}
+
+int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
+ const map<string, bufferlist>& _attrs)
+{
+ bucket_info = _bucket_info;
+ attrs = _attrs;
+
+ return finish_init();
+}
+
+int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
+ ObjectRef *obj) {
+ obj->reset(new Object(sd, shared_from_this(), key));
+ return 0;
+}
+
+int RGWDataAccess::Object::put(bufferlist& data,
+ map<string, bufferlist>& attrs,
+ const DoutPrefixProvider *dpp,
+ optional_yield y)
+{
+ rgw::sal::RGWRadosStore *store = sd->store;
+ CephContext *cct = store->ctx();
+
+ string tag;
+ append_rand_alpha(cct, tag, tag, 32);
+
+ RGWBucketInfo& bucket_info = bucket->bucket_info;
+
+ rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+
+ RGWObjectCtx obj_ctx(store);
+ std::unique_ptr<rgw::sal::RGWBucket> b;
+ store->get_bucket(NULL, bucket_info, &b);
+ std::unique_ptr<rgw::sal::RGWObject> obj = b->get_object(key);
+
+ auto& owner = bucket->policy.get_owner();
+
+ string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
+
+ using namespace rgw::putobj;
+ AtomicObjectProcessor processor(&aio, store, b.get(), nullptr,
+ owner.get_id(), obj_ctx, std::move(obj), olh_epoch,
+ req_id, dpp, y);
+
+ int ret = processor.prepare(y);
+ if (ret < 0)
+ return ret;
+
+ DataProcessor *filter = &processor;
+
+ CompressorRef plugin;
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
+ if (compression_type != "none") {
+ plugin = Compressor::create(store->ctx(), compression_type);
+ if (!plugin) {
+ ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(store->ctx(), plugin, filter);
+ filter = &*compressor;
+ }
+ }
+
+ off_t ofs = 0;
+ auto obj_size = data.length();
+
+ RGWMD5Etag etag_calc;
+
+ do {
+ size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
+
+ bufferlist bl;
+
+ data.splice(0, read_len, &bl);
+ etag_calc.update(bl);
+
+ ret = filter->process(std::move(bl), ofs);
+ if (ret < 0)
+ return ret;
+
+ ofs += read_len;
+ } while (data.length() > 0);
+
+ ret = filter->process({}, ofs);
+ if (ret < 0) {
+ return ret;
+ }
+ bool has_etag_attr = false;
+ auto iter = attrs.find(RGW_ATTR_ETAG);
+ if (iter != attrs.end()) {
+ bufferlist& bl = iter->second;
+ etag = bl.to_str();
+ has_etag_attr = true;
+ }
+
+ if (!aclbl) {
+ RGWAccessControlPolicy_S3 policy(cct);
+
+ policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
+
+ policy.encode(aclbl.emplace());
+ }
+
+ if (etag.empty()) {
+ etag_calc.finish(&etag);
+ }
+
+ if (!has_etag_attr) {
+ bufferlist etagbl;
+ etagbl.append(etag);
+ attrs[RGW_ATTR_ETAG] = etagbl;
+ }
+ attrs[RGW_ATTR_ACL] = *aclbl;
+
+ string *puser_data = nullptr;
+ if (user_data) {
+ puser_data = &(*user_data);
+ }
+
+ return processor.complete(obj_size, etag,
+ &mtime, mtime,
+ attrs, delete_at,
+ nullptr, nullptr,
+ puser_data,
+ nullptr, nullptr, y);
+}
+
+void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
+{
+ policy.encode(aclbl.emplace());
+}
+
+int rgw_tools_init(CephContext *cct)
+{
+ ext_mime_map = new std::map<std::string, std::string>;
+ ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
+ // ignore errors; missing mime.types is not fatal
+ return 0;
+}
+
+void rgw_tools_cleanup()
+{
+ delete ext_mime_map;
+ ext_mime_map = nullptr;
+}
+
+void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
+ auto pc = c->pc;
+ librados::CB_AioCompleteAndSafe cb(pc);
+ cb(r);
+}