summaryrefslogtreecommitdiffstats
path: root/src/ceph_mon.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/ceph_mon.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/ceph_mon.cc')
-rw-r--r--src/ceph_mon.cc946
1 files changed, 946 insertions, 0 deletions
diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc
new file mode 100644
index 000000000..ed949bda0
--- /dev/null
+++ b/src/ceph_mon.cc
@@ -0,0 +1,946 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <iostream>
+#include <string>
+
+#include "common/config.h"
+#include "include/ceph_features.h"
+
+#include "mon/MonMap.h"
+#include "mon/Monitor.h"
+#include "mon/MonitorDBStore.h"
+#include "mon/MonClient.h"
+
+#include "msg/Messenger.h"
+
+#include "include/CompatSet.h"
+
+#include "common/ceph_argparse.h"
+#include "common/pick_address.h"
+#include "common/Throttle.h"
+#include "common/Timer.h"
+#include "common/errno.h"
+#include "common/Preforker.h"
+
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+
+#include "perfglue/heap_profiler.h"
+
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_mon
+
+using std::cerr;
+using std::cout;
+using std::list;
+using std::map;
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::JSONFormatter;
+
+Monitor *mon = NULL;
+
+
+void handle_mon_signal(int signum)
+{
+ if (mon)
+ mon->handle_signal(signum);
+}
+
+
+int obtain_monmap(MonitorDBStore &store, bufferlist &bl)
+{
+ dout(10) << __func__ << dendl;
+ /*
+ * the monmap may be in one of three places:
+ * 'mon_sync:temp_newer_monmap' - stashed newer map for bootstrap
+ * 'monmap:<latest_version_no>' - the monmap we'd really like to have
+ * 'mon_sync:latest_monmap' - last monmap backed up for the last sync
+ * 'mkfs:monmap' - a monmap resulting from mkfs
+ */
+
+ if (store.exists("monmap", "last_committed")) {
+ version_t latest_ver = store.get("monmap", "last_committed");
+ if (store.exists("monmap", latest_ver)) {
+ int err = store.get("monmap", latest_ver, bl);
+ ceph_assert(err == 0);
+ ceph_assert(bl.length() > 0);
+ dout(10) << __func__ << " read last committed monmap ver "
+ << latest_ver << dendl;
+
+ // see if there is stashed newer map (see bootstrap())
+ if (store.exists("mon_sync", "temp_newer_monmap")) {
+ bufferlist bl2;
+ int err = store.get("mon_sync", "temp_newer_monmap", bl2);
+ ceph_assert(err == 0);
+ ceph_assert(bl2.length() > 0);
+ MonMap b;
+ b.decode(bl2);
+ if (b.get_epoch() > latest_ver) {
+ dout(10) << __func__ << " using stashed monmap " << b.get_epoch()
+ << " instead" << dendl;
+ bl = std::move(bl2);
+ } else {
+ dout(10) << __func__ << " ignoring stashed monmap " << b.get_epoch()
+ << dendl;
+ }
+ }
+ return 0;
+ }
+ }
+
+ if (store.exists("mon_sync", "in_sync")
+ || store.exists("mon_sync", "force_sync")) {
+ dout(10) << __func__ << " detected aborted sync" << dendl;
+ if (store.exists("mon_sync", "latest_monmap")) {
+ int err = store.get("mon_sync", "latest_monmap", bl);
+ ceph_assert(err == 0);
+ ceph_assert(bl.length() > 0);
+ dout(10) << __func__ << " read backup monmap" << dendl;
+ return 0;
+ }
+ }
+
+ if (store.exists("mon_sync", "temp_newer_monmap")) {
+ dout(10) << __func__ << " found temp_newer_monmap" << dendl;
+ int err = store.get("mon_sync", "temp_newer_monmap", bl);
+ ceph_assert(err == 0);
+ ceph_assert(bl.length() > 0);
+ return 0;
+ }
+
+ if (store.exists("mkfs", "monmap")) {
+ dout(10) << __func__ << " found mkfs monmap" << dendl;
+ int err = store.get("mkfs", "monmap", bl);
+ ceph_assert(err == 0);
+ ceph_assert(bl.length() > 0);
+ return 0;
+ }
+
+ derr << __func__ << " unable to find a monmap" << dendl;
+ return -ENOENT;
+}
+
+int check_mon_data_exists()
+{
+ string mon_data = g_conf()->mon_data;
+ struct stat buf;
+ if (::stat(mon_data.c_str(), &buf)) {
+ if (errno != ENOENT) {
+ derr << "stat(" << mon_data << ") " << cpp_strerror(errno) << dendl;
+ }
+ return -errno;
+ }
+ return 0;
+}
+
+/** Check whether **mon data** is empty.
+ *
+ * Being empty means mkfs has not been run and there's no monitor setup
+ * at **g_conf()->mon_data**.
+ *
+ * If the directory g_conf()->mon_data is not empty we will return -ENOTEMPTY.
+ * Otherwise we will return 0. Any other negative returns will represent
+ * a failure to be handled by the caller.
+ *
+ * @return **0** on success, -ENOTEMPTY if not empty or **-errno** otherwise.
+ */
+int check_mon_data_empty()
+{
+ string mon_data = g_conf()->mon_data;
+
+ DIR *dir = ::opendir(mon_data.c_str());
+ if (!dir) {
+ derr << "opendir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ int code = 0;
+ struct dirent *de = nullptr;
+ errno = 0;
+ while ((de = ::readdir(dir))) {
+ if (string(".") != de->d_name &&
+ string("..") != de->d_name &&
+ string("kv_backend") != de->d_name) {
+ code = -ENOTEMPTY;
+ break;
+ }
+ }
+ if (!de && errno) {
+ derr << "readdir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
+ code = -errno;
+ }
+
+ ::closedir(dir);
+
+ return code;
+}
+
+static void usage()
+{
+ cout << "usage: ceph-mon -i <ID> [flags]\n"
+ << " --debug_mon n\n"
+ << " debug monitor level (e.g. 10)\n"
+ << " --mkfs\n"
+ << " build fresh monitor fs\n"
+ << " --force-sync\n"
+ << " force a sync from another mon by wiping local data (BE CAREFUL)\n"
+ << " --yes-i-really-mean-it\n"
+ << " mandatory safeguard for --force-sync\n"
+ << " --compact\n"
+ << " compact the monitor store\n"
+ << " --osdmap <filename>\n"
+ << " only used when --mkfs is provided: load the osdmap from <filename>\n"
+ << " --inject-monmap <filename>\n"
+ << " write the <filename> monmap to the local monitor store and exit\n"
+ << " --extract-monmap <filename>\n"
+ << " extract the monmap from the local monitor store and exit\n"
+ << " --mon-data <directory>\n"
+ << " where the mon store and keyring are located\n"
+ << " --set-crush-location <bucket>=<foo>"
+ << " sets monitor's crush bucket location (only for stretch mode)"
+ << std::endl;
+ generic_server_usage();
+}
+
+entity_addrvec_t make_mon_addrs(entity_addr_t a)
+{
+ entity_addrvec_t addrs;
+ if (a.get_port() == 0) {
+ a.set_type(entity_addr_t::TYPE_MSGR2);
+ a.set_port(CEPH_MON_PORT_IANA);
+ addrs.v.push_back(a);
+ a.set_type(entity_addr_t::TYPE_LEGACY);
+ a.set_port(CEPH_MON_PORT_LEGACY);
+ addrs.v.push_back(a);
+ } else if (a.get_port() == CEPH_MON_PORT_LEGACY) {
+ a.set_type(entity_addr_t::TYPE_LEGACY);
+ addrs.v.push_back(a);
+ } else if (a.get_type() == entity_addr_t::TYPE_ANY) {
+ a.set_type(entity_addr_t::TYPE_MSGR2);
+ addrs.v.push_back(a);
+ } else {
+ addrs.v.push_back(a);
+ }
+ return addrs;
+}
+
+int main(int argc, const char **argv)
+{
+ // reset our process name, in case we did a respawn, so that it's not
+ // left as "exe".
+ ceph_pthread_setname(pthread_self(), "ceph-mon");
+
+ int err;
+
+ bool mkfs = false;
+ bool compact = false;
+ bool force_sync = false;
+ bool yes_really = false;
+ std::string osdmapfn, inject_monmap, extract_monmap, crush_loc;
+
+ vector<const char*> args;
+ argv_to_vec(argc, argv, args);
+ if (args.empty()) {
+ cerr << argv[0] << ": -h or --help for usage" << std::endl;
+ exit(1);
+ }
+ if (ceph_argparse_need_usage(args)) {
+ usage();
+ exit(0);
+ }
+
+ // We need to specify some default values that may be overridden by the
+ // user, that are specific to the monitor. The options we are overriding
+ // are also used on the OSD (or in any other component that uses leveldb),
+ // so changing the global defaults is not an option.
+ // This is not the prettiest way of doing this, especially since it has us
+ // having a different place defining default values, but it's not horribly
+ // wrong enough to prevent us from doing it :)
+ //
+ // NOTE: user-defined options will take precedence over ours.
+ //
+ // leveldb_write_buffer_size = 32*1024*1024 = 33554432 // 32MB
+ // leveldb_cache_size = 512*1024*1204 = 536870912 // 512MB
+ // leveldb_block_size = 64*1024 = 65536 // 64KB
+ // leveldb_compression = false
+ // leveldb_log = ""
+ map<string,string> defaults = {
+ { "leveldb_write_buffer_size", "33554432" },
+ { "leveldb_cache_size", "536870912" },
+ { "leveldb_block_size", "65536" },
+ { "leveldb_compression", "false"},
+ { "leveldb_log", "" },
+ { "keyring", "$mon_data/keyring" },
+ };
+
+ int flags = 0;
+ {
+ vector<const char*> args_copy = args;
+ std::string val;
+ for (std::vector<const char*>::iterator i = args_copy.begin();
+ i != args_copy.end(); ) {
+ if (ceph_argparse_double_dash(args_copy, i)) {
+ break;
+ } else if (ceph_argparse_flag(args_copy, i, "--mkfs", (char*)NULL)) {
+ flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
+ } else if (ceph_argparse_witharg(args_copy, i, &val, "--inject_monmap", (char*)NULL)) {
+ flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
+ } else if (ceph_argparse_witharg(args_copy, i, &val, "--extract-monmap", (char*)NULL)) {
+ flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
+ } else {
+ ++i;
+ }
+ }
+ }
+
+ // don't try to get config from mon cluster during startup
+ flags |= CINIT_FLAG_NO_MON_CONFIG;
+
+ auto cct = global_init(&defaults, args,
+ CEPH_ENTITY_TYPE_MON, CODE_ENVIRONMENT_DAEMON,
+ flags);
+ ceph_heap_profiler_init();
+
+ std::string val;
+ for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_double_dash(args, i)) {
+ break;
+ } else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) {
+ mkfs = true;
+ } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
+ compact = true;
+ } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {
+ force_sync = true;
+ } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {
+ yes_really = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
+ osdmapfn = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
+ inject_monmap = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--extract-monmap", (char*)NULL)) {
+ extract_monmap = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--set-crush-location", (char*)NULL)) {
+ crush_loc = val;
+ } else {
+ ++i;
+ }
+ }
+ if (!args.empty()) {
+ cerr << "too many arguments: " << args << std::endl;
+ exit(1);
+ }
+
+ if (force_sync && !yes_really) {
+ cerr << "are you SURE you want to force a sync? this will erase local data and may\n"
+ << "break your mon cluster. pass --yes-i-really-mean-it if you do." << std::endl;
+ exit(1);
+ }
+
+ if (g_conf()->mon_data.empty()) {
+ cerr << "must specify '--mon-data=foo' data path" << std::endl;
+ exit(1);
+ }
+
+ if (g_conf()->name.get_id().empty()) {
+ cerr << "must specify id (--id <id> or --name mon.<id>)" << std::endl;
+ exit(1);
+ }
+
+ // -- mkfs --
+ if (mkfs) {
+
+ int err = check_mon_data_exists();
+ if (err == -ENOENT) {
+ if (::mkdir(g_conf()->mon_data.c_str(), 0755)) {
+ derr << "mkdir(" << g_conf()->mon_data << ") : "
+ << cpp_strerror(errno) << dendl;
+ exit(1);
+ }
+ } else if (err < 0) {
+ derr << "error opening '" << g_conf()->mon_data << "': "
+ << cpp_strerror(-err) << dendl;
+ exit(-err);
+ }
+
+ err = check_mon_data_empty();
+ if (err == -ENOTEMPTY) {
+ // Mon may exist. Let the user know and exit gracefully.
+ derr << "'" << g_conf()->mon_data << "' already exists and is not empty"
+ << ": monitor may already exist" << dendl;
+ exit(0);
+ } else if (err < 0) {
+ derr << "error checking if '" << g_conf()->mon_data << "' is empty: "
+ << cpp_strerror(-err) << dendl;
+ exit(-err);
+ }
+
+ // resolve public_network -> public_addr
+ pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
+
+ dout(10) << "public_network " << g_conf()->public_network << dendl;
+ dout(10) << "public_addr " << g_conf()->public_addr << dendl;
+ dout(10) << "public_addrv " << g_conf()->public_addrv << dendl;
+
+ common_init_finish(g_ceph_context);
+
+ bufferlist monmapbl, osdmapbl;
+ std::string error;
+ MonMap monmap;
+
+ // load or generate monmap
+ const auto monmap_fn = g_conf().get_val<string>("monmap");
+ if (monmap_fn.length()) {
+ int err = monmapbl.read_file(monmap_fn.c_str(), &error);
+ if (err < 0) {
+ derr << argv[0] << ": error reading " << monmap_fn << ": " << error << dendl;
+ exit(1);
+ }
+ try {
+ monmap.decode(monmapbl);
+
+ // always mark seed/mkfs monmap as epoch 0
+ monmap.set_epoch(0);
+ } catch (const ceph::buffer::error& e) {
+ derr << argv[0] << ": error decoding monmap " << monmap_fn << ": " << e.what() << dendl;
+ exit(1);
+ }
+
+ dout(1) << "imported monmap:\n";
+ monmap.print(*_dout);
+ *_dout << dendl;
+
+ } else {
+ ostringstream oss;
+ int err = monmap.build_initial(g_ceph_context, true, oss);
+ if (oss.tellp())
+ derr << oss.str() << dendl;
+ if (err < 0) {
+ derr << argv[0] << ": warning: no initial monitors; must use admin socket to feed hints" << dendl;
+ }
+
+ dout(1) << "initial generated monmap:\n";
+ monmap.print(*_dout);
+ *_dout << dendl;
+
+ // am i part of the initial quorum?
+ if (monmap.contains(g_conf()->name.get_id())) {
+ // hmm, make sure the ip listed exists on the current host?
+ // maybe later.
+ } else if (!g_conf()->public_addrv.empty()) {
+ entity_addrvec_t av = g_conf()->public_addrv;
+ string name;
+ if (monmap.contains(av, &name)) {
+ monmap.rename(name, g_conf()->name.get_id());
+ dout(0) << argv[0] << ": renaming mon." << name << " " << av
+ << " to mon." << g_conf()->name.get_id() << dendl;
+ }
+ } else if (!g_conf()->public_addr.is_blank_ip()) {
+ entity_addrvec_t av = make_mon_addrs(g_conf()->public_addr);
+ string name;
+ if (monmap.contains(av, &name)) {
+ monmap.rename(name, g_conf()->name.get_id());
+ dout(0) << argv[0] << ": renaming mon." << name << " " << av
+ << " to mon." << g_conf()->name.get_id() << dendl;
+ }
+ } else {
+ // is a local address listed without a name? if so, name myself.
+ list<entity_addr_t> ls;
+ monmap.list_addrs(ls);
+ dout(0) << " monmap addrs are " << ls << ", checking if any are local"
+ << dendl;
+
+ entity_addr_t local;
+ if (have_local_addr(g_ceph_context, ls, &local)) {
+ dout(0) << " have local addr " << local << dendl;
+ string name;
+ local.set_type(entity_addr_t::TYPE_MSGR2);
+ if (!monmap.get_addr_name(local, name)) {
+ local.set_type(entity_addr_t::TYPE_LEGACY);
+ if (!monmap.get_addr_name(local, name)) {
+ dout(0) << "no local addresses appear in bootstrap monmap"
+ << dendl;
+ }
+ }
+ if (name.compare(0, 7, "noname-") == 0) {
+ dout(0) << argv[0] << ": mon." << name << " " << local
+ << " is local, renaming to mon." << g_conf()->name.get_id()
+ << dendl;
+ monmap.rename(name, g_conf()->name.get_id());
+ } else if (name.size()) {
+ dout(0) << argv[0] << ": mon." << name << " " << local
+ << " is local, but not 'noname-' + something; "
+ << "not assuming it's me" << dendl;
+ }
+ } else {
+ dout(0) << " no local addrs match monmap" << dendl;
+ }
+ }
+ }
+
+ const auto fsid = g_conf().get_val<uuid_d>("fsid");
+ if (!fsid.is_zero()) {
+ monmap.fsid = fsid;
+ dout(0) << argv[0] << ": set fsid to " << fsid << dendl;
+ }
+
+ if (monmap.fsid.is_zero()) {
+ derr << argv[0] << ": generated monmap has no fsid; use '--fsid <uuid>'" << dendl;
+ exit(10);
+ }
+
+ //monmap.print(cout);
+
+ // osdmap
+ if (osdmapfn.length()) {
+ err = osdmapbl.read_file(osdmapfn.c_str(), &error);
+ if (err < 0) {
+ derr << argv[0] << ": error reading " << osdmapfn << ": "
+ << error << dendl;
+ exit(1);
+ }
+ }
+
+ // go
+ MonitorDBStore store(g_conf()->mon_data);
+ ostringstream oss;
+ int r = store.create_and_open(oss);
+ if (oss.tellp())
+ derr << oss.str() << dendl;
+ if (r < 0) {
+ derr << argv[0] << ": error opening mon data directory at '"
+ << g_conf()->mon_data << "': " << cpp_strerror(r) << dendl;
+ exit(1);
+ }
+ ceph_assert(r == 0);
+
+ Monitor mon(g_ceph_context, g_conf()->name.get_id(), &store, 0, 0, &monmap);
+ r = mon.mkfs(osdmapbl);
+ if (r < 0) {
+ derr << argv[0] << ": error creating monfs: " << cpp_strerror(r) << dendl;
+ exit(1);
+ }
+ store.close();
+ dout(0) << argv[0] << ": created monfs at " << g_conf()->mon_data
+ << " for " << g_conf()->name << dendl;
+ return 0;
+ }
+
+ err = check_mon_data_exists();
+ if (err < 0 && err == -ENOENT) {
+ derr << "monitor data directory at '" << g_conf()->mon_data << "'"
+ << " does not exist: have you run 'mkfs'?" << dendl;
+ exit(1);
+ } else if (err < 0) {
+ derr << "error accessing monitor data directory at '"
+ << g_conf()->mon_data << "': " << cpp_strerror(-err) << dendl;
+ exit(1);
+ }
+
+ err = check_mon_data_empty();
+ if (err == 0) {
+ derr << "monitor data directory at '" << g_conf()->mon_data
+ << "' is empty: have you run 'mkfs'?" << dendl;
+ exit(1);
+ } else if (err < 0 && err != -ENOTEMPTY) {
+ // we don't want an empty data dir by now
+ derr << "error accessing '" << g_conf()->mon_data << "': "
+ << cpp_strerror(-err) << dendl;
+ exit(1);
+ }
+
+ {
+ // check fs stats. don't start if it's critically close to full.
+ ceph_data_stats_t stats;
+ int err = get_fs_stats(stats, g_conf()->mon_data.c_str());
+ if (err < 0) {
+ derr << "error checking monitor data's fs stats: " << cpp_strerror(err)
+ << dendl;
+ exit(-err);
+ }
+ if (stats.avail_percent <= g_conf()->mon_data_avail_crit) {
+ derr << "error: monitor data filesystem reached concerning levels of"
+ << " available storage space (available: "
+ << stats.avail_percent << "% " << byte_u_t(stats.byte_avail)
+ << ")\nyou may adjust 'mon data avail crit' to a lower value"
+ << " to make this go away (default: " << g_conf()->mon_data_avail_crit
+ << "%)\n" << dendl;
+ exit(ENOSPC);
+ }
+ }
+
+ // we fork early to prevent leveldb's environment static state from
+ // screwing us over
+ Preforker prefork;
+ if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS)) {
+ if (global_init_prefork(g_ceph_context) >= 0) {
+ string err_msg;
+ err = prefork.prefork(err_msg);
+ if (err < 0) {
+ derr << err_msg << dendl;
+ prefork.exit(err);
+ }
+ if (prefork.is_parent()) {
+ err = prefork.parent_wait(err_msg);
+ if (err < 0)
+ derr << err_msg << dendl;
+ prefork.exit(err);
+ }
+ setsid();
+ global_init_postfork_start(g_ceph_context);
+ }
+ common_init_finish(g_ceph_context);
+ global_init_chdir(g_ceph_context);
+ if (global_init_preload_erasure_code(g_ceph_context) < 0)
+ prefork.exit(1);
+ }
+
+ // set up signal handlers, now that we've daemonized/forked.
+ init_async_signal_handler();
+ register_async_signal_handler(SIGHUP, sighup_handler);
+
+ MonitorDBStore *store = new MonitorDBStore(g_conf()->mon_data);
+
+ // make sure we aren't upgrading too fast
+ {
+ string val;
+ int r = store->read_meta("min_mon_release", &val);
+ if (r >= 0 && val.size()) {
+ ceph_release_t from_release = ceph_release_from_name(val);
+ ostringstream err;
+ if (!can_upgrade_from(from_release, "min_mon_release", err)) {
+ derr << err.str() << dendl;
+ prefork.exit(1);
+ }
+ }
+ }
+
+ {
+ ostringstream oss;
+ err = store->open(oss);
+ if (oss.tellp())
+ derr << oss.str() << dendl;
+ if (err < 0) {
+ derr << "error opening mon data directory at '"
+ << g_conf()->mon_data << "': " << cpp_strerror(err) << dendl;
+ prefork.exit(1);
+ }
+ }
+
+ bufferlist magicbl;
+ err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
+ if (err || !magicbl.length()) {
+ derr << "unable to read magic from mon data" << dendl;
+ prefork.exit(1);
+ }
+ string magic(magicbl.c_str(), magicbl.length()-1); // ignore trailing \n
+ if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) {
+ derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl;
+ prefork.exit(1);
+ }
+
+ err = Monitor::check_features(store);
+ if (err < 0) {
+ derr << "error checking features: " << cpp_strerror(err) << dendl;
+ prefork.exit(1);
+ }
+
+ // inject new monmap?
+ if (!inject_monmap.empty()) {
+ bufferlist bl;
+ std::string error;
+ int r = bl.read_file(inject_monmap.c_str(), &error);
+ if (r) {
+ derr << "unable to read monmap from " << inject_monmap << ": "
+ << error << dendl;
+ prefork.exit(1);
+ }
+
+ // get next version
+ version_t v = store->get("monmap", "last_committed");
+ dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1)
+ << dendl;
+ v++;
+
+ // set the version
+ MonMap tmp;
+ tmp.decode(bl);
+ if (tmp.get_epoch() != v) {
+ dout(0) << "changing monmap epoch from " << tmp.get_epoch()
+ << " to " << v << dendl;
+ tmp.set_epoch(v);
+ }
+ bufferlist mapbl;
+ tmp.encode(mapbl, CEPH_FEATURES_ALL);
+ bufferlist final;
+ encode(v, final);
+ encode(mapbl, final);
+
+ auto t(std::make_shared<MonitorDBStore::Transaction>());
+ // save it
+ t->put("monmap", v, mapbl);
+ t->put("monmap", "latest", final);
+ t->put("monmap", "last_committed", v);
+ store->apply_transaction(t);
+
+ dout(0) << "done." << dendl;
+ prefork.exit(0);
+ }
+
+ // monmap?
+ MonMap monmap;
+ {
+ // note that even if we don't find a viable monmap, we should go ahead
+ // and try to build it up in the next if-else block.
+ bufferlist mapbl;
+ int err = obtain_monmap(*store, mapbl);
+ if (err >= 0) {
+ try {
+ monmap.decode(mapbl);
+ } catch (const ceph::buffer::error& e) {
+ derr << "can't decode monmap: " << e.what() << dendl;
+ }
+ } else {
+ derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
+ }
+
+ dout(10) << __func__ << " monmap:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("monmap", monmap);
+ jf.flush(*_dout);
+ *_dout << dendl;
+
+ if (!extract_monmap.empty()) {
+ int r = mapbl.write_file(extract_monmap.c_str());
+ if (r < 0) {
+ r = -errno;
+ derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl;
+ prefork.exit(1);
+ }
+ derr << "wrote monmap to " << extract_monmap << dendl;
+ prefork.exit(0);
+ }
+ }
+
+ // this is what i will bind to
+ entity_addrvec_t ipaddrs;
+
+ if (monmap.contains(g_conf()->name.get_id())) {
+ ipaddrs = monmap.get_addrs(g_conf()->name.get_id());
+
+ // print helpful warning if the conf file doesn't match
+ std::vector<std::string> my_sections = g_conf().get_my_sections();
+ std::string mon_addr_str;
+ if (g_conf().get_val_from_conf_file(my_sections, "mon addr",
+ mon_addr_str, true) == 0) {
+ entity_addr_t conf_addr;
+ if (conf_addr.parse(mon_addr_str.c_str())) {
+ entity_addrvec_t conf_addrs = make_mon_addrs(conf_addr);
+ if (ipaddrs != conf_addrs) {
+ derr << "WARNING: 'mon addr' config option " << conf_addrs
+ << " does not match monmap file" << std::endl
+ << " continuing with monmap configuration" << dendl;
+ }
+ } else
+ derr << "WARNING: invalid 'mon addr' config option" << std::endl
+ << " continuing with monmap configuration" << dendl;
+ }
+ } else {
+ dout(0) << g_conf()->name << " does not exist in monmap, will attempt to join an existing cluster" << dendl;
+
+ pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
+ if (!g_conf()->public_addrv.empty()) {
+ ipaddrs = g_conf()->public_addrv;
+ dout(0) << "using public_addrv " << ipaddrs << dendl;
+ } else if (!g_conf()->public_addr.is_blank_ip()) {
+ ipaddrs = make_mon_addrs(g_conf()->public_addr);
+ dout(0) << "using public_addr " << g_conf()->public_addr << " -> "
+ << ipaddrs << dendl;
+ } else {
+ MonMap tmpmap;
+ ostringstream oss;
+ int err = tmpmap.build_initial(g_ceph_context, true, oss);
+ if (oss.tellp())
+ derr << oss.str() << dendl;
+ if (err < 0) {
+ derr << argv[0] << ": error generating initial monmap: "
+ << cpp_strerror(err) << dendl;
+ prefork.exit(1);
+ }
+ if (tmpmap.contains(g_conf()->name.get_id())) {
+ ipaddrs = tmpmap.get_addrs(g_conf()->name.get_id());
+ } else {
+ derr << "no public_addr or public_network specified, and "
+ << g_conf()->name << " not present in monmap or ceph.conf" << dendl;
+ prefork.exit(1);
+ }
+ }
+ }
+
+ // bind
+ int rank = monmap.get_rank(g_conf()->name.get_id());
+ std::string public_msgr_type = g_conf()->ms_public_type.empty() ? g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
+ Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
+ entity_name_t::MON(rank), "mon", 0);
+ if (!msgr)
+ exit(1);
+ msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
+ msgr->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
+
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ msgr->set_policy(entity_name_t::TYPE_MON,
+ Messenger::Policy::lossless_peer_reuse(
+ CEPH_FEATURE_SERVER_LUMINOUS));
+ msgr->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(
+ CEPH_FEATURE_SERVER_LUMINOUS));
+ msgr->set_policy(entity_name_t::TYPE_CLIENT,
+ Messenger::Policy::stateless_server(0));
+ msgr->set_policy(entity_name_t::TYPE_MDS,
+ Messenger::Policy::stateless_server(0));
+
+ // throttle client traffic
+ Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
+ g_conf()->mon_client_bytes);
+ msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ client_throttler, NULL);
+
+ // throttle daemon traffic
+ // NOTE: actual usage on the leader may multiply by the number of
+ // monitors if they forward large update messages from daemons.
+ Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
+ g_conf()->mon_daemon_bytes);
+ msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler,
+ NULL);
+ msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler,
+ NULL);
+
+ entity_addrvec_t bind_addrs = ipaddrs;
+ entity_addrvec_t public_addrs = ipaddrs;
+
+ // check if the public_bind_addr option is set
+ if (!g_conf()->public_bind_addr.is_blank_ip()) {
+ bind_addrs = make_mon_addrs(g_conf()->public_bind_addr);
+ }
+
+ dout(0) << "starting " << g_conf()->name << " rank " << rank
+ << " at public addrs " << public_addrs
+ << " at bind addrs " << bind_addrs
+ << " mon_data " << g_conf()->mon_data
+ << " fsid " << monmap.get_fsid()
+ << dendl;
+
+ Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
+ entity_name_t::MON(rank), "mon-mgrc",
+ Messenger::get_pid_nonce());
+ if (!mgr_msgr) {
+ derr << "unable to create mgr_msgr" << dendl;
+ prefork.exit(1);
+ }
+
+ mon = new Monitor(g_ceph_context, g_conf()->name.get_id(), store,
+ msgr, mgr_msgr, &monmap);
+
+ mon->orig_argc = argc;
+ mon->orig_argv = argv;
+
+ if (force_sync) {
+ derr << "flagging a forced sync ..." << dendl;
+ ostringstream oss;
+ JSONFormatter jf(true);
+ mon->sync_force(&jf);
+ derr << "out:\n";
+ jf.flush(*_dout);
+ *_dout << dendl;
+ }
+
+ err = mon->preinit();
+ if (err < 0) {
+ derr << "failed to initialize" << dendl;
+ prefork.exit(1);
+ }
+
+ if (compact || g_conf()->mon_compact_on_start) {
+ derr << "compacting monitor store ..." << dendl;
+ mon->store->compact();
+ derr << "done compacting" << dendl;
+ }
+
+ // bind
+ err = msgr->bindv(bind_addrs);
+ if (err < 0) {
+ derr << "unable to bind monitor to " << bind_addrs << dendl;
+ prefork.exit(1);
+ }
+
+ // if the public and bind addr are different set the msgr addr
+ // to the public one, now that the bind is complete.
+ if (public_addrs != bind_addrs) {
+ msgr->set_addrs(public_addrs);
+ }
+
+ if (g_conf()->daemonize) {
+ global_init_postfork_finish(g_ceph_context);
+ prefork.daemonize();
+ }
+
+ msgr->start();
+ mgr_msgr->start();
+
+ mon->set_mon_crush_location(crush_loc);
+ mon->init();
+
+ register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);
+
+ if (g_conf()->inject_early_sigterm)
+ kill(getpid(), SIGTERM);
+
+ msgr->wait();
+ mgr_msgr->wait();
+
+ store->close();
+
+ unregister_async_signal_handler(SIGHUP, sighup_handler);
+ unregister_async_signal_handler(SIGINT, handle_mon_signal);
+ unregister_async_signal_handler(SIGTERM, handle_mon_signal);
+ shutdown_async_signal_handler();
+
+ delete mon;
+ delete store;
+ delete msgr;
+ delete mgr_msgr;
+ delete client_throttler;
+ delete daemon_throttler;
+
+ // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
+ char s[20];
+ snprintf(s, sizeof(s), "gmon/%d", getpid());
+ if ((mkdir(s, 0755) == 0) && (chdir(s) == 0)) {
+ dout(0) << "ceph-mon: gmon.out should be in " << s << dendl;
+ }
+
+ prefork.signal_exit(0);
+ return 0;
+}