diff options
Diffstat (limited to '')
-rw-r--r-- | src/common/Journald.cc | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/src/common/Journald.cc b/src/common/Journald.cc new file mode 100644 index 000000000..a1321c7ee --- /dev/null +++ b/src/common/Journald.cc @@ -0,0 +1,319 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Journald.h" + +#include <endian.h> +#include <fcntl.h> +#include <iterator> +#include <memory> +#include <string> +#include <sys/mman.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <sys/un.h> +#include <syslog.h> +#include <unistd.h> +#include <fmt/format.h> +#include <fmt/ostream.h> + +#include "include/ceph_assert.h" +#include "common/LogEntry.h" +#include "log/Entry.h" +#include "log/SubsystemMap.h" +#include "msg/msg_fmt.h" + + +namespace ceph::logging { + +namespace { +const struct sockaddr_un sockaddr = { + AF_UNIX, + "/run/systemd/journal/socket", +}; + +ssize_t sendmsg_fd(int transport_fd, int fd) +{ + constexpr size_t control_len = CMSG_LEN(sizeof(int)); + char control[control_len]; + struct msghdr mh = { + (struct sockaddr*)&sockaddr, // msg_name + sizeof(sockaddr), // msg_namelen + nullptr, // msg_iov + 0, // msg_iovlen + &control, // msg_control + control_len, // msg_controllen + }; + ceph_assert(transport_fd >= 0); + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&mh); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + *reinterpret_cast<int *>(CMSG_DATA(cmsg)) = fd; + + return sendmsg(transport_fd, &mh, MSG_NOSIGNAL); +} + +char map_prio(short ceph_prio) +{ + if (ceph_prio < 0) + return LOG_ERR; + if (ceph_prio == 0) + return LOG_WARNING; + if (ceph_prio < 5) + return LOG_NOTICE; + if (ceph_prio < 10) + return LOG_INFO; + return LOG_DEBUG; +} +} + +namespace detail { +class EntryEncoderBase { + public: + EntryEncoderBase(): + m_msg_vec { + {}, {}, {}, { (char *)"\n", 1 }, + } + { + std::string id = program_invocation_short_name; + for (auto& c : id) { + if (c == '\n') + c = '_'; + } + static_segment = "SYSLOG_IDENTIFIER=" + id + "\n"; + m_msg_vec[0].iov_base = static_segment.data(); + m_msg_vec[0].iov_len = static_segment.size(); + } + + constexpr struct iovec *iovec() { return this->m_msg_vec; } + constexpr std::size_t iovec_len() + { + return sizeof(m_msg_vec) / sizeof(m_msg_vec[0]); + } + + private: + struct iovec m_msg_vec[4]; + std::string static_segment; + + protected: + fmt::memory_buffer meta_buf; + + struct iovec &meta_vec() { return m_msg_vec[1]; } + struct iovec &msg_vec() { return m_msg_vec[2]; } +}; + +class EntryEncoder : public EntryEncoderBase { + public: + void encode(const Entry& e, const SubsystemMap *s) + { + meta_buf.clear(); + fmt::format_to(std::back_inserter(meta_buf), + R"(PRIORITY={:d} +CEPH_SUBSYS={} +TIMESTAMP={} +CEPH_PRIO={} +THREAD={:016x} +MESSAGE +)", + map_prio(e.m_prio), + s->get_name(e.m_subsys), + e.m_stamp.time_since_epoch().count().count, + e.m_prio, + e.m_thread); + + uint64_t msg_len = htole64(e.size()); + meta_buf.resize(meta_buf.size() + sizeof(msg_len)); + *(reinterpret_cast<uint64_t*>(meta_buf.end()) - 1) = htole64(e.size()); + + meta_vec().iov_base = meta_buf.data(); + meta_vec().iov_len = meta_buf.size(); + + msg_vec().iov_base = (void *)e.strv().data(); + msg_vec().iov_len = e.size(); + } +}; + +class LogEntryEncoder : public EntryEncoderBase { + public: + void encode(const LogEntry& le) + { + meta_buf.clear(); + fmt::format_to(std::back_inserter(meta_buf), + R"(PRIORITY={:d} +TIMESTAMP={} +CEPH_NAME={} +CEPH_RANK={} +CEPH_SEQ={} +CEPH_CHANNEL={} +MESSAGE +)", + clog_type_to_syslog_level(le.prio), + le.stamp.to_nsec(), + le.name.to_str(), + le.rank, + le.seq, + le.channel); + + uint64_t msg_len = htole64(le.msg.size()); + meta_buf.resize(meta_buf.size() + sizeof(msg_len)); + *(reinterpret_cast<uint64_t*>(meta_buf.end()) - 1) = htole64(le.msg.size()); + + meta_vec().iov_base = meta_buf.data(); + meta_vec().iov_len = meta_buf.size(); + + msg_vec().iov_base = (void *)le.msg.data(); + msg_vec().iov_len = le.msg.size(); + } +}; + +enum class JournaldClient::MemFileMode { + MEMFD_CREATE, + OPEN_TMPFILE, + OPEN_UNLINK, +}; + +constexpr const char *mem_file_dir = "/dev/shm"; + +void JournaldClient::detect_mem_file_mode() +{ + int memfd = memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC); + if (memfd >= 0) { + mem_file_mode = MemFileMode::MEMFD_CREATE; + close(memfd); + return; + } + memfd = open(mem_file_dir, O_TMPFILE | O_EXCL | O_CLOEXEC, S_IRUSR | S_IWUSR); + if (memfd >= 0) { + mem_file_mode = MemFileMode::OPEN_TMPFILE; + close(memfd); + return; + } + mem_file_mode = MemFileMode::OPEN_UNLINK; +} + +int JournaldClient::open_mem_file() +{ + switch (mem_file_mode) { + case MemFileMode::MEMFD_CREATE: + return memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC); + case MemFileMode::OPEN_TMPFILE: + return open(mem_file_dir, O_TMPFILE | O_EXCL | O_CLOEXEC, S_IRUSR | S_IWUSR); + case MemFileMode::OPEN_UNLINK: + char mem_file_template[] = "/dev/shm/ceph-journald-XXXXXX"; + int fd = mkostemp(mem_file_template, O_CLOEXEC); + unlink(mem_file_template); + return fd; + } + ceph_abort("Unexpected mem_file_mode"); +} + +JournaldClient::JournaldClient() : + m_msghdr({ + (struct sockaddr*)&sockaddr, // msg_name + sizeof(sockaddr), // msg_namelen + }) +{ + fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0); + ceph_assertf(fd > 0, "socket creation failed: %s", strerror(errno)); + + int sendbuf = 2 * 1024 * 1024; + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof(sendbuf)); + + detect_mem_file_mode(); +} + +JournaldClient::~JournaldClient() +{ + close(fd); +} + +int JournaldClient::send() +{ + int ret = sendmsg(fd, &m_msghdr, MSG_NOSIGNAL); + if (ret >= 0) + return 0; + + /* Fail silently if the journal is not available */ + if (errno == ENOENT) + return -1; + + if (errno != EMSGSIZE && errno != ENOBUFS) { + std::cerr << "Failed to send log to journald: " << strerror(errno) << std::endl; + return -1; + } + /* Message doesn't fit... Let's dump the data in a memfd and + * just pass a file descriptor of it to the other side. + */ + int buffer_fd = open_mem_file(); + if (buffer_fd < 0) { + std::cerr << "Failed to open buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + return -1; + } + + ret = writev(buffer_fd, m_msghdr.msg_iov, m_msghdr.msg_iovlen); + if (ret < 0) { + std::cerr << "Failed to write to buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + + if (mem_file_mode == MemFileMode::MEMFD_CREATE) { + ret = fcntl(buffer_fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL); + if (ret) { + std::cerr << "Failed to seal buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + } + + ret = sendmsg_fd(fd, buffer_fd); + if (ret < 0) { + /* Fail silently if the journal is not available */ + if (errno == ENOENT) + goto err_close_buffer_fd; + + std::cerr << "Failed to send fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + close(buffer_fd); + return 0; + +err_close_buffer_fd: + close(buffer_fd); + return -1; +} + +} // namespace ceph::logging::detail + +JournaldLogger::JournaldLogger(const SubsystemMap *s) : + m_entry_encoder(std::make_unique<detail::EntryEncoder>()), + m_subs(s) +{ + client.m_msghdr.msg_iov = m_entry_encoder->iovec(); + client.m_msghdr.msg_iovlen = m_entry_encoder->iovec_len(); +} + +JournaldLogger::~JournaldLogger() = default; + +int JournaldLogger::log_entry(const Entry& e) +{ + m_entry_encoder->encode(e, m_subs); + return client.send(); +} + +JournaldClusterLogger::JournaldClusterLogger() : + m_log_entry_encoder(std::make_unique<detail::LogEntryEncoder>()) +{ + client.m_msghdr.msg_iov = m_log_entry_encoder->iovec(); + client.m_msghdr.msg_iovlen = m_log_entry_encoder->iovec_len(); +} + +JournaldClusterLogger::~JournaldClusterLogger() = default; + +int JournaldClusterLogger::log_log_entry(const LogEntry &le) +{ + m_log_entry_encoder->encode(le); + return client.send(); +} + +} |