diff options
Diffstat (limited to 'src/mgr/MgrClient.cc')
-rw-r--r-- | src/mgr/MgrClient.cc | 531 |
1 files changed, 531 insertions, 0 deletions
diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc new file mode 100644 index 00000000..738e9a3b --- /dev/null +++ b/src/mgr/MgrClient.cc @@ -0,0 +1,531 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray <john.spray@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#include "MgrClient.h" + +#include "mgr/MgrContext.h" + +#include "msg/Messenger.h" +#include "messages/MMgrMap.h" +#include "messages/MMgrReport.h" +#include "messages/MMgrOpen.h" +#include "messages/MMgrClose.h" +#include "messages/MMgrConfigure.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" +#include "messages/MPGStats.h" + +#define dout_subsys ceph_subsys_mgrc +#undef dout_prefix +#define dout_prefix *_dout << "mgrc " << __func__ << " " + +MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_) + : Dispatcher(cct_), cct(cct_), msgr(msgr_), + timer(cct_, lock) +{ + ceph_assert(cct != nullptr); +} + +void MgrClient::init() +{ + std::lock_guard l(lock); + + ceph_assert(msgr != nullptr); + + timer.init(); + initialized = true; +} + +void MgrClient::shutdown() +{ + std::lock_guard l(lock); + ldout(cct, 10) << dendl; + + if (connect_retry_callback) { + timer.cancel_event(connect_retry_callback); + connect_retry_callback = nullptr; + } + + // forget about in-flight commands if we are prematurely shut down + // (e.g., by control-C) + command_table.clear(); + if (service_daemon && + session && + session->con && + HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) { + ldout(cct, 10) << "closing mgr session" << dendl; + MMgrClose *m = new MMgrClose(); + m->daemon_name = daemon_name; + m->service_name = service_name; + session->con->send_message(m); + utime_t timeout; + timeout.set_from_double(cct->_conf.get_val<double>( + "mgr_client_service_daemon_unregister_timeout")); + shutdown_cond.WaitInterval(lock, timeout); + } + + timer.shutdown(); + if (session) { + session->con->mark_down(); + session.reset(); + } +} + +bool MgrClient::ms_dispatch(Message *m) +{ + std::lock_guard l(lock); + + switch(m->get_type()) { + case MSG_MGR_MAP: + return handle_mgr_map(static_cast<MMgrMap*>(m)); + case MSG_MGR_CONFIGURE: + return handle_mgr_configure(static_cast<MMgrConfigure*>(m)); + case MSG_MGR_CLOSE: + return handle_mgr_close(static_cast<MMgrClose*>(m)); + case MSG_COMMAND_REPLY: + if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) { + handle_command_reply(static_cast<MCommandReply*>(m)); + return true; + } else { + return false; + } + default: + ldout(cct, 30) << "Not handling " << *m << dendl; + return false; + } +} + +void MgrClient::reconnect() +{ + ceph_assert(lock.is_locked_by_me()); + + if (session) { + ldout(cct, 4) << "Terminating session with " + << session->con->get_peer_addr() << dendl; + session->con->mark_down(); + session.reset(); + stats_period = 0; + if (report_callback != nullptr) { + timer.cancel_event(report_callback); + report_callback = nullptr; + } + } + + if (!map.get_available()) { + ldout(cct, 4) << "No active mgr available yet" << dendl; + return; + } + + if (last_connect_attempt != utime_t()) { + utime_t now = ceph_clock_now(); + utime_t when = last_connect_attempt; + when += cct->_conf.get_val<double>("mgr_connect_retry_interval"); + if (now < when) { + if (!connect_retry_callback) { + connect_retry_callback = timer.add_event_at( + when, + new FunctionContext([this](int r){ + connect_retry_callback = nullptr; + reconnect(); + })); + } + ldout(cct, 4) << "waiting to retry connect until " << when << dendl; + return; + } + } + + if (connect_retry_callback) { + timer.cancel_event(connect_retry_callback); + connect_retry_callback = nullptr; + } + + ldout(cct, 4) << "Starting new session with " << map.get_active_addrs() + << dendl; + last_connect_attempt = ceph_clock_now(); + + session.reset(new MgrSessionState()); + session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR, + map.get_active_addrs()); + + if (service_daemon) { + daemon_dirty_status = true; + } + + // Don't send an open if we're just a client (i.e. doing + // command-sending, not stats etc) + if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) { + _send_open(); + } + + // resend any pending commands + for (const auto &p : command_table.get_commands()) { + auto m = p.second.get_message({}); + ceph_assert(session); + ceph_assert(session->con); + session->con->send_message2(std::move(m)); + } +} + +void MgrClient::_send_open() +{ + if (session && session->con) { + auto open = new MMgrOpen(); + if (!service_name.empty()) { + open->service_name = service_name; + open->daemon_name = daemon_name; + } else { + open->daemon_name = cct->_conf->name.get_id(); + } + if (service_daemon) { + open->service_daemon = service_daemon; + open->daemon_metadata = daemon_metadata; + } + cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version); + cct->_conf.get_defaults_bl(&open->config_defaults_bl); + session->con->send_message(open); + } +} + +bool MgrClient::handle_mgr_map(MMgrMap *m) +{ + ceph_assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + map = m->get_map(); + ldout(cct, 4) << "Got map version " << map.epoch << dendl; + m->put(); + + ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl; + + // Reset session? + if (!session || + session->con->get_peer_addrs() != map.get_active_addrs()) { + reconnect(); + } + + return true; +} + +bool MgrClient::ms_handle_reset(Connection *con) +{ + std::lock_guard l(lock); + if (session && con == session->con) { + ldout(cct, 4) << __func__ << " con " << con << dendl; + reconnect(); + return true; + } + return false; +} + +bool MgrClient::ms_handle_refused(Connection *con) +{ + // do nothing for now + return false; +} + +void MgrClient::_send_stats() +{ + _send_report(); + _send_pgstats(); + if (stats_period != 0) { + report_callback = timer.add_event_after( + stats_period, + new FunctionContext([this](int) { + _send_stats(); + })); + } +} + +void MgrClient::_send_report() +{ + ceph_assert(lock.is_locked_by_me()); + ceph_assert(session); + report_callback = nullptr; + + auto report = new MMgrReport(); + auto pcc = cct->get_perfcounters_collection(); + + pcc->with_counters([this, report]( + const PerfCountersCollectionImpl::CounterMap &by_path) + { + // Helper for checking whether a counter should be included + auto include_counter = [this]( + const PerfCounters::perf_counter_data_any_d &ctr, + const PerfCounters &perf_counters) + { + return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold; + }; + + // Helper for cases where we want to forget a counter + auto undeclare = [report, this](const std::string &path) + { + report->undeclare_types.push_back(path); + ldout(cct,20) << " undeclare " << path << dendl; + session->declared.erase(path); + }; + + ENCODE_START(1, 1, report->packed); + + // Find counters that no longer exist, and undeclare them + for (auto p = session->declared.begin(); p != session->declared.end(); ) { + const auto &path = *(p++); + if (by_path.count(path) == 0) { + undeclare(path); + } + } + + for (const auto &i : by_path) { + auto& path = i.first; + auto& data = *(i.second.data); + auto& perf_counters = *(i.second.perf_counters); + + // Find counters that still exist, but are no longer permitted by + // stats_threshold + if (!include_counter(data, perf_counters)) { + if (session->declared.count(path)) { + undeclare(path); + } + continue; + } + + if (session->declared.count(path) == 0) { + ldout(cct,20) << " declare " << path << dendl; + PerfCounterType type; + type.path = path; + if (data.description) { + type.description = data.description; + } + if (data.nick) { + type.nick = data.nick; + } + type.type = data.type; + type.priority = perf_counters.get_adjusted_priority(data.prio); + type.unit = data.unit; + report->declare_types.push_back(std::move(type)); + session->declared.insert(path); + } + + encode(static_cast<uint64_t>(data.u64), report->packed); + if (data.type & PERFCOUNTER_LONGRUNAVG) { + encode(static_cast<uint64_t>(data.avgcount), report->packed); + encode(static_cast<uint64_t>(data.avgcount2), report->packed); + } + } + ENCODE_FINISH(report->packed); + + ldout(cct, 20) << "sending " << session->declared.size() << " counters (" + "of possible " << by_path.size() << "), " + << report->declare_types.size() << " new, " + << report->undeclare_types.size() << " removed" + << dendl; + }); + + ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl; + + if (daemon_name.size()) { + report->daemon_name = daemon_name; + } else { + report->daemon_name = cct->_conf->name.get_id(); + } + report->service_name = service_name; + + if (daemon_dirty_status) { + report->daemon_status = daemon_status; + daemon_dirty_status = false; + } + + if (task_dirty_status) { + report->task_status = task_status; + task_dirty_status = false; + } + + report->daemon_health_metrics = std::move(daemon_health_metrics); + + cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl, + &last_config_bl_version); + + if (get_perf_report_cb) { + get_perf_report_cb(&report->osd_perf_metric_reports); + } + + session->con->send_message(report); +} + +void MgrClient::send_pgstats() +{ + std::lock_guard l(lock); + _send_pgstats(); +} + +void MgrClient::_send_pgstats() +{ + if (pgstats_cb && session) { + session->con->send_message(pgstats_cb()); + } +} + +bool MgrClient::handle_mgr_configure(MMgrConfigure *m) +{ + ceph_assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + if (!session) { + lderr(cct) << "dropping unexpected configure message" << dendl; + m->put(); + return true; + } + + ldout(cct, 4) << "stats_period=" << m->stats_period << dendl; + + if (stats_threshold != m->stats_threshold) { + ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl; + stats_threshold = m->stats_threshold; + } + + if (set_perf_queries_cb) { + set_perf_queries_cb(m->osd_perf_metric_queries); + } + + bool starting = (stats_period == 0) && (m->stats_period != 0); + stats_period = m->stats_period; + if (starting) { + _send_stats(); + } + + m->put(); + return true; +} + +bool MgrClient::handle_mgr_close(MMgrClose *m) +{ + service_daemon = false; + shutdown_cond.Signal(); + m->put(); + return true; +} + +int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl, + bufferlist *outbl, string *outs, + Context *onfinish) +{ + std::lock_guard l(lock); + + ldout(cct, 20) << "cmd: " << cmd << dendl; + + if (map.epoch == 0 && mgr_optional) { + ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl; + return -EACCES; + } + + auto &op = command_table.start_command(); + op.cmd = cmd; + op.inbl = inbl; + op.outbl = outbl; + op.outs = outs; + op.on_finish = onfinish; + + if (session && session->con) { + // Leaving fsid argument null because it isn't used. + auto m = op.get_message({}); + session->con->send_message2(std::move(m)); + } else { + ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl; + } + return 0; +} + +bool MgrClient::handle_command_reply(MCommandReply *m) +{ + ceph_assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + const auto tid = m->get_tid(); + if (!command_table.exists(tid)) { + ldout(cct, 4) << "handle_command_reply tid " << m->get_tid() + << " not found" << dendl; + m->put(); + return true; + } + + auto &op = command_table.get_command(tid); + if (op.outbl) { + op.outbl->claim(m->get_data()); + } + + if (op.outs) { + *(op.outs) = m->rs; + } + + if (op.on_finish) { + op.on_finish->complete(m->r); + } + + command_table.erase(tid); + + m->put(); + return true; +} + +int MgrClient::service_daemon_register( + const std::string& service, + const std::string& name, + const std::map<std::string,std::string>& metadata) +{ + std::lock_guard l(lock); + if (service_daemon) { + return -EEXIST; + } + ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; + service_daemon = true; + service_name = service; + daemon_name = name; + daemon_metadata = metadata; + daemon_dirty_status = true; + + // late register? + if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) { + _send_open(); + } + + return 0; +} + +int MgrClient::service_daemon_update_status( + std::map<std::string,std::string>&& status) +{ + std::lock_guard l(lock); + ldout(cct,10) << status << dendl; + daemon_status = std::move(status); + daemon_dirty_status = true; + return 0; +} + +int MgrClient::service_daemon_update_task_status( + std::map<std::string,std::string> &&status) { + std::lock_guard l(lock); + ldout(cct,10) << status << dendl; + task_status = std::move(status); + task_dirty_status = true; + return 0; +} + +void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics) +{ + std::lock_guard l(lock); + daemon_health_metrics = std::move(metrics); +} + |