// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */


/*

  -- Storage scheme --

  Pre-quincy:

    - LogSummary contains last N entries for every channel
    - LogSummary (as "full") written on every commit
    - LogSummary contains "keys" which LogEntryKey hash_set for the
      same set of entries (for deduping)

  Quincy+:

    - LogSummary contains, for each channel,
       - start seq
       - end seq (last written seq + 1)
    - LogSummary contains an LRUSet for tracking dups
    - LogSummary written every N commits
    - each LogEntry written in a separate key
       - "%s/%08x" % (channel, seq) -> LogEntry
    - per-commit record includes channel -> begin (trim bounds)
    - 'external_log_to' meta records version to which we have logged externally
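    - example: channel "cluster" with start seq 10 and end seq 20 means
      entries 10..19 are present, stored under keys "cluster/0000000a"
      through "cluster/00000013"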

*/



#include <boost/algorithm/string/predicate.hpp>

#include <iterator>
#include <sstream>
#include <syslog.h>

#include "LogMonitor.h"
#include "Monitor.h"
#include "MonitorDBStore.h"

#include "messages/MMonCommand.h"
#include "messages/MLog.h"
#include "messages/MLogAck.h"
#include "common/Graylog.h"
#include "common/Journald.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "include/ceph_assert.h"
#include "include/str_list.h"
#include "include/str_map.h"
#include "include/compat.h"
#include "include/utime_fmt.h"

#define dout_subsys ceph_subsys_mon

using namespace TOPNSPC::common;

using std::cerr;
using std::cout;
using std::dec;
using std::hex;
using std::list;
using std::map;
using std::make_pair;
using std::multimap;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::setfill;
using std::string;
using std::stringstream;
using std::to_string;
using std::vector;
using std::unique_ptr;

using ceph::bufferlist;
using ceph::decode;
using ceph::encode;
using ceph::Formatter;
using ceph::JSONFormatter;
using ceph::make_message;
using ceph::mono_clock;
using ceph::mono_time;
using ceph::timespan_str;

string LogMonitor::log_channel_info::get_log_file(const string &channel)
{
  dout(25) << __func__ << " for channel '"
	   << channel << "'" << dendl;

  if (expanded_log_file.count(channel) == 0) {
    string fname = expand_channel_meta(
      get_str_map_key(log_file, channel, &CLOG_CONFIG_DEFAULT_KEY),
      channel);
    expanded_log_file[channel] = fname;

    dout(20) << __func__ << " for channel '"
	     << channel << "' expanded to '"
	     << fname << "'" << dendl;
  }
  return expanded_log_file[channel];
}


void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
{
  dout(20) << __func__ << " expand map: " << m << dendl;
  for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
    m[p->first] = expand_channel_meta(p->second, p->first);
  }
  dout(20) << __func__ << " expanded map: " << m << dendl;
}

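// Substitute every occurrence of LOG_META_CHANNEL in 'input' with the channel
// name 'change_to'.  This is how a per-channel template (e.g. a log file path
// containing the channel placeholder) is expanded into a concrete value.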
string LogMonitor::log_channel_info::expand_channel_meta(
    const string &input,
    const string &change_to)
{
  size_t pos = string::npos;
  string s(input);
  while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
    string tmp = s.substr(0, pos) + change_to;
    if (pos+LOG_META_CHANNEL.length() < s.length())
      tmp += s.substr(pos+LOG_META_CHANNEL.length());
    s = tmp;
  }
  dout(20) << __func__ << " from '" << input
	   << "' to '" << s << "'" << dendl;

  return s;
}

bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
  string v = get_str_map_key(log_to_syslog, channel,
                             &CLOG_CONFIG_DEFAULT_KEY);
  // We expect booleans, but they are kept as strings in the
  // 'log_to_syslog' k/v map. We must stay compatible with existing
  // boolean handling, so we use a modified version of how
  // md_config_t::set_val_raw() handles booleans: we accept both
  // 'true' and 'false', and also check for '1' and '0'. The main
  // distinction from the original code is that anything other than
  // '1', '0', 'true' or 'false' is treated as 'false'.
  bool ret = false;

  if (boost::iequals(v, "false")) {
    ret = false;
  } else if (boost::iequals(v, "true")) {
    ret = true;
  } else {
    std::string err;
    int b = strict_strtol(v.c_str(), 10, &err);
    ret = (err.empty() && b == 1);
  }

  return ret;
}

ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
    const string &channel)
{
  dout(25) << __func__ << " for channel '"
	   << channel << "'" << dendl;

  if (graylogs.count(channel) == 0) {
    auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));

    graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
    graylog->set_hostname(g_conf()->host);
    graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
					     &CLOG_CONFIG_DEFAULT_KEY),
			     atoi(get_str_map_key(log_to_graylog_port, channel,
						  &CLOG_CONFIG_DEFAULT_KEY).c_str()));

    graylogs[channel] = graylog;
    dout(20) << __func__ << " for channel '"
	     << channel << "' to graylog host '"
	     << log_to_graylog_host[channel] << ":"
	     << log_to_graylog_port[channel]
	     << "'" << dendl;
  }
  return graylogs[channel];
}

ceph::logging::JournaldClusterLogger &LogMonitor::log_channel_info::get_journald()
{
  dout(25) << __func__ << dendl;

  if (!journald) {
    journald = std::make_unique<ceph::logging::JournaldClusterLogger>();
  }
  return *journald;
}

void LogMonitor::log_channel_info::clear()
{
  log_to_syslog.clear();
  syslog_level.clear();
  syslog_facility.clear();
  log_file.clear();
  expanded_log_file.clear();
  log_file_level.clear();
  log_to_graylog.clear();
  log_to_graylog_host.clear();
  log_to_graylog_port.clear();
  log_to_journald.clear();
  graylogs.clear();
  journald.reset();
}

LogMonitor::log_channel_info::log_channel_info() = default;
LogMonitor::log_channel_info::~log_channel_info() = default;


#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, get_last_committed())
static ostream& _prefix(std::ostream *_dout, Monitor &mon, version_t v) {
  return *_dout << "mon." << mon.name << "@" << mon.rank
		<< "(" << mon.get_state_name()
		<< ").log v" << v << " ";
}

ostream& operator<<(ostream &out, const LogMonitor &pm)
{
  return out << "log";
}

/*
 Tick function; the log monitor has no periodic map updates to perform, so
 this only emits a debug line while the monitor is active.
*/

void LogMonitor::tick() 
{
  if (!is_active()) return;

  dout(10) << *this << dendl;

}

void LogMonitor::create_initial()
{
  dout(10) << "create_initial -- creating initial map" << dendl;
  LogEntry e;
  e.name = g_conf()->name;
  e.rank = entity_name_t::MON(mon.rank);
  e.addrs = mon.messenger->get_myaddrs();
  e.stamp = ceph_clock_now();
  e.prio = CLOG_INFO;
  e.channel = CLOG_CHANNEL_CLUSTER;
  std::stringstream ss;
  ss << "mkfs " << mon.monmap->get_fsid();
  e.msg = ss.str();
  e.seq = 0;
  pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
}

void LogMonitor::update_from_paxos(bool *need_bootstrap)
{
  dout(10) << __func__ << dendl;
  version_t version = get_last_committed();
  dout(10) << __func__ << " version " << version
           << " summary v " << summary.version << dendl;

  log_external_backlog();

  if (version == summary.version)
    return;
  ceph_assert(version >= summary.version);

  version_t latest_full = get_version_latest_full();
  dout(10) << __func__ << " latest full " << latest_full << dendl;
  if ((latest_full > 0) && (latest_full > summary.version)) {
    bufferlist latest_bl;
    get_version_full(latest_full, latest_bl);
    ceph_assert(latest_bl.length() != 0);
    dout(7) << __func__ << " loading summary e" << latest_full << dendl;
    auto p = latest_bl.cbegin();
    decode(summary, p);
    dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
  }

  // walk through incrementals
  while (version > summary.version) {
    bufferlist bl;
    int err = get_version(summary.version+1, bl);
    ceph_assert(err == 0);
    ceph_assert(bl.length());

    auto p = bl.cbegin();
    __u8 struct_v;
    decode(struct_v, p);
    if (struct_v == 1) {
      // legacy pre-quincy commits
      while (!p.end()) {
	LogEntry le;
	le.decode(p);
	dout(7) << "update_from_paxos applying incremental log "
		<< summary.version+1 <<  " " << le << dendl;
	summary.add_legacy(le);
      }
    } else {
      uint32_t num;
      decode(num, p);
      while (num--) {
	LogEntry le;
	le.decode(p);
	dout(7) << "update_from_paxos applying incremental log "
		<< summary.version+1 <<  " " << le << dendl;
	summary.recent_keys.insert(le.key());
	summary.channel_info[le.channel].second++;
	// we may have logged past the (persisted) summary in a prior quorum
	if (version > external_log_to) {
	  log_external(le);
	}
      }
      map<string,version_t> prune_channels_to;
      decode(prune_channels_to, p);
      for (auto& [channel, prune_to] : prune_channels_to) {
	dout(20) << __func__ << " channel " << channel
		 << " pruned to " << prune_to << dendl;
	summary.channel_info[channel].first = prune_to;
      }
      // zero out pre-quincy fields (encode_pending needs this to reliably detect
      // upgrade)
      summary.tail_by_channel.clear();
      summary.keys.clear();
    }

    summary.version++;
    summary.prune(g_conf()->mon_log_max_summary);
  }
  dout(10) << " summary.channel_info " << summary.channel_info << dendl;
  external_log_to = version;
  mon.store->write_meta("external_log_to", stringify(external_log_to));

  check_subs();
}

void LogMonitor::log_external(const LogEntry& le)
{
  string channel = le.channel;
  if (channel.empty()) { // for backward compatibility
    channel = CLOG_CHANNEL_CLUSTER;
  }

  if (channels.do_log_to_syslog(channel)) {
    string level = channels.get_level(channel);
    string facility = channels.get_facility(channel);
    if (level.empty() || facility.empty()) {
      derr << __func__ << " unable to log to syslog -- level or facility"
	   << " not defined (level: " << level << ", facility: "
	   << facility << ")" << dendl;
    } else {
      le.log_to_syslog(channels.get_level(channel),
		       channels.get_facility(channel));
    }
  }

  if (channels.do_log_to_graylog(channel)) {
    ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
    if (graylog) {
      graylog->log_log_entry(&le);
    }
    dout(7) << "graylog: " << channel << " " << graylog
	    << " host:" << channels.log_to_graylog_host << dendl;
  }

  if (channels.do_log_to_journald(channel)) {
    auto &journald = channels.get_journald();
    journald.log_log_entry(le);
    dout(7) << "journald: " << channel << dendl;
  }

  bool do_stderr = g_conf().get_val<bool>("mon_cluster_log_to_stderr");
  int fd = -1;
  if (g_conf()->mon_cluster_log_to_file) {
    if (this->log_rotated.exchange(false)) {
      this->log_external_close_fds();
    }

    auto p = channel_fds.find(channel);
    if (p == channel_fds.end()) {
      string log_file = channels.get_log_file(channel);
      dout(20) << __func__ << " logging for channel '" << channel
	       << "' to file '" << log_file << "'" << dendl;
      if (!log_file.empty()) {
	fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
	if (fd < 0) {
	  int err = -errno;
	  dout(1) << "unable to write to '" << log_file << "' for channel '"
		  << channel << "': " << cpp_strerror(err) << dendl;
	} else {
	  channel_fds[channel] = fd;
	}
      }
    } else {
      fd = p->second;
    }
  }
  if (do_stderr || fd >= 0) {
    fmt::format_to(std::back_inserter(log_buffer), "{}\n", le);

    if (fd >= 0) {
      int err = safe_write(fd, log_buffer.data(), log_buffer.size());
      if (err < 0) {
	dout(1) << "error writing to '" << channels.get_log_file(channel)
		<< "' for channel '" << channel
		<< ": " << cpp_strerror(err) << dendl;
	::close(fd);
	channel_fds.erase(channel);
      }
    }

    if (do_stderr) {
      fmt::print(std::cerr, "{} {}", channel, std::string_view(log_buffer.data(), log_buffer.size()));
    }

    log_buffer.clear();
  }
}

void LogMonitor::log_external_close_fds()
{
  for (auto& [channel, fd] : channel_fds) {
    if (fd >= 0) {
      dout(10) << __func__ << " closing " << channel << " (" << fd << ")" << dendl;
      ::close(fd);
    }
  }
  channel_fds.clear();
}

/// catch external logs up to summary.version
void LogMonitor::log_external_backlog()
{
  if (!external_log_to) {
    std::string cur_str;
    int r = mon.store->read_meta("external_log_to", &cur_str);
    if (r == 0) {
      external_log_to = std::stoull(cur_str);
      dout(10) << __func__ << " initialized external_log_to = " << external_log_to
	       << " (recorded log_to position)" << dendl;
    } else {
      // pre-quincy, we assumed that anything through summary.version was
      // logged externally.
      ceph_assert(r == -ENOENT);
      external_log_to = summary.version;
      dout(10) << __func__ << " initialized external_log_to = " << external_log_to
	       << " (summary v " << summary.version << ")" << dendl;
    }
  }
  // we may have logged ahead of summary.version, but never ahead of paxos
  if (external_log_to > get_last_committed()) {
    derr << __func__ << " rewinding external_log_to from " << external_log_to
	 << " -> " << get_last_committed() << " (sync_force? mon rebuild?)" << dendl;
    external_log_to = get_last_committed();
  }
  if (external_log_to >= summary.version) {
    return;
  }
  if (auto first = get_first_committed(); external_log_to < first) {
    derr << __func__ << " local logs at " << external_log_to
	 << ", skipping to " << first << dendl;
    external_log_to = first;
    // FIXME: write marker in each channel log file?
  }
  for (; external_log_to < summary.version; ++external_log_to) {
    bufferlist bl;
    int err = get_version(external_log_to+1, bl);
    ceph_assert(err == 0);
    ceph_assert(bl.length());
    auto p = bl.cbegin();
    __u8 v;
    decode(v, p);
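    // v1 (pre-quincy) records carry no entry count and are consumed until
    // the end of the buffer; v2 records begin with an explicit count.
    // num == -2 marks the "no count" case below.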
    int32_t num = -2;
    if (v >= 2) {
      decode(num, p);
    }
    while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
      LogEntry le;
      le.decode(p);
      log_external(le);
    }
  }
  mon.store->write_meta("external_log_to", stringify(external_log_to));
}

void LogMonitor::create_pending()
{
  pending_log.clear();
  pending_keys.clear();
  dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
}

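// Build the store key for a single log entry: "<channel>/<seq>", with the
// sequence number rendered as 8 lowercase hex digits (e.g. seq 42 on the
// "cluster" channel yields "cluster/0000002a").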
void LogMonitor::generate_logentry_key(
  const std::string& channel,
  version_t v,
  std::string *out)
{
  out->append(channel);
  out->append("/");
  char vs[10];
  snprintf(vs, sizeof(vs), "%08llx", (unsigned long long)v);
  out->append(vs);
}

void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
  version_t version = get_last_committed() + 1;
  bufferlist bl;
  dout(10) << __func__ << " v" << version << dendl;

  if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
    // legacy encoding for pre-quincy quorum
    __u8 struct_v = 1;
    encode(struct_v, bl);
    for (auto& p : pending_log) {
      p.second.encode(bl, mon.get_quorum_con_features());
    }
    put_version(t, version, bl);
    put_last_committed(t, version);
    return;
  }
  
  __u8 struct_v = 2;
  encode(struct_v, bl);
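  // v2 commit record layout: entry count, then each encoded LogEntry (each
  // entry is also written under its own per-channel key in this transaction),
  // followed by the per-channel prune bounds.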

  // first commit after upgrading to quincy?
  if (!summary.tail_by_channel.empty()) {
    // include past log entries
    for (auto& p : summary.tail_by_channel) {
      for (auto& q : p.second) {
	pending_log.emplace(make_pair(q.second.stamp, q.second));
      }
    }
  }

  // record new entries
  auto pending_channel_info = summary.channel_info;
  uint32_t num = pending_log.size();
  encode(num, bl);
  dout(20) << __func__ << " writing " << num << " entries" << dendl;
  for (auto& p : pending_log) {
    bufferlist ebl;
    p.second.encode(ebl, mon.get_quorum_con_features());

    auto& bounds = pending_channel_info[p.second.channel];
    version_t v = bounds.second++;
    std::string key;
    generate_logentry_key(p.second.channel, v, &key);
    t->put(get_service_name(), key, ebl);
    
    bl.claim_append(ebl);
  }

  // prune log entries?
  map<string,version_t> prune_channels_to;
  for (auto& [channel, info] : summary.channel_info) {
    if (info.second - info.first > g_conf()->mon_log_max) {
      const version_t from = info.first;
      const version_t to = info.second - g_conf()->mon_log_max;
      dout(10) << __func__ << " pruning channel " << channel
	       << " " << from << " -> " << to << dendl;
      prune_channels_to[channel] = to;
      pending_channel_info[channel].first = to;
      for (version_t v = from; v < to; ++v) {
	std::string key;
	generate_logentry_key(channel, v, &key);
	t->erase(get_service_name(), key);
      }
    }
  }
  dout(20) << __func__ << " prune_channels_to " << prune_channels_to << dendl;
  encode(prune_channels_to, bl);

  put_version(t, version, bl);
  put_last_committed(t, version);
}

bool LogMonitor::should_stash_full()
{
  if (mon.monmap->min_mon_release < ceph_release_t::quincy) {
    // commit a LogSummary on every commit
    return true;
  }

  // store periodic summary
  auto period = std::min<uint64_t>(
    g_conf()->mon_log_full_interval,
    g_conf()->mon_max_log_epochs
    );
  return (get_last_committed() - get_version_latest_full() > period);
}


void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
{
  dout(10) << __func__ << " log v " << summary.version << dendl;
  ceph_assert(get_last_committed() == summary.version);

  bufferlist summary_bl;
  encode(summary, summary_bl, mon.get_quorum_con_features());

  put_version_full(t, summary.version, summary_bl);
  put_version_latest_full(t, summary.version);
}

version_t LogMonitor::get_trim_to() const
{
  if (!mon.is_leader())
    return 0;

  unsigned max = g_conf()->mon_max_log_epochs;
  version_t version = get_last_committed();
  if (version > max)
    return version - max;
  return 0;
}

bool LogMonitor::preprocess_query(MonOpRequestRef op)
{
  op->mark_logmon_event("preprocess_query");
  auto m = op->get_req<PaxosServiceMessage>();
  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
  switch (m->get_type()) {
  case MSG_MON_COMMAND:
    try {
      return preprocess_command(op);
    } catch (const bad_cmd_get& e) {
      bufferlist bl;
      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
      return true;
    }

  case MSG_LOG:
    return preprocess_log(op);

  default:
    ceph_abort();
    return true;
  }
}

bool LogMonitor::prepare_update(MonOpRequestRef op)
{
  op->mark_logmon_event("prepare_update");
  auto m = op->get_req<PaxosServiceMessage>();
  dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
  switch (m->get_type()) {
  case MSG_MON_COMMAND:
    try {
      return prepare_command(op);
    } catch (const bad_cmd_get& e) {
      bufferlist bl;
      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
      return true;
    }
  case MSG_LOG:
    return prepare_log(op);
  default:
    ceph_abort();
    return false;
  }
}

bool LogMonitor::preprocess_log(MonOpRequestRef op)
{
  op->mark_logmon_event("preprocess_log");
  auto m = op->get_req<MLog>();
  dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
  int num_new = 0;

  MonSession *session = op->get_session();
  if (!session)
    goto done;
  if (!session->is_capable("log", MON_CAP_W)) {
    dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
	    << session->caps << dendl;
    goto done;
  }
  
  for (auto p = m->entries.begin();
       p != m->entries.end();
       ++p) {
    if (!summary.contains(p->key()))
      num_new++;
  }
  if (!num_new) {
    dout(10) << "  nothing new" << dendl;
    goto done;
  }

  return false;

 done:
  mon.no_reply(op);
  return true;
}

struct LogMonitor::C_Log : public C_MonOp {
  LogMonitor *logmon;
  C_Log(LogMonitor *p, MonOpRequestRef o) :
    C_MonOp(o), logmon(p) {}
  void _finish(int r) override {
    if (r == -ECANCELED) {
      return;
    }
    logmon->_updated_log(op);
  }
};

bool LogMonitor::prepare_log(MonOpRequestRef op) 
{
  op->mark_logmon_event("prepare_log");
  auto m = op->get_req<MLog>();
  dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;

  if (m->fsid != mon.monmap->fsid) {
    dout(0) << "handle_log on fsid " << m->fsid << " != " << mon.monmap->fsid 
	    << dendl;
    return false;
  }

  for (auto p = m->entries.begin();
       p != m->entries.end();
       ++p) {
    dout(10) << " logging " << *p << dendl;
    if (!summary.contains(p->key()) &&
	!pending_keys.count(p->key())) {
      pending_keys.insert(p->key());
      pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
    }
  }
  wait_for_finished_proposal(op, new C_Log(this, op));
  return true;
}

void LogMonitor::_updated_log(MonOpRequestRef op)
{
  auto m = op->get_req<MLog>();
  dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
  mon.send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
}

bool LogMonitor::should_propose(double& delay)
{
  // commit now if we have a lot of pending events
  if (g_conf()->mon_max_log_entries_per_event > 0 &&
      pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
    return true;

  // otherwise fall back to generic policy
  return PaxosService::should_propose(delay);
}


bool LogMonitor::preprocess_command(MonOpRequestRef op)
{
  op->mark_logmon_event("preprocess_command");
  auto m = op->get_req<MMonCommand>();
  int r = -EINVAL;
  bufferlist rdata;
  stringstream ss;

  cmdmap_t cmdmap;
  if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
    string rs = ss.str();
    mon.reply_command(op, -EINVAL, rs, get_last_committed());
    return true;
  }
  MonSession *session = op->get_session();
  if (!session) {
    mon.reply_command(op, -EACCES, "access denied", get_last_committed());
    return true;
  }

  string prefix;
  cmd_getval(cmdmap, "prefix", prefix);

  string format = cmd_getval_or<string>(cmdmap, "format", "plain");
  boost::scoped_ptr<Formatter> f(Formatter::create(format));

  if (prefix == "log last") {
    int64_t num = 20;
    cmd_getval(cmdmap, "num", num);
    if (f) {
      f->open_array_section("tail");
    }

    std::string level_str;
    clog_type level;
    if (cmd_getval(cmdmap, "level", level_str)) {
      level = LogEntry::str_to_level(level_str);
      if (level == CLOG_UNKNOWN) {
        ss << "Invalid severity '" << level_str << "'";
        mon.reply_command(op, -EINVAL, ss.str(), get_last_committed());
        return true;
      }
    } else {
      level = CLOG_INFO;
    }

    std::string channel;
    if (!cmd_getval(cmdmap, "channel", channel)) {
      channel = CLOG_CHANNEL_DEFAULT;
    }

    // We'll apply this twice, once while counting out lines
    // and once while outputting them.
    auto match = [level](const LogEntry &entry) {
      return entry.prio >= level;
    };

    ostringstream ss;
    if (!summary.tail_by_channel.empty()) {
      // pre-quincy compat: walk the legacy per-channel tail maps
      if (channel == "*") {
	list<LogEntry> full_tail;
	summary.build_ordered_tail_legacy(&full_tail);
	auto rp = full_tail.rbegin();
	for (; num > 0 && rp != full_tail.rend(); ++rp) {
	  if (match(*rp)) {
	    num--;
	  }
	}
	if (rp == full_tail.rend()) {
	  --rp;
	}

	// Decrement a reverse iterator such that going past rbegin()
	// sets it to rend().  This is for writing a for() loop that
	// goes up to (and including) rbegin()
	auto dec = [&rp, &full_tail] () {
		     if (rp == full_tail.rbegin()) {
		       rp = full_tail.rend();
		     } else {
		       --rp;
		     }
		   };

	// Move forward to the end of the container (decrement the reverse
	// iterator).
	for (; rp != full_tail.rend(); dec()) {
	  if (!match(*rp)) {
	    continue;
	  }
	  if (f) {
	    f->dump_object("entry", *rp);
	  } else {
	    ss << *rp << "\n";
	  }
	}
      } else {
	auto p = summary.tail_by_channel.find(channel);
	if (p != summary.tail_by_channel.end()) {
	  auto rp = p->second.rbegin();
	  for (; num > 0 && rp != p->second.rend(); ++rp) {
	    if (match(rp->second)) {
	      num--;
	    }
	  }
	  if (rp == p->second.rend()) {
	    --rp;
	  }

	  // Decrement a reverse iterator such that going past rbegin()
	  // sets it to rend().  This is for writing a for() loop that
	  // goes up to (and including) rbegin()
	  auto dec = [&rp, &p] () {
		       if (rp == p->second.rbegin()) {
			 rp = p->second.rend();
		       } else {
			 --rp;
		       }
		     };
	  
	  // Move forward to the end of the container (decrement the reverse
	  // iterator).
	  for (; rp != p->second.rend(); dec()) {
	    if (!match(rp->second)) {
	      continue;
	    }
	    if (f) {
	      f->dump_object("entry", rp->second);
	    } else {
	      ss << rp->second << "\n";
	    }
	  }
	}
      }
    } else {
      // quincy+
      if (channel == "*") {
	// tail all channels; we need to mix by timestamp
	multimap<utime_t,LogEntry> entries;  // merge+sort all channels by timestamp
	for (auto& p : summary.channel_info) {
	  version_t from = p.second.first;
	  version_t to = p.second.second;
	  version_t start;
	  if (to > (version_t)num) {
	    start = std::max(to - num, from);
	  } else {
	    start = from;
	  }
	  dout(10) << __func__ << " channel " << p.first
		   << " from " << from << " to " << to << dendl;
	  for (version_t v = start; v < to; ++v) {
	    bufferlist ebl;
	    string key;
	    generate_logentry_key(p.first, v, &key);
	    int r = mon.store->get(get_service_name(), key, ebl);
	    if (r < 0) {
	      derr << __func__ << " missing key " << key << dendl;
	      continue;
	    }
	    LogEntry le;
	    auto p = ebl.cbegin();
	    decode(le, p);
	    entries.insert(make_pair(le.stamp, le));
	  }
	}
	while ((int)entries.size() > num) {
	  entries.erase(entries.begin());
	}
	for (auto& p : entries) {
	  if (!match(p.second)) {
	    continue;
	  }
	  if (f) {
	    f->dump_object("entry", p.second);
	  } else {
	    ss << p.second << "\n";
	  }
	}
      } else {
	// tail one channel
	auto p = summary.channel_info.find(channel);
	if (p != summary.channel_info.end()) {
	  version_t from = p->second.first;
	  version_t to = p->second.second;
	  version_t start;
	  if (to > (version_t)num) {
	    start = std::max(to - num, from);
	  } else {
	    start = from;
	  }
	  dout(10) << __func__ << " from " << from << " to " << to << dendl;
	  for (version_t v = start; v < to; ++v) {
	    bufferlist ebl;
	    string key;
	    generate_logentry_key(channel, v, &key);
	    int r = mon.store->get(get_service_name(), key, ebl);
	    if (r < 0) {
	      derr << __func__ << " missing key " << key << dendl;
	      continue;
	    }
	    LogEntry le;
	    auto p = ebl.cbegin();
	    decode(le, p);
	    if (match(le)) {
	      if (f) {
	        f->dump_object("entry", le);
	      } else {
	        ss << le << "\n";
	      }
	    }
	  }
	}
      }
    }
    if (f) {
      f->close_section();
      f->flush(rdata);
    } else {
      rdata.append(ss.str());
    }
    r = 0;
  } else {
    return false;
  }

  string rs;
  getline(ss, rs);
  mon.reply_command(op, r, rs, rdata, get_last_committed());
  return true;
}


bool LogMonitor::prepare_command(MonOpRequestRef op)
{
  op->mark_logmon_event("prepare_command");
  auto m = op->get_req<MMonCommand>();
  stringstream ss;
  string rs;
  int err = -EINVAL;

  cmdmap_t cmdmap;
  if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
    // ss has reason for failure
    string rs = ss.str();
    mon.reply_command(op, -EINVAL, rs, get_last_committed());
    return true;
  }

  string prefix;
  cmd_getval(cmdmap, "prefix", prefix);

  MonSession *session = op->get_session();
  if (!session) {
    mon.reply_command(op, -EACCES, "access denied", get_last_committed());
    return true;
  }

  if (prefix == "log") {
    vector<string> logtext;
    cmd_getval(cmdmap, "logtext", logtext);
    LogEntry le;
    le.rank = m->get_orig_source();
    le.addrs.v.push_back(m->get_orig_source_addr());
    le.name = session->entity_name;
    le.stamp = m->get_recv_stamp();
    le.seq = 0;
    string level_str = cmd_getval_or<string>(cmdmap, "level", "info");
    le.prio = LogEntry::str_to_level(level_str);
    le.channel = CLOG_CHANNEL_DEFAULT;
    le.msg = str_join(logtext, " ");
    pending_keys.insert(le.key());
    pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
    wait_for_finished_proposal(op, new Monitor::C_Command(
          mon, op, 0, string(), get_last_committed() + 1));
    return true;
  }

  getline(ss, rs);
  mon.reply_command(op, err, rs, get_last_committed());
  return false;
}

void LogMonitor::dump_info(Formatter *f)
{
  f->dump_unsigned("logm_first_committed", get_first_committed());
  f->dump_unsigned("logm_last_committed", get_last_committed());
}

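// Map a subscription name of the form "log-<level>" (e.g. "log-info") to a
// clog level; anything else maps to CLOG_UNKNOWN.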
int LogMonitor::sub_name_to_id(const string& n)
{
  if (n.substr(0, 4) == "log-" && n.size() > 4) {
    return LogEntry::str_to_level(n.substr(4));
  } else {
    return CLOG_UNKNOWN;
  }
}

void LogMonitor::check_subs()
{
  dout(10) << __func__ << dendl;
  for (map<string, xlist<Subscription*>*>::iterator i = mon.session_map.subs.begin();
       i != mon.session_map.subs.end();
       ++i) {
    for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
      if (sub_name_to_id((*j)->type) >= 0)
	check_sub(*j);
    }
  }
}

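// Send a subscriber any log entries committed since the version it last
// acknowledged, filtered by the level implied by its subscription name.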
void LogMonitor::check_sub(Subscription *s)
{
  dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;

  int sub_level = sub_name_to_id(s->type);
  ceph_assert(sub_level >= 0);

  version_t summary_version = summary.version;
  if (s->next > summary_version) {
    dout(10) << __func__ << " client " << s->session->name
	    << " requested version (" << s->next << ") is greater than ours (" 
	    << summary_version << "), which means we already sent him" 
	    << " everything we have." << dendl;
    return;
  } 
 
  MLog *mlog = new MLog(mon.monmap->fsid);

  if (s->next == 0) {
    /* first-time subscriber: start from the latest committed version */
    _create_sub_incremental(mlog, sub_level, get_last_committed());
  } else {
    /* otherwise send an incremental log from the version the client requested */
    _create_sub_incremental(mlog, sub_level, s->next);
  }
  }

  dout(10) << __func__ << " sending message to " << s->session->name
	  << " with " << mlog->entries.size() << " entries"
	  << " (version " << mlog->version << ")" << dendl;
  
  if (!mlog->entries.empty()) {
    s->session->con->send_message(mlog);
  } else {
    mlog->put();
  }
  if (s->onetime)
    mon.session_map.remove_sub(s);
  else
    s->next = summary_version+1;
}

/**
 * Create an incremental log message from version \p sv to \p summary.version
 *
 * @param mlog	Log message we'll send to the client with the messages received
 *		since version \p sv, inclusive.
 * @param level	The max log level of the messages the client is interested in.
 * @param sv	The version the client is looking for.
 */
void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
{
  dout(10) << __func__ << " level " << level << " ver " << sv 
	  << " cur summary ver " << summary.version << dendl; 

  if (sv < get_first_committed()) {
    dout(10) << __func__ << " skipped from " << sv
	     << " to first_committed " << get_first_committed() << dendl;
    LogEntry le;
    le.stamp = ceph_clock_now();
    le.prio = CLOG_WARN;
    ostringstream ss;
    ss << "skipped log messages from " << sv << " to " << get_first_committed();
    le.msg = ss.str();
    mlog->entries.push_back(le);
    sv = get_first_committed();
  }

  version_t summary_ver = summary.version;
  while (sv && sv <= summary_ver) {
    bufferlist bl;
    int err = get_version(sv, bl);
    ceph_assert(err == 0);
    ceph_assert(bl.length());
    auto p = bl.cbegin();
    __u8 v;
    decode(v, p);
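    // same record layout as log_external_backlog(): v1 records have no
    // count (num stays -2), v2 records carry one.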
    int32_t num = -2;
    if (v >= 2) {
      decode(num, p);
      dout(20) << __func__ << " sv " << sv << " has " << num << " entries" << dendl;
    }
    while ((num == -2 && !p.end()) || (num >= 0 && num--)) {
      LogEntry le;
      le.decode(p);
      if (le.prio < level) {
	dout(20) << __func__ << " requested " << level 
		 << ", skipping " << le << dendl;
	continue;
      }
      mlog->entries.push_back(le);
    }
    mlog->version = sv++;
  }

  dout(10) << __func__ << " incremental message ready (" 
	   << mlog->entries.size() << " entries)" << dendl;
}

void LogMonitor::update_log_channels()
{
  ostringstream oss;

  channels.clear();

  int r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_syslog"),
    oss, &channels.log_to_syslog,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_syslog_level"),
    oss, &channels.syslog_level,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_syslog_facility"),
    oss, &channels.syslog_facility,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_file"), oss,
    &channels.log_file,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_file_level"), oss,
    &channels.log_file_level,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_graylog"), oss,
    &channels.log_to_graylog,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_graylog_host"), oss,
    &channels.log_to_graylog_host,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_graylog_port"), oss,
    &channels.log_to_graylog_port,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
         << dendl;
    return;
  }

  r = get_conf_str_map_helper(
    g_conf().get_val<string>("mon_cluster_log_to_journald"), oss,
    &channels.log_to_journald,
    CLOG_CONFIG_DEFAULT_KEY);
  if (r < 0) {
    derr << __func__ << " error parsing 'mon_cluster_log_to_journald'"
         << dendl;
    return;
  }

  channels.expand_channel_meta();
  log_external_close_fds();
}


void LogMonitor::handle_conf_change(const ConfigProxy& conf,
                                    const std::set<std::string> &changed)
{
  if (changed.count("mon_cluster_log_to_syslog") ||
      changed.count("mon_cluster_log_to_syslog_level") ||
      changed.count("mon_cluster_log_to_syslog_facility") ||
      changed.count("mon_cluster_log_file") ||
      changed.count("mon_cluster_log_file_level") ||
      changed.count("mon_cluster_log_to_graylog") ||
      changed.count("mon_cluster_log_to_graylog_host") ||
      changed.count("mon_cluster_log_to_graylog_port") ||
      changed.count("mon_cluster_log_to_journald") ||
      changed.count("mon_cluster_log_to_file")) {
    update_log_channels();
  }
}