// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2012 New Dream Network/Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. */ #ifndef TRACKEDREQUEST_H_ #define TRACKEDREQUEST_H_ #include #include "common/ceph_mutex.h" #include "common/histogram.h" #include "common/Thread.h" #include "common/Clock.h" #include "include/spinlock.h" #include "msg/Message.h" #define OPTRACKER_PREALLOC_EVENTS 20 class TrackedOp; class OpHistory; typedef boost::intrusive_ptr TrackedOpRef; class OpHistoryServiceThread : public Thread { private: std::list> _external_queue; OpHistory* _ophistory; mutable ceph::spinlock queue_spinlock; bool _break_thread; public: explicit OpHistoryServiceThread(OpHistory* parent) : _ophistory(parent), _break_thread(false) { } void break_thread(); void insert_op(const utime_t& now, TrackedOpRef op) { queue_spinlock.lock(); _external_queue.emplace_back(now, op); queue_spinlock.unlock(); } void *entry() override; }; class OpHistory { std::set > arrived; std::set > duration; std::set > slow_op; ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock"); void cleanup(utime_t now); std::atomic_size_t history_size{0}; std::atomic_uint32_t history_duration{0}; std::atomic_size_t history_slow_op_size{0}; std::atomic_uint32_t history_slow_op_threshold{0}; std::atomic_bool shutdown{false}; OpHistoryServiceThread opsvc; friend class OpHistoryServiceThread; public: OpHistory() : opsvc(this) { opsvc.create("OpHistorySvc"); } ~OpHistory() { ceph_assert(arrived.empty()); ceph_assert(duration.empty()); ceph_assert(slow_op.empty()); } void insert(const utime_t& now, TrackedOpRef op) { if (shutdown) return; opsvc.insert_op(now, op); } void _insert_delayed(const utime_t& now, TrackedOpRef op); void dump_ops(utime_t now, ceph::Formatter *f, std::set filters = {""}, bool by_duration=false); void dump_slow_ops(utime_t now, ceph::Formatter *f, std::set filters = {""}); void on_shutdown(); void set_size_and_duration(size_t new_size, uint32_t new_duration) { history_size = new_size; history_duration = new_duration; } void set_slow_op_size_and_threshold(size_t new_size, uint32_t new_threshold) { history_slow_op_size = new_size; history_slow_op_threshold = new_threshold; } }; struct ShardedTrackingData; class OpTracker { friend class OpHistory; std::atomic seq = { 0 }; std::vector sharded_in_flight_list; OpHistory history; uint32_t num_optracker_shards; float complaint_time; int log_threshold; std::atomic tracking_enabled; ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock"); public: CephContext *cct; OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards); void set_complaint_and_threshold(float time, int threshold) { complaint_time = time; log_threshold = threshold; } void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) { history.set_size_and_duration(new_size, new_duration); } void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) { history.set_slow_op_size_and_threshold(new_size, new_threshold); } bool is_tracking() const { return tracking_enabled; } void set_tracking(bool enable) { tracking_enabled = enable; } bool dump_ops_in_flight(ceph::Formatter *f, bool print_only_blocked = false, std::set filters = {""}); bool dump_historic_ops(ceph::Formatter *f, bool by_duration = false, std::set filters = {""}); bool dump_historic_slow_ops(ceph::Formatter *f, std::set filters = {""}); bool register_inflight_op(TrackedOp *i); void unregister_inflight_op(TrackedOp *i); void record_history_op(TrackedOpRef&& i); void get_age_ms_histogram(pow2_hist_t *h); /** * walk through ops in flight * * @param oldest_sec the amount of time since the oldest op was initiated * @param check a function consuming tracked ops, the function returns * false if it don't want to be fed with more ops * @return True if there are any Ops to warn on, false otherwise */ bool visit_ops_in_flight(utime_t* oldest_secs, std::function&& visit); /** * walk through slow ops in flight * * @param[out] oldest_sec the amount of time since the oldest op was initiated * @param[out] num_slow_ops total number of slow ops * @param[out] num_warned_ops total number of warned ops * @param on_warn a function consuming tracked ops, the function returns * false if it don't want to be fed with more ops * @return True if there are any Ops to warn on, false otherwise */ bool with_slow_ops_in_flight(utime_t* oldest_secs, int* num_slow_ops, int* num_warned_ops, std::function&& on_warn); /** * Look for Ops which are too old, and insert warning * strings for each Op that is too old. * * @param summary[out] a std::string summarizing slow Ops. * @param warning_strings[out] A std::vector reference which is filled * with a warning std::string for each old Op. * @param slow[out] total number of slow ops * @return True if there are any Ops to warn on, false otherwise. */ bool check_ops_in_flight(std::string* summary, std::vector &warning_strings, int* slow = nullptr); void on_shutdown() { history.on_shutdown(); } ~OpTracker(); template typename T::Ref create_request(U params) { typename T::Ref retval(new T(params, this)); retval->tracking_start(); if (is_tracking()) { retval->mark_event("throttled", params->get_throttle_stamp()); retval->mark_event("header_read", params->get_recv_stamp()); retval->mark_event("all_read", params->get_recv_complete_stamp()); retval->mark_event("dispatched", params->get_dispatch_stamp()); } return retval; } }; class TrackedOp : public boost::intrusive::list_base_hook<> { private: friend class OpHistory; friend class OpTracker; boost::intrusive::list_member_hook<> tracker_item; public: typedef boost::intrusive::list< TrackedOp, boost::intrusive::member_hook< TrackedOp, boost::intrusive::list_member_hook<>, &TrackedOp::tracker_item> > tracked_op_list_t; // for use when clearing lists. e.g., // ls.clear_and_dispose(TrackedOp::Putter()); struct Putter { void operator()(TrackedOp *op) { op->put(); } }; protected: OpTracker *tracker; ///< the tracker we are associated with std::atomic_int nref = {0}; ///< ref count utime_t initiated_at; struct Event { utime_t stamp; std::string str; Event(utime_t t, std::string_view s) : stamp(t), str(s) {} int compare(const char *s) const { return str.compare(s); } const char *c_str() const { return str.c_str(); } void dump(ceph::Formatter *f) const { f->dump_stream("time") << stamp; f->dump_string("event", str); } }; std::vector events; ///< std::list of events and their times mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list uint64_t seq = 0; ///< a unique value std::set by the OpTracker uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning enum { STATE_UNTRACKED = 0, STATE_LIVE, STATE_HISTORY }; std::atomic state = {STATE_UNTRACKED}; mutable std::string desc_str; ///< protected by lock mutable const char *desc = nullptr; ///< readable without lock mutable std::atomic want_new_desc = {false}; TrackedOp(OpTracker *_tracker, const utime_t& initiated) : tracker(_tracker), initiated_at(initiated) { events.reserve(OPTRACKER_PREALLOC_EVENTS); } /// output any type-specific data you want to get when dump() is called virtual void _dump(ceph::Formatter *f) const {} /// if you want something else to happen when events are marked, implement virtual void _event_marked() {} /// return a unique descriptor of the Op; eg the message it's attached to virtual void _dump_op_descriptor_unlocked(std::ostream& stream) const = 0; /// called when the last non-OpTracker reference is dropped virtual void _unregistered() {} virtual bool filter_out(const std::set& filters) { return true; } public: ZTracer::Trace osd_trace; ZTracer::Trace pg_trace; ZTracer::Trace store_trace; ZTracer::Trace journal_trace; virtual ~TrackedOp() {} void get() { ++nref; } void put() { again: auto nref_snap = nref.load(); if (nref_snap == 1) { switch (state.load()) { case STATE_UNTRACKED: _unregistered(); delete this; break; case STATE_LIVE: mark_event("done"); tracker->unregister_inflight_op(this); _unregistered(); if (!tracker->is_tracking()) { delete this; } else { state = TrackedOp::STATE_HISTORY; tracker->record_history_op( TrackedOpRef(this, /* add_ref = */ false)); } break; case STATE_HISTORY: delete this; break; default: ceph_abort(); } } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) { goto again; } } const char *get_desc() const { if (!desc || want_new_desc.load()) { std::lock_guard l(lock); _gen_desc(); } return desc; } private: void _gen_desc() const { std::ostringstream ss; _dump_op_descriptor_unlocked(ss); desc_str = ss.str(); desc = desc_str.c_str(); want_new_desc = false; } public: void reset_desc() { want_new_desc = true; } const utime_t& get_initiated() const { return initiated_at; } double get_duration() const { std::lock_guard l(lock); if (!events.empty() && events.rbegin()->compare("done") == 0) return events.rbegin()->stamp - get_initiated(); else return ceph_clock_now() - get_initiated(); } void mark_event(std::string_view event, utime_t stamp=ceph_clock_now()); void mark_nowarn() { warn_interval_multiplier = 0; } virtual std::string_view state_string() const { std::lock_guard l(lock); return events.empty() ? std::string_view() : std::string_view(events.rbegin()->str); } void dump(utime_t now, ceph::Formatter *f) const; void tracking_start() { if (tracker->register_inflight_op(this)) { events.emplace_back(initiated_at, "initiated"); state = STATE_LIVE; } } // ref counting via intrusive_ptr, with special behavior on final // put for historical op tracking friend void intrusive_ptr_add_ref(TrackedOp *o) { o->get(); } friend void intrusive_ptr_release(TrackedOp *o) { o->put(); } }; #endif