summaryrefslogtreecommitdiffstats
path: root/src/common/OutputDataSocket.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/common/OutputDataSocket.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/common/OutputDataSocket.cc')
-rw-r--r--src/common/OutputDataSocket.cc407
1 files changed, 407 insertions, 0 deletions
diff --git a/src/common/OutputDataSocket.cc b/src/common/OutputDataSocket.cc
new file mode 100644
index 000000000..5828daebc
--- /dev/null
+++ b/src/common/OutputDataSocket.cc
@@ -0,0 +1,407 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <poll.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include "common/OutputDataSocket.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/safe_io.h"
+#include "include/compat.h"
+#include "include/sock_compat.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_asok
+#undef dout_prefix
+#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
+
+using std::ostringstream;
+
+/*
+ * UNIX domain sockets created by an application persist even after that
+ * application closes, unless they're explicitly unlinked. This is because the
+ * directory containing the socket keeps a reference to the socket.
+ *
+ * This code makes things a little nicer by unlinking those dead sockets when
+ * the application exits normally.
+ */
+static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
+static std::vector <const char*> cleanup_files;
+static bool cleanup_atexit = false;
+
+static void remove_cleanup_file(const char *file)
+{
+ pthread_mutex_lock(&cleanup_lock);
+ VOID_TEMP_FAILURE_RETRY(unlink(file));
+ for (std::vector <const char*>::iterator i = cleanup_files.begin();
+ i != cleanup_files.end(); ++i) {
+ if (strcmp(file, *i) == 0) {
+ free((void*)*i);
+ cleanup_files.erase(i);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+static void remove_all_cleanup_files()
+{
+ pthread_mutex_lock(&cleanup_lock);
+ for (std::vector <const char*>::iterator i = cleanup_files.begin();
+ i != cleanup_files.end(); ++i) {
+ VOID_TEMP_FAILURE_RETRY(unlink(*i));
+ free((void*)*i);
+ }
+ cleanup_files.clear();
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+static void add_cleanup_file(const char *file)
+{
+ char *fname = strdup(file);
+ if (!fname)
+ return;
+ pthread_mutex_lock(&cleanup_lock);
+ cleanup_files.push_back(fname);
+ if (!cleanup_atexit) {
+ atexit(remove_all_cleanup_files);
+ cleanup_atexit = true;
+ }
+ pthread_mutex_unlock(&cleanup_lock);
+}
+
+
+OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
+ : m_cct(cct),
+ data_max_backlog(_backlog),
+ m_sock_fd(-1),
+ m_shutdown_rd_fd(-1),
+ m_shutdown_wr_fd(-1),
+ going_down(false),
+ data_size(0),
+ skipped(0)
+{
+}
+
+OutputDataSocket::~OutputDataSocket()
+{
+ shutdown();
+}
+
+/*
+ * This thread listens on the UNIX domain socket for incoming connections.
+ * It only handles one connection at a time at the moment. All I/O is nonblocking,
+ * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
+ *
+ * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
+ * pipe, the thread terminates itself gracefully, allowing the
+ * OutputDataSocketConfigObs class to join() it.
+ */
+
+#define PFL_SUCCESS ((void*)(intptr_t)0)
+#define PFL_FAIL ((void*)(intptr_t)1)
+
+std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
+{
+ int pipefd[2];
+ if (pipe_cloexec(pipefd, 0) < 0) {
+ int e = errno;
+ ostringstream oss;
+ oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e);
+ return oss.str();
+ }
+
+ *pipe_rd = pipefd[0];
+ *pipe_wr = pipefd[1];
+ return "";
+}
+
+std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
+{
+ ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
+
+ struct sockaddr_un address;
+ if (sock_path.size() > sizeof(address.sun_path) - 1) {
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "The UNIX domain socket path " << sock_path << " is too long! The "
+ << "maximum length on this system is "
+ << (sizeof(address.sun_path) - 1);
+ return oss.str();
+ }
+ int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to create socket: " << cpp_strerror(err);
+ return oss.str();
+ }
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&address, 0, sizeof(struct sockaddr_un));
+ address.sun_family = AF_UNIX;
+ snprintf(address.sun_path, sizeof(address.sun_path),
+ "%s", sock_path.c_str());
+ if (::bind(sock_fd, (struct sockaddr*)&address,
+ sizeof(struct sockaddr_un)) != 0) {
+ int err = errno;
+ if (err == EADDRINUSE) {
+ // The old UNIX domain socket must still be there.
+ // Let's unlink it and try again.
+ VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+ if (::bind(sock_fd, (struct sockaddr*)&address,
+ sizeof(struct sockaddr_un)) == 0) {
+ err = 0;
+ }
+ else {
+ err = errno;
+ }
+ }
+ if (err != 0) {
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to bind the UNIX domain socket to '" << sock_path
+ << "': " << cpp_strerror(err);
+ close(sock_fd);
+ return oss.str();
+ }
+ }
+ if (listen(sock_fd, 5) != 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "OutputDataSocket::bind_and_listen: "
+ << "failed to listen to socket: " << cpp_strerror(err);
+ close(sock_fd);
+ VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+ return oss.str();
+ }
+ *fd = sock_fd;
+ return "";
+}
+
+void* OutputDataSocket::entry()
+{
+ ldout(m_cct, 5) << "entry start" << dendl;
+ while (true) {
+ struct pollfd fds[2];
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(fds, 0, sizeof(fds));
+ fds[0].fd = m_sock_fd;
+ fds[0].events = POLLIN | POLLRDBAND;
+ fds[1].fd = m_shutdown_rd_fd;
+ fds[1].events = POLLIN | POLLRDBAND;
+
+ int ret = poll(fds, 2, -1);
+ if (ret < 0) {
+ int err = errno;
+ if (err == EINTR) {
+ continue;
+ }
+ lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
+ << cpp_strerror(err) << dendl;
+ return PFL_FAIL;
+ }
+
+ if (fds[0].revents & POLLIN) {
+ // Send out some data
+ do_accept();
+ }
+ if (fds[1].revents & POLLIN) {
+ // Parent wants us to shut down
+ return PFL_SUCCESS;
+ }
+ }
+ ldout(m_cct, 5) << "entry exit" << dendl;
+
+ return PFL_SUCCESS; // unreachable
+}
+
+
+bool OutputDataSocket::do_accept()
+{
+ struct sockaddr_un address;
+ socklen_t address_length = sizeof(address);
+ ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
+ int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address,
+ &address_length);
+ if (connection_fd < 0) {
+ int err = errno;
+ lderr(m_cct) << "OutputDataSocket: do_accept error: '"
+ << cpp_strerror(err) << dendl;
+ return false;
+ }
+ ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
+
+ handle_connection(connection_fd);
+ close_connection(connection_fd);
+
+ return 0;
+}
+
+void OutputDataSocket::handle_connection(int fd)
+{
+ ceph::buffer::list bl;
+
+ m_lock.lock();
+ init_connection(bl);
+ m_lock.unlock();
+
+ if (bl.length()) {
+ /* need to special case the connection init buffer output, as it needs
+ * to be dumped before any data, including older data that was sent
+ * before the connection was established, or before we identified
+ * older connection was broken
+ */
+ int ret = safe_write(fd, bl.c_str(), bl.length());
+ if (ret < 0) {
+ return;
+ }
+ }
+
+ int ret = dump_data(fd);
+ if (ret < 0)
+ return;
+
+ do {
+ {
+ std::unique_lock l(m_lock);
+ if (!going_down) {
+ cond.wait(l);
+ }
+ if (going_down) {
+ break;
+ }
+ }
+ ret = dump_data(fd);
+ } while (ret >= 0);
+}
+
+int OutputDataSocket::dump_data(int fd)
+{
+ m_lock.lock();
+ auto l = std::move(data);
+ data.clear();
+ data_size = 0;
+ m_lock.unlock();
+
+ for (auto iter = l.begin(); iter != l.end(); ++iter) {
+ ceph::buffer::list& bl = *iter;
+ int ret = safe_write(fd, bl.c_str(), bl.length());
+ if (ret >= 0) {
+ ret = safe_write(fd, delim.c_str(), delim.length());
+ }
+ if (ret < 0) {
+ std::scoped_lock lock(m_lock);
+ for (; iter != l.end(); ++iter) {
+ ceph::buffer::list& bl = *iter;
+ data.push_back(bl);
+ data_size += bl.length();
+ }
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+void OutputDataSocket::close_connection(int fd)
+{
+ VOID_TEMP_FAILURE_RETRY(close(fd));
+}
+
+bool OutputDataSocket::init(const std::string &path)
+{
+ ldout(m_cct, 5) << "init " << path << dendl;
+
+ /* Set up things for the new thread */
+ std::string err;
+ int pipe_rd = -1, pipe_wr = -1;
+ err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
+ if (!err.empty()) {
+ lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
+ return false;
+ }
+ int sock_fd;
+ err = bind_and_listen(path, &sock_fd);
+ if (!err.empty()) {
+ lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
+ close(pipe_rd);
+ close(pipe_wr);
+ return false;
+ }
+
+ /* Create new thread */
+ m_sock_fd = sock_fd;
+ m_shutdown_rd_fd = pipe_rd;
+ m_shutdown_wr_fd = pipe_wr;
+ m_path = path;
+ create("out_data_socket");
+ add_cleanup_file(m_path.c_str());
+ return true;
+}
+
+void OutputDataSocket::shutdown()
+{
+ m_lock.lock();
+ going_down = true;
+ cond.notify_all();
+ m_lock.unlock();
+
+ if (m_shutdown_wr_fd < 0)
+ return;
+
+ ldout(m_cct, 5) << "shutdown" << dendl;
+
+ // Send a byte to the shutdown pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+ VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
+ m_shutdown_wr_fd = -1;
+
+ if (ret == 0) {
+ join();
+ } else {
+ lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
+ "to thread shutdown pipe: error " << ret << dendl;
+ }
+
+ remove_cleanup_file(m_path.c_str());
+ m_path.clear();
+}
+
+void OutputDataSocket::append_output(ceph::buffer::list& bl)
+{
+ std::lock_guard l(m_lock);
+
+ if (data_size + bl.length() > data_max_backlog) {
+ if (skipped % 100 == 0) {
+ ldout(m_cct, 0) << "dropping data output, max backlog reached (skipped=="
+ << skipped << ")"
+ << dendl;
+ skipped = 1;
+ } else
+ ++skipped;
+
+ cond.notify_all();
+ return;
+ }
+
+ data.push_back(bl);
+ data_size += bl.length();
+ cond.notify_all();
+}