diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_log.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_log.cc')
-rw-r--r-- | src/rgw/rgw_log.cc | 722 |
1 files changed, 722 insertions, 0 deletions
diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc new file mode 100644 index 000000000..de67fcd4b --- /dev/null +++ b/src/rgw/rgw_log.cc @@ -0,0 +1,722 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "common/Clock.h" +#include "common/Timer.h" +#include "common/utf8.h" +#include "common/OutputDataSocket.h" +#include "common/Formatter.h" + +#include "rgw_bucket.h" +#include "rgw_log.h" +#include "rgw_acl.h" +#include "rgw_client_io.h" +#include "rgw_rest.h" +#include "rgw_zone.h" +#include "rgw_rados.h" + +#include "services/svc_zone.h" + +#include <chrono> +#include <math.h> + +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +static void set_param_str(req_state *s, const char *name, string& str) +{ + const char *p = s->info.env->get(name); + if (p) + str = p; +} + +string render_log_object_name(const string& format, + struct tm *dt, const string& bucket_id, + const string& bucket_name) +{ + string o; + for (unsigned i=0; i<format.size(); i++) { + if (format[i] == '%' && i+1 < format.size()) { + i++; + char buf[32]; + switch (format[i]) { + case '%': + strcpy(buf, "%"); + break; + case 'Y': + sprintf(buf, "%.4d", dt->tm_year + 1900); + break; + case 'y': + sprintf(buf, "%.2d", dt->tm_year % 100); + break; + case 'm': + sprintf(buf, "%.2d", dt->tm_mon + 1); + break; + case 'd': + sprintf(buf, "%.2d", dt->tm_mday); + break; + case 'H': + sprintf(buf, "%.2d", dt->tm_hour); + break; + case 'I': + sprintf(buf, "%.2d", (dt->tm_hour % 12) + 1); + break; + case 'k': + sprintf(buf, "%d", dt->tm_hour); + break; + case 'l': + sprintf(buf, "%d", (dt->tm_hour % 12) + 1); + break; + case 'M': + sprintf(buf, "%.2d", dt->tm_min); + break; + + case 'i': + o += bucket_id; + continue; + case 'n': + o += bucket_name; + continue; + default: + // unknown code + sprintf(buf, "%%%c", format[i]); + break; + } + o += buf; + continue; + } + o += format[i]; + } + return o; +} + +/* usage logger */ +class UsageLogger : public DoutPrefixProvider { + CephContext *cct; + rgw::sal::Driver* driver; + map<rgw_user_bucket, RGWUsageBatch> usage_map; + ceph::mutex lock = ceph::make_mutex("UsageLogger"); + int32_t num_entries; + ceph::mutex timer_lock = ceph::make_mutex("UsageLogger::timer_lock"); + SafeTimer timer; + utime_t round_timestamp; + + class C_UsageLogTimeout : public Context { + UsageLogger *logger; + public: + explicit C_UsageLogTimeout(UsageLogger *_l) : logger(_l) {} + void finish(int r) override { + logger->flush(); + logger->set_timer(); + } + }; + + void set_timer() { + timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(this)); + } +public: + + UsageLogger(CephContext *_cct, rgw::sal::Driver* _driver) : cct(_cct), driver(_driver), num_entries(0), timer(cct, timer_lock) { + timer.init(); + std::lock_guard l{timer_lock}; + set_timer(); + utime_t ts = ceph_clock_now(); + recalc_round_timestamp(ts); + } + + ~UsageLogger() { + std::lock_guard l{timer_lock}; + flush(); + timer.cancel_all_events(); + timer.shutdown(); + } + + void recalc_round_timestamp(utime_t& ts) { + round_timestamp = ts.round_to_hour(); + } + + void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) { + lock.lock(); + if (timestamp.sec() > round_timestamp + 3600) + recalc_round_timestamp(timestamp); + entry.epoch = round_timestamp.sec(); + bool account; + string u = user.to_str(); + rgw_user_bucket ub(u, entry.bucket); + real_time rt = round_timestamp.to_real_time(); + usage_map[ub].insert(rt, entry, &account); + if (account) + num_entries++; + bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold); + lock.unlock(); + if (need_flush) { + std::lock_guard l{timer_lock}; + flush(); + } + } + + void insert(utime_t& timestamp, rgw_usage_log_entry& entry) { + if (entry.payer.empty()) { + insert_user(timestamp, entry.owner, entry); + } else { + insert_user(timestamp, entry.payer, entry); + } + } + + void flush() { + map<rgw_user_bucket, RGWUsageBatch> old_map; + lock.lock(); + old_map.swap(usage_map); + num_entries = 0; + lock.unlock(); + + driver->log_usage(this, old_map); + } + + CephContext *get_cct() const override { return cct; } + unsigned get_subsys() const override { return dout_subsys; } + std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw UsageLogger: "; } +}; + +static UsageLogger *usage_logger = NULL; + +void rgw_log_usage_init(CephContext *cct, rgw::sal::Driver* driver) +{ + usage_logger = new UsageLogger(cct, driver); +} + +void rgw_log_usage_finalize() +{ + delete usage_logger; + usage_logger = NULL; +} + +static void log_usage(req_state *s, const string& op_name) +{ + if (s->system_request) /* don't log system user operations */ + return; + + if (!usage_logger) + return; + + rgw_user user; + rgw_user payer; + string bucket_name; + + bucket_name = s->bucket_name; + + if (!bucket_name.empty()) { + bucket_name = s->bucket_name; + user = s->bucket_owner.get_id(); + if (!rgw::sal::Bucket::empty(s->bucket.get()) && + s->bucket->get_info().requester_pays) { + payer = s->user->get_id(); + } + } else { + user = s->user->get_id(); + } + + bool error = s->err.is_err(); + if (error && s->err.http_ret == 404) { + bucket_name = "-"; /* bucket not found, use the invalid '-' as bucket name */ + } + + string u = user.to_str(); + string p = payer.to_str(); + rgw_usage_log_entry entry(u, p, bucket_name); + + uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent(); + uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received(); + + rgw_usage_data data(bytes_sent, bytes_received); + + data.ops = 1; + if (!s->is_err()) + data.successful_ops = 1; + + ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name + << " tenant=" << s->bucket_tenant + << ", bytes_sent=" << bytes_sent << ", bytes_received=" + << bytes_received << ", success=" << data.successful_ops << dendl; + + entry.add(op_name, data); + + utime_t ts = ceph_clock_now(); + + usage_logger->insert(ts, entry); +} + +void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter) +{ + formatter->open_object_section("log_entry"); + formatter->dump_string("bucket", entry.bucket); + { + auto t = utime_t{entry.time}; + t.gmtime(formatter->dump_stream("time")); // UTC + t.localtime(formatter->dump_stream("time_local")); + } + formatter->dump_string("remote_addr", entry.remote_addr); + string obj_owner = entry.object_owner.to_str(); + if (obj_owner.length()) + formatter->dump_string("object_owner", obj_owner); + formatter->dump_string("user", entry.user); + formatter->dump_string("operation", entry.op); + formatter->dump_string("uri", entry.uri); + formatter->dump_string("http_status", entry.http_status); + formatter->dump_string("error_code", entry.error_code); + formatter->dump_int("bytes_sent", entry.bytes_sent); + formatter->dump_int("bytes_received", entry.bytes_received); + formatter->dump_int("object_size", entry.obj_size); + { + using namespace std::chrono; + uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count(); + formatter->dump_int("total_time", total_time); + } + formatter->dump_string("user_agent", entry.user_agent); + formatter->dump_string("referrer", entry.referrer); + if (entry.x_headers.size() > 0) { + formatter->open_array_section("http_x_headers"); + for (const auto& iter: entry.x_headers) { + formatter->open_object_section(iter.first.c_str()); + formatter->dump_string(iter.first.c_str(), iter.second); + formatter->close_section(); + } + formatter->close_section(); + } + formatter->dump_string("trans_id", entry.trans_id); + switch(entry.identity_type) { + case TYPE_RGW: + formatter->dump_string("authentication_type","Local"); + break; + case TYPE_LDAP: + formatter->dump_string("authentication_type","LDAP"); + break; + case TYPE_KEYSTONE: + formatter->dump_string("authentication_type","Keystone"); + break; + case TYPE_WEB: + formatter->dump_string("authentication_type","OIDC Provider"); + break; + case TYPE_ROLE: + formatter->dump_string("authentication_type","STS"); + break; + default: + break; + } + if (entry.token_claims.size() > 0) { + if (entry.token_claims[0] == "sts") { + formatter->open_object_section("sts_info"); + for (const auto& iter: entry.token_claims) { + auto pos = iter.find(":"); + if (pos != string::npos) { + formatter->dump_string(iter.substr(0, pos), iter.substr(pos + 1)); + } + } + formatter->close_section(); + } + } + if (!entry.access_key_id.empty()) { + formatter->dump_string("access_key_id", entry.access_key_id); + } + if (!entry.subuser.empty()) { + formatter->dump_string("subuser", entry.subuser); + } + formatter->dump_bool("temp_url", entry.temp_url); + + if (entry.op == "multi_object_delete") { + formatter->open_object_section("op_data"); + formatter->dump_int("num_ok", entry.delete_multi_obj_meta.num_ok); + formatter->dump_int("num_err", entry.delete_multi_obj_meta.num_err); + formatter->open_array_section("objects"); + for (const auto& iter: entry.delete_multi_obj_meta.objects) { + formatter->open_object_section(""); + formatter->dump_string("key", iter.key); + formatter->dump_string("version_id", iter.version_id); + formatter->dump_int("http_status", iter.http_status); + formatter->dump_bool("error", iter.error); + if (iter.error) { + formatter->dump_string("error_message", iter.error_message); + } else { + formatter->dump_bool("delete_marker", iter.delete_marker); + formatter->dump_string("marker_version_id", iter.marker_version_id); + } + formatter->close_section(); + } + formatter->close_section(); + formatter->close_section(); + } + formatter->close_section(); +} + +OpsLogManifold::~OpsLogManifold() +{ + for (const auto &sink : sinks) { + delete sink; + } +} + +void OpsLogManifold::add_sink(OpsLogSink* sink) +{ + sinks.push_back(sink); +} + +int OpsLogManifold::log(req_state* s, struct rgw_log_entry& entry) +{ + int ret = 0; + for (const auto &sink : sinks) { + if (sink->log(s, entry) < 0) { + ret = -1; + } + } + return ret; +} + +OpsLogFile::OpsLogFile(CephContext* cct, std::string& path, uint64_t max_data_size) : + cct(cct), data_size(0), max_data_size(max_data_size), path(path), need_reopen(false) +{ +} + +void OpsLogFile::reopen() { + need_reopen = true; +} + +void OpsLogFile::flush() +{ + { + std::scoped_lock log_lock(mutex); + assert(flush_buffer.empty()); + flush_buffer.swap(log_buffer); + data_size = 0; + } + for (auto bl : flush_buffer) { + int try_num = 0; + while (true) { + if (!file.is_open() || need_reopen) { + need_reopen = false; + file.close(); + file.open(path, std::ofstream::app); + } + bl.write_stream(file); + if (!file) { + ldpp_dout(this, 0) << "ERROR: failed to log RGW ops log file entry" << dendl; + file.clear(); + if (stopped) { + break; + } + int sleep_time_secs = std::min((int) pow(2, try_num), 60); + std::this_thread::sleep_for(std::chrono::seconds(sleep_time_secs)); + try_num++; + } else { + break; + } + } + } + flush_buffer.clear(); + file << std::endl; +} + +void* OpsLogFile::entry() { + std::unique_lock lock(mutex); + while (!stopped) { + if (!log_buffer.empty()) { + lock.unlock(); + flush(); + lock.lock(); + continue; + } + cond.wait(lock); + } + lock.unlock(); + flush(); + return NULL; +} + +void OpsLogFile::start() { + stopped = false; + create("ops_log_file"); +} + +void OpsLogFile::stop() { + { + std::unique_lock lock(mutex); + cond.notify_one(); + stopped = true; + } + join(); +} + +OpsLogFile::~OpsLogFile() +{ + if (!stopped) { + stop(); + } + file.close(); +} + +int OpsLogFile::log_json(req_state* s, bufferlist& bl) +{ + std::unique_lock lock(mutex); + if (data_size + bl.length() >= max_data_size) { + ldout(s->cct, 0) << "ERROR: RGW ops log file buffer too full, dropping log for txn: " << s->trans_id << dendl; + return -1; + } + log_buffer.push_back(bl); + data_size += bl.length(); + cond.notify_all(); + return 0; +} + +unsigned OpsLogFile::get_subsys() const { + return dout_subsys; +} + +JsonOpsLogSink::JsonOpsLogSink() { + formatter = new JSONFormatter; +} + +JsonOpsLogSink::~JsonOpsLogSink() { + delete formatter; +} + +void JsonOpsLogSink::formatter_to_bl(bufferlist& bl) +{ + stringstream ss; + formatter->flush(ss); + const string& s = ss.str(); + bl.append(s); +} + +int JsonOpsLogSink::log(req_state* s, struct rgw_log_entry& entry) +{ + bufferlist bl; + + lock.lock(); + rgw_format_ops_log_entry(entry, formatter); + formatter_to_bl(bl); + lock.unlock(); + + return log_json(s, bl); +} + +void OpsLogSocket::init_connection(bufferlist& bl) +{ + bl.append("["); +} + +OpsLogSocket::OpsLogSocket(CephContext *cct, uint64_t _backlog) : OutputDataSocket(cct, _backlog) +{ + delim.append(",\n"); +} + +int OpsLogSocket::log_json(req_state* s, bufferlist& bl) +{ + append_output(bl); + return 0; +} + +OpsLogRados::OpsLogRados(rgw::sal::Driver* const& driver): driver(driver) +{ +} + +int OpsLogRados::log(req_state* s, struct rgw_log_entry& entry) +{ + if (!s->cct->_conf->rgw_ops_log_rados) { + return 0; + } + bufferlist bl; + encode(entry, bl); + + struct tm bdt; + time_t t = req_state::Clock::to_time_t(entry.time); + if (s->cct->_conf->rgw_log_object_name_utc) + gmtime_r(&t, &bdt); + else + localtime_r(&t, &bdt); + string oid = render_log_object_name(s->cct->_conf->rgw_log_object_name, &bdt, + entry.bucket_id, entry.bucket); + if (driver->log_op(s, oid, bl) < 0) { + ldpp_dout(s, 0) << "ERROR: failed to log RADOS RGW ops log entry for txn: " << s->trans_id << dendl; + return -1; + } + return 0; +} + +int rgw_log_op(RGWREST* const rest, req_state *s, const RGWOp* op, OpsLogSink *olog) +{ + struct rgw_log_entry entry; + string bucket_id; + string op_name = (op ? op->name() : "unknown"); + + if (s->enable_usage_log) + log_usage(s, op_name); + + if (!s->enable_ops_log) + return 0; + + if (s->bucket_name.empty()) { + /* this case is needed for, e.g., list_buckets */ + } else { + if (s->err.ret == -ERR_NO_SUCH_BUCKET || + rgw::sal::Bucket::empty(s->bucket.get())) { + if (!s->cct->_conf->rgw_log_nonexistent_bucket) { + ldout(s->cct, 5) << "bucket " << s->bucket_name << " doesn't exist, not logging" << dendl; + return 0; + } + bucket_id = ""; + } else { + bucket_id = s->bucket->get_bucket_id(); + } + entry.bucket = rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name); + + if (check_utf8(entry.bucket.c_str(), entry.bucket.size()) != 0) { + ldpp_dout(s, 5) << "not logging op on bucket with non-utf8 name" << dendl; + return 0; + } + + if (!rgw::sal::Object::empty(s->object.get())) { + entry.obj = s->object->get_key(); + } else { + entry.obj = rgw_obj_key("-"); + } + + entry.obj_size = s->obj_size; + } /* !bucket empty */ + + if (s->cct->_conf->rgw_remote_addr_param.length()) + set_param_str(s, s->cct->_conf->rgw_remote_addr_param.c_str(), + entry.remote_addr); + else + set_param_str(s, "REMOTE_ADDR", entry.remote_addr); + set_param_str(s, "HTTP_USER_AGENT", entry.user_agent); + // legacy apps are still using misspelling referer, such as curl -e option + if (s->info.env->exists("HTTP_REFERRER")) + set_param_str(s, "HTTP_REFERRER", entry.referrer); + else + set_param_str(s, "HTTP_REFERER", entry.referrer); + + std::string uri; + if (s->info.env->exists("REQUEST_METHOD")) { + uri.append(s->info.env->get("REQUEST_METHOD")); + uri.append(" "); + } + + if (s->info.env->exists("REQUEST_URI")) { + uri.append(s->info.env->get("REQUEST_URI")); + } + + /* Formerly, we appended QUERY_STRING to uri, but in RGW, QUERY_STRING is a + * substring of REQUEST_URI--appending qs to uri here duplicates qs to the + * ops log */ + + if (s->info.env->exists("HTTP_VERSION")) { + uri.append(" "); + uri.append("HTTP/"); + uri.append(s->info.env->get("HTTP_VERSION")); + } + + entry.uri = std::move(uri); + + entry.op = op_name; + if (op) { + op->write_ops_log_entry(entry); + } + + if (s->auth.identity) { + entry.identity_type = s->auth.identity->get_identity_type(); + s->auth.identity->write_ops_log_entry(entry); + } else { + entry.identity_type = TYPE_NONE; + } + + if (! s->token_claims.empty()) { + entry.token_claims = std::move(s->token_claims); + } + + /* custom header logging */ + if (rest) { + if (rest->log_x_headers()) { + for (const auto& iter : s->info.env->get_map()) { + if (rest->log_x_header(iter.first)) { + entry.x_headers.insert( + rgw_log_entry::headers_map::value_type(iter.first, iter.second)); + } + } + } + } + + entry.user = s->user->get_id().to_str(); + if (s->object_acl) + entry.object_owner = s->object_acl->get_owner().get_id(); + entry.bucket_owner = s->bucket_owner.get_id(); + + uint64_t bytes_sent = ACCOUNTING_IO(s)->get_bytes_sent(); + uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received(); + + entry.time = s->time; + entry.total_time = s->time_elapsed(); + entry.bytes_sent = bytes_sent; + entry.bytes_received = bytes_received; + if (s->err.http_ret) { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", s->err.http_ret); + entry.http_status = buf; + } else { + entry.http_status = "200"; // default + } + entry.error_code = s->err.err_code; + entry.bucket_id = bucket_id; + entry.trans_id = s->trans_id; + if (olog) { + return olog->log(s, entry); + } + return 0; +} + +void rgw_log_entry::generate_test_instances(list<rgw_log_entry*>& o) +{ + rgw_log_entry *e = new rgw_log_entry; + e->object_owner = "object_owner"; + e->bucket_owner = "bucket_owner"; + e->bucket = "bucket"; + e->remote_addr = "1.2.3.4"; + e->user = "user"; + e->obj = rgw_obj_key("obj"); + e->uri = "http://uri/bucket/obj"; + e->http_status = "200"; + e->error_code = "error_code"; + e->bytes_sent = 1024; + e->bytes_received = 512; + e->obj_size = 2048; + e->user_agent = "user_agent"; + e->referrer = "referrer"; + e->bucket_id = "10"; + e->trans_id = "trans_id"; + e->identity_type = TYPE_RGW; + o.push_back(e); + o.push_back(new rgw_log_entry); +} + +void rgw_log_entry::dump(Formatter *f) const +{ + f->dump_string("object_owner", object_owner.to_str()); + f->dump_string("bucket_owner", bucket_owner.to_str()); + f->dump_string("bucket", bucket); + f->dump_stream("time") << time; + f->dump_string("remote_addr", remote_addr); + f->dump_string("user", user); + f->dump_stream("obj") << obj; + f->dump_string("op", op); + f->dump_string("uri", uri); + f->dump_string("http_status", http_status); + f->dump_string("error_code", error_code); + f->dump_unsigned("bytes_sent", bytes_sent); + f->dump_unsigned("bytes_received", bytes_received); + f->dump_unsigned("obj_size", obj_size); + f->dump_stream("total_time") << total_time; + f->dump_string("user_agent", user_agent); + f->dump_string("referrer", referrer); + f->dump_string("bucket_id", bucket_id); + f->dump_string("trans_id", trans_id); + f->dump_unsigned("identity_type", identity_type); +} |