summaryrefslogtreecommitdiffstats
path: root/src/common/TrackedOp.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/common/TrackedOp.h
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/common/TrackedOp.h')
-rw-r--r--src/common/TrackedOp.h393
1 files changed, 393 insertions, 0 deletions
diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h
new file mode 100644
index 000000000..e70cba117
--- /dev/null
+++ b/src/common/TrackedOp.h
@@ -0,0 +1,393 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef TRACKEDREQUEST_H_
+#define TRACKEDREQUEST_H_
+
+#include <atomic>
+#include "common/ceph_mutex.h"
+#include "common/histogram.h"
+#include "common/Thread.h"
+#include "common/Clock.h"
+#include "include/spinlock.h"
+#include "msg/Message.h"
+
+#define OPTRACKER_PREALLOC_EVENTS 20
+
+class TrackedOp;
+class OpHistory;
+
+typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
+
+class OpHistoryServiceThread : public Thread
+{
+private:
+ std::list<std::pair<utime_t, TrackedOpRef>> _external_queue;
+ OpHistory* _ophistory;
+ mutable ceph::spinlock queue_spinlock;
+ bool _break_thread;
+public:
+ explicit OpHistoryServiceThread(OpHistory* parent)
+ : _ophistory(parent),
+ _break_thread(false) { }
+
+ void break_thread();
+ void insert_op(const utime_t& now, TrackedOpRef op) {
+ queue_spinlock.lock();
+ _external_queue.emplace_back(now, op);
+ queue_spinlock.unlock();
+ }
+
+ void *entry() override;
+};
+
+
+class OpHistory {
+ std::set<std::pair<utime_t, TrackedOpRef> > arrived;
+ std::set<std::pair<double, TrackedOpRef> > duration;
+ std::set<std::pair<utime_t, TrackedOpRef> > slow_op;
+ ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock");
+ void cleanup(utime_t now);
+ std::atomic_size_t history_size{0};
+ std::atomic_uint32_t history_duration{0};
+ std::atomic_size_t history_slow_op_size{0};
+ std::atomic_uint32_t history_slow_op_threshold{0};
+ std::atomic_bool shutdown{false};
+ OpHistoryServiceThread opsvc;
+ friend class OpHistoryServiceThread;
+
+public:
+ OpHistory() : opsvc(this) {
+ opsvc.create("OpHistorySvc");
+ }
+ ~OpHistory() {
+ ceph_assert(arrived.empty());
+ ceph_assert(duration.empty());
+ ceph_assert(slow_op.empty());
+ }
+ void insert(const utime_t& now, TrackedOpRef op)
+ {
+ if (shutdown)
+ return;
+
+ opsvc.insert_op(now, op);
+ }
+
+ void _insert_delayed(const utime_t& now, TrackedOpRef op);
+ void dump_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""}, bool by_duration=false);
+ void dump_slow_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""});
+ void on_shutdown();
+ void set_size_and_duration(size_t new_size, uint32_t new_duration) {
+ history_size = new_size;
+ history_duration = new_duration;
+ }
+ void set_slow_op_size_and_threshold(size_t new_size, uint32_t new_threshold) {
+ history_slow_op_size = new_size;
+ history_slow_op_threshold = new_threshold;
+ }
+};
+
+struct ShardedTrackingData;
+class OpTracker {
+ friend class OpHistory;
+ std::atomic<int64_t> seq = { 0 };
+ std::vector<ShardedTrackingData*> sharded_in_flight_list;
+ OpHistory history;
+ uint32_t num_optracker_shards;
+ float complaint_time;
+ int log_threshold;
+ std::atomic<bool> tracking_enabled;
+ ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
+
+public:
+ CephContext *cct;
+ OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
+
+ void set_complaint_and_threshold(float time, int threshold) {
+ complaint_time = time;
+ log_threshold = threshold;
+ }
+ void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
+ history.set_size_and_duration(new_size, new_duration);
+ }
+ void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
+ history.set_slow_op_size_and_threshold(new_size, new_threshold);
+ }
+ bool is_tracking() const {
+ return tracking_enabled;
+ }
+ void set_tracking(bool enable) {
+ tracking_enabled = enable;
+ }
+ bool dump_ops_in_flight(ceph::Formatter *f, bool print_only_blocked = false, std::set<std::string> filters = {""});
+ bool dump_historic_ops(ceph::Formatter *f, bool by_duration = false, std::set<std::string> filters = {""});
+ bool dump_historic_slow_ops(ceph::Formatter *f, std::set<std::string> filters = {""});
+ bool register_inflight_op(TrackedOp *i);
+ void unregister_inflight_op(TrackedOp *i);
+ void record_history_op(TrackedOpRef&& i);
+
+ void get_age_ms_histogram(pow2_hist_t *h);
+
+ /**
+ * walk through ops in flight
+ *
+ * @param oldest_sec the amount of time since the oldest op was initiated
+ * @param check a function consuming tracked ops, the function returns
+ * false if it don't want to be fed with more ops
+ * @return True if there are any Ops to warn on, false otherwise
+ */
+ bool visit_ops_in_flight(utime_t* oldest_secs,
+ std::function<bool(TrackedOp&)>&& visit);
+ /**
+ * walk through slow ops in flight
+ *
+ * @param[out] oldest_sec the amount of time since the oldest op was initiated
+ * @param[out] num_slow_ops total number of slow ops
+ * @param[out] num_warned_ops total number of warned ops
+ * @param on_warn a function consuming tracked ops, the function returns
+ * false if it don't want to be fed with more ops
+ * @return True if there are any Ops to warn on, false otherwise
+ */
+ bool with_slow_ops_in_flight(utime_t* oldest_secs,
+ int* num_slow_ops,
+ int* num_warned_ops,
+ std::function<void(TrackedOp&)>&& on_warn);
+ /**
+ * Look for Ops which are too old, and insert warning
+ * strings for each Op that is too old.
+ *
+ * @param summary[out] a std::string summarizing slow Ops.
+ * @param warning_strings[out] A std::vector<std::string> reference which is filled
+ * with a warning std::string for each old Op.
+ * @param slow[out] total number of slow ops
+ * @return True if there are any Ops to warn on, false otherwise.
+ */
+ bool check_ops_in_flight(std::string* summary,
+ std::vector<std::string> &warning_strings,
+ int* slow = nullptr);
+
+ void on_shutdown() {
+ history.on_shutdown();
+ }
+ ~OpTracker();
+
+ template <typename T, typename U>
+ typename T::Ref create_request(U params)
+ {
+ typename T::Ref retval(new T(params, this));
+ retval->tracking_start();
+ if (is_tracking()) {
+ retval->mark_event("throttled", params->get_throttle_stamp());
+ retval->mark_event("header_read", params->get_recv_stamp());
+ retval->mark_event("all_read", params->get_recv_complete_stamp());
+ retval->mark_event("dispatched", params->get_dispatch_stamp());
+ }
+
+ return retval;
+ }
+};
+
+class TrackedOp : public boost::intrusive::list_base_hook<> {
+private:
+ friend class OpHistory;
+ friend class OpTracker;
+
+ boost::intrusive::list_member_hook<> tracker_item;
+
+public:
+ typedef boost::intrusive::list<
+ TrackedOp,
+ boost::intrusive::member_hook<
+ TrackedOp,
+ boost::intrusive::list_member_hook<>,
+ &TrackedOp::tracker_item> > tracked_op_list_t;
+
+ // for use when clearing lists. e.g.,
+ // ls.clear_and_dispose(TrackedOp::Putter());
+ struct Putter {
+ void operator()(TrackedOp *op) {
+ op->put();
+ }
+ };
+
+protected:
+ OpTracker *tracker; ///< the tracker we are associated with
+ std::atomic_int nref = {0}; ///< ref count
+
+ utime_t initiated_at;
+
+ struct Event {
+ utime_t stamp;
+ std::string str;
+
+ Event(utime_t t, std::string_view s) : stamp(t), str(s) {}
+
+ int compare(const char *s) const {
+ return str.compare(s);
+ }
+
+ const char *c_str() const {
+ return str.c_str();
+ }
+
+ void dump(ceph::Formatter *f) const {
+ f->dump_stream("time") << stamp;
+ f->dump_string("event", str);
+ }
+ };
+
+ std::vector<Event> events; ///< std::list of events and their times
+ mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
+ uint64_t seq = 0; ///< a unique value std::set by the OpTracker
+
+ uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
+
+ enum {
+ STATE_UNTRACKED = 0,
+ STATE_LIVE,
+ STATE_HISTORY
+ };
+ std::atomic<int> state = {STATE_UNTRACKED};
+
+ mutable std::string desc_str; ///< protected by lock
+ mutable const char *desc = nullptr; ///< readable without lock
+ mutable std::atomic<bool> want_new_desc = {false};
+
+ TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
+ tracker(_tracker),
+ initiated_at(initiated)
+ {
+ events.reserve(OPTRACKER_PREALLOC_EVENTS);
+ }
+
+ /// output any type-specific data you want to get when dump() is called
+ virtual void _dump(ceph::Formatter *f) const {}
+ /// if you want something else to happen when events are marked, implement
+ virtual void _event_marked() {}
+ /// return a unique descriptor of the Op; eg the message it's attached to
+ virtual void _dump_op_descriptor_unlocked(std::ostream& stream) const = 0;
+ /// called when the last non-OpTracker reference is dropped
+ virtual void _unregistered() {}
+
+ virtual bool filter_out(const std::set<std::string>& filters) { return true; }
+
+public:
+ ZTracer::Trace osd_trace;
+ ZTracer::Trace pg_trace;
+ ZTracer::Trace store_trace;
+ ZTracer::Trace journal_trace;
+
+ virtual ~TrackedOp() {}
+
+ void get() {
+ ++nref;
+ }
+ void put() {
+ again:
+ auto nref_snap = nref.load();
+ if (nref_snap == 1) {
+ switch (state.load()) {
+ case STATE_UNTRACKED:
+ _unregistered();
+ delete this;
+ break;
+
+ case STATE_LIVE:
+ mark_event("done");
+ tracker->unregister_inflight_op(this);
+ _unregistered();
+ if (!tracker->is_tracking()) {
+ delete this;
+ } else {
+ state = TrackedOp::STATE_HISTORY;
+ tracker->record_history_op(
+ TrackedOpRef(this, /* add_ref = */ false));
+ }
+ break;
+
+ case STATE_HISTORY:
+ delete this;
+ break;
+
+ default:
+ ceph_abort();
+ }
+ } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) {
+ goto again;
+ }
+ }
+
+ const char *get_desc() const {
+ if (!desc || want_new_desc.load()) {
+ std::lock_guard l(lock);
+ _gen_desc();
+ }
+ return desc;
+ }
+private:
+ void _gen_desc() const {
+ std::ostringstream ss;
+ _dump_op_descriptor_unlocked(ss);
+ desc_str = ss.str();
+ desc = desc_str.c_str();
+ want_new_desc = false;
+ }
+public:
+ void reset_desc() {
+ want_new_desc = true;
+ }
+
+ const utime_t& get_initiated() const {
+ return initiated_at;
+ }
+
+ double get_duration() const {
+ std::lock_guard l(lock);
+ if (!events.empty() && events.rbegin()->compare("done") == 0)
+ return events.rbegin()->stamp - get_initiated();
+ else
+ return ceph_clock_now() - get_initiated();
+ }
+
+ void mark_event(std::string_view event, utime_t stamp=ceph_clock_now());
+
+ void mark_nowarn() {
+ warn_interval_multiplier = 0;
+ }
+
+ virtual std::string_view state_string() const {
+ std::lock_guard l(lock);
+ return events.empty() ? std::string_view() : std::string_view(events.rbegin()->str);
+ }
+
+ void dump(utime_t now, ceph::Formatter *f) const;
+
+ void tracking_start() {
+ if (tracker->register_inflight_op(this)) {
+ events.emplace_back(initiated_at, "initiated");
+ state = STATE_LIVE;
+ }
+ }
+
+ // ref counting via intrusive_ptr, with special behavior on final
+ // put for historical op tracking
+ friend void intrusive_ptr_add_ref(TrackedOp *o) {
+ o->get();
+ }
+ friend void intrusive_ptr_release(TrackedOp *o) {
+ o->put();
+ }
+};
+
+
+#endif