summaryrefslogtreecommitdiffstats
path: root/src/tailer/tailer.looper.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 20:01:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 20:01:36 +0000
commit62e4c68907d8d33709c2c1f92a161dff00b3d5f2 (patch)
treeadbbaf3acf88ea08f6eeec4b75ee98ad3b07fbdc /src/tailer/tailer.looper.cc
parentInitial commit. (diff)
downloadlnav-62e4c68907d8d33709c2c1f92a161dff00b3d5f2.tar.xz
lnav-62e4c68907d8d33709c2c1f92a161dff00b3d5f2.zip
Adding upstream version 0.11.2.upstream/0.11.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tailer/tailer.looper.cc')
-rw-r--r--src/tailer/tailer.looper.cc1192
1 files changed, 1192 insertions, 0 deletions
diff --git a/src/tailer/tailer.looper.cc b/src/tailer/tailer.looper.cc
new file mode 100644
index 0000000..82a9fdc
--- /dev/null
+++ b/src/tailer/tailer.looper.cc
@@ -0,0 +1,1192 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <regex>
+
+#include "tailer.looper.hh"
+
+#include "base/fs_util.hh"
+#include "base/humanize.network.hh"
+#include "base/lnav_log.hh"
+#include "base/paths.hh"
+#include "config.h"
+#include "line_buffer.hh"
+#include "lnav.hh"
+#include "lnav.indexing.hh"
+#include "service_tags.hh"
+#include "tailer.h"
+#include "tailer.looper.cfg.hh"
+#include "tailerbin.h"
+#include "tailerpp.hh"
+
+using namespace std::chrono_literals;
+
+static const auto HOST_RETRY_DELAY = 1min;
+
+static void
+read_err_pipe(const std::string& netloc,
+ auto_fd& err,
+ std::vector<std::string>& eq)
+{
+ line_buffer lb;
+ file_range pipe_range;
+ bool done = false;
+
+ log_info("stderr reader started...");
+ lb.set_fd(err);
+ while (!done) {
+ auto load_res = lb.load_next_line(pipe_range);
+
+ if (load_res.isErr()) {
+ done = true;
+ } else {
+ auto li = load_res.unwrap();
+
+ pipe_range = li.li_file_range;
+ if (li.li_file_range.empty()) {
+ done = true;
+ } else {
+ lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
+ auto line_str
+ = string_fragment(sbr.get_data(), 0, sbr.length())
+ .trim("\n");
+ if (eq.size() < 10) {
+ eq.template emplace_back(line_str.to_string());
+ }
+
+ auto level = line_str.startswith("error:")
+ ? lnav_log_level_t::ERROR
+ : line_str.startswith("warning:")
+ ? lnav_log_level_t::WARNING
+ : line_str.startswith("info:")
+ ? lnav_log_level_t::INFO
+ : lnav_log_level_t::DEBUG;
+ log_msg_wrapper(level,
+ "tailer[%s] %.*s",
+ netloc.c_str(),
+ line_str.length(),
+ line_str.data());
+ });
+ }
+ }
+ }
+}
+
+static void
+update_tailer_progress(const std::string& netloc, const std::string& msg)
+{
+ lnav_data.ld_active_files.fc_progress->writeAccess()
+ ->sp_tailers[netloc]
+ .tp_message
+ = msg;
+}
+
+static void
+update_tailer_description(
+ const std::string& netloc,
+ const std::map<std::string, logfile_open_options_base>& desired_paths,
+ const std::string& remote_uname)
+{
+ std::vector<std::string> paths;
+
+ for (const auto& des_pair : desired_paths) {
+ paths.emplace_back(
+ fmt::format(FMT_STRING("{}{}"), netloc, des_pair.first));
+ }
+ isc::to<main_looper&, services::main_t>().send(
+ [netloc, paths, remote_uname](auto& mlooper) {
+ auto& fc = lnav_data.ld_active_files;
+
+ for (const auto& path : paths) {
+ auto iter = fc.fc_other_files.find(path);
+
+ if (iter == fc.fc_other_files.end()) {
+ continue;
+ }
+
+ iter->second.ofd_description = remote_uname;
+ }
+ fc.fc_name_to_errors.erase(netloc);
+ });
+}
+
+void
+tailer::looper::loop_body()
+{
+ auto now = std::chrono::steady_clock::now();
+ std::vector<std::string> to_erase;
+
+ for (auto& qpair : this->l_netlocs_to_paths) {
+ auto& netloc = qpair.first;
+ auto& rpq = qpair.second;
+
+ if (now < rpq.rpq_next_attempt_time) {
+ continue;
+ }
+ if (this->l_remotes.count(netloc) == 0) {
+ auto create_res = host_tailer::for_host(netloc);
+
+ if (create_res.isErr()) {
+ report_error(netloc, create_res.unwrapErr());
+ if (std::any_of(
+ rpq.rpq_new_paths.begin(),
+ rpq.rpq_new_paths.end(),
+ [](const auto& pair) { return !pair.second.loo_tail; }))
+ {
+ rpq.send_synced_to_main(netloc);
+ to_erase.push_back(netloc);
+ } else {
+ rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
+ }
+ continue;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc] = ht;
+ this->s_children.add_child_service(ht);
+
+ rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
+ rpq.rpq_existing_paths.end());
+ rpq.rpq_existing_paths.clear();
+ }
+
+ if (!rpq.rpq_new_paths.empty()) {
+ log_debug("%s: new paths to monitor -- %s",
+ netloc.c_str(),
+ rpq.rpq_new_paths.begin()->first.c_str());
+ this->l_remotes[netloc]->send(
+ [paths = rpq.rpq_new_paths](auto& ht) {
+ for (const auto& pair : paths) {
+ log_debug("adding path to tailer -- %s",
+ pair.first.c_str());
+ ht.open_remote_path(pair.first, std::move(pair.second));
+ }
+ });
+
+ rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
+ rpq.rpq_new_paths.end());
+ rpq.rpq_new_paths.clear();
+ }
+ }
+
+ for (const auto& netloc : to_erase) {
+ this->l_netlocs_to_paths.erase(netloc);
+ }
+}
+
+void
+tailer::looper::add_remote(const network::path& path,
+ logfile_open_options_base options)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path]
+ = std::move(options);
+}
+
+void
+tailer::looper::load_preview(int64_t id, const network::path& path)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ auto iter = this->l_remotes.find(netloc_str);
+
+ if (iter == this->l_remotes.end()) {
+ auto create_res = host_tailer::for_host(netloc_str);
+
+ if (create_res.isErr()) {
+ auto msg = create_res.unwrapErr();
+ isc::to<main_looper&, services::main_t>().send(
+ [id, msg](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != id) {
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .clear();
+ lnav_data.ld_preview_source.clear();
+ lnav_data.ld_bottom_source.grep_error(msg);
+ });
+ return;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc_str] = ht;
+ this->s_children.add_child_service(ht);
+ }
+
+ this->l_remotes[netloc_str]->send([id, file_path = path.p_path](auto& ht) {
+ ht.load_preview(id, file_path);
+ });
+}
+
+void
+tailer::looper::complete_path(const network::path& path)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ auto iter = this->l_remotes.find(netloc_str);
+
+ if (iter == this->l_remotes.end()) {
+ auto create_res = host_tailer::for_host(netloc_str);
+
+ if (create_res.isErr()) {
+ return;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc_str] = ht;
+ this->s_children.add_child_service(ht);
+ }
+
+ this->l_remotes[netloc_str]->send(
+ [file_path = path.p_path](auto& ht) { ht.complete_path(file_path); });
+}
+
+static std::vector<std::string>
+create_ssh_args_from_config(const std::string& dest)
+{
+ const auto& cfg = injector::get<const tailer::config&>();
+ std::vector<std::string> retval;
+
+ retval.emplace_back(cfg.c_ssh_cmd);
+ if (!cfg.c_ssh_flags.empty()) {
+ if (startswith(cfg.c_ssh_flags, "-")) {
+ retval.emplace_back(cfg.c_ssh_flags);
+ } else {
+ retval.emplace_back(
+ fmt::format(FMT_STRING("-{}"), cfg.c_ssh_flags));
+ }
+ }
+ for (const auto& pair : cfg.c_ssh_options) {
+ if (pair.second.empty()) {
+ continue;
+ }
+ retval.emplace_back(fmt::format(FMT_STRING("-{}"), pair.first));
+ retval.emplace_back(pair.second);
+ }
+ for (const auto& pair : cfg.c_ssh_config) {
+ if (pair.second.empty()) {
+ continue;
+ }
+ retval.emplace_back(
+ fmt::format(FMT_STRING("-o{}={}"), pair.first, pair.second));
+ }
+ retval.emplace_back(dest);
+
+ return retval;
+}
+
+Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
+tailer::looper::host_tailer::for_host(const std::string& netloc)
+{
+ log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
+
+ update_tailer_progress(netloc, "Transferring tailer...");
+
+ auto& cfg = injector::get<const tailer::config&>();
+ auto tailer_bin_name = fmt::format(FMT_STRING("tailer.bin.{}"), getpid());
+
+ auto rp = humanize::network::path::from_str(netloc).value();
+ auto ssh_dest = rp.p_locality.l_hostname;
+ if (rp.p_locality.l_username.has_value()) {
+ ssh_dest = fmt::format(FMT_STRING("{}@{}"),
+ rp.p_locality.l_username.value(),
+ rp.p_locality.l_hostname);
+ }
+
+ {
+ auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
+ auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
+ auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
+ auto child = TRY(lnav::pid::from_fork());
+
+ in_pipe.after_fork(child.in());
+ out_pipe.after_fork(child.in());
+ err_pipe.after_fork(child.in());
+
+ if (child.in_child()) {
+ auto arg_strs = create_ssh_args_from_config(ssh_dest);
+ std::vector<char*> args;
+
+ arg_strs.emplace_back(
+ fmt::format(cfg.c_transfer_cmd, tailer_bin_name));
+
+ fmt::print(stderr,
+ "tailer({}): executing -- {}\n",
+ netloc,
+ fmt::join(arg_strs, " "));
+ for (const auto& arg : arg_strs) {
+ args.push_back((char*) arg.data());
+ }
+ args.push_back(nullptr);
+
+ execvp(cfg.c_ssh_cmd.c_str(), args.data());
+ _exit(EXIT_FAILURE);
+ }
+
+ std::vector<std::string> error_queue;
+ log_debug("tailer(%s): starting err reader", netloc.c_str());
+ std::thread err_reader([netloc,
+ err = std::move(err_pipe.read_end()),
+ &error_queue]() mutable {
+ log_set_thread_prefix(
+ fmt::format(FMT_STRING("tailer({})"), netloc));
+ read_err_pipe(netloc, err, error_queue);
+ });
+
+ log_debug("tailer(%s): writing to child", netloc.c_str());
+ auto sf = tailer_bin[0].to_string_fragment();
+ ssize_t total_bytes = 0;
+ bool write_failed = false;
+
+ while (total_bytes < sf.length()) {
+ log_debug("attempting to write %d", sf.length() - total_bytes);
+ auto rc = write(
+ in_pipe.write_end(), sf.data(), sf.length() - total_bytes);
+
+ if (rc < 0) {
+ log_error(" tailer(%s): write failed -- %s",
+ netloc.c_str(),
+ strerror(errno));
+ write_failed = true;
+ break;
+ }
+ log_debug(" wrote %d", rc);
+ total_bytes += rc;
+ }
+
+ in_pipe.write_end().reset();
+
+ while (!write_failed) {
+ char buffer[1024];
+
+ auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
+ if (rc < 0) {
+ break;
+ }
+ if (rc == 0) {
+ break;
+ }
+ log_debug("tailer(%s): transfer output -- %.*s",
+ netloc.c_str(),
+ rc,
+ buffer);
+ }
+
+ auto finished_child = std::move(child).wait_for_child();
+
+ err_reader.join();
+ if (!finished_child.was_normal_exit()
+ || finished_child.exit_status() != EXIT_SUCCESS)
+ {
+ auto error_msg = error_queue.empty() ? "unknown"
+ : error_queue.back();
+ return Err(fmt::format(FMT_STRING("failed to ssh to host: {}"),
+ error_msg));
+ }
+ }
+
+ update_tailer_progress(netloc, "Starting tailer...");
+
+ auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
+ auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
+ auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
+ auto child = TRY(lnav::pid::from_fork());
+
+ in_pipe.after_fork(child.in());
+ out_pipe.after_fork(child.in());
+ err_pipe.after_fork(child.in());
+
+ if (child.in_child()) {
+ auto arg_strs = create_ssh_args_from_config(ssh_dest);
+ std::vector<char*> args;
+
+ arg_strs.emplace_back(fmt::format(cfg.c_start_cmd, tailer_bin_name));
+
+ fmt::print(stderr,
+ FMT_STRING("tailer({}): executing -- {}\n"),
+ netloc,
+ fmt::join(arg_strs, " "));
+ for (const auto& arg : arg_strs) {
+ args.push_back((char*) arg.data());
+ }
+ args.push_back(nullptr);
+
+ execvp(cfg.c_ssh_cmd.c_str(), args.data());
+ _exit(EXIT_FAILURE);
+ }
+
+ return Ok(std::make_shared<host_tailer>(netloc,
+ std::move(child),
+ std::move(in_pipe.write_end()),
+ std::move(out_pipe.read_end()),
+ std::move(err_pipe.read_end())));
+}
+
+static ghc::filesystem::path
+remote_cache_path()
+{
+ return lnav::paths::workdir() / "remotes";
+}
+
+ghc::filesystem::path
+tailer::looper::host_tailer::tmp_path()
+{
+ auto local_path = remote_cache_path();
+
+ ghc::filesystem::create_directories(local_path);
+ auto_mem<char> resolved_path;
+
+ resolved_path = realpath(local_path.c_str(), nullptr);
+ if (resolved_path.in() == nullptr) {
+ return local_path;
+ }
+
+ return resolved_path.in();
+}
+
+static std::string
+scrub_netloc(const std::string& netloc)
+{
+ const static std::regex TO_SCRUB(R"([^\w\.\@])");
+
+ return std::regex_replace(netloc, TO_SCRUB, "_");
+}
+
+tailer::looper::host_tailer::host_tailer(const std::string& netloc,
+ auto_pid<process_state::running> child,
+ auto_fd to_child,
+ auto_fd from_child,
+ auto_fd err_from_child)
+ : isc::service<host_tailer>(netloc), ht_netloc(netloc),
+ ht_local_path(tmp_path() / scrub_netloc(netloc)),
+ ht_error_reader([netloc,
+ err = std::move(err_from_child),
+ &eq = this->ht_error_queue]() mutable {
+ read_err_pipe(netloc, err, eq);
+ }),
+ ht_state(connected{
+ std::move(child), std::move(to_child), std::move(from_child), {}})
+{
+}
+
+void
+tailer::looper::host_tailer::open_remote_path(const std::string& path,
+ logfile_open_options_base loo)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ conn.c_desired_paths[path] = std::move(loo);
+ send_packet(conn.ht_to_child.get(),
+ TPT_OPEN_PATH,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot tail: %s",
+ path.c_str());
+ },
+ [&](const synced& s) {
+ log_warning("synced with host, not tailing: %s", path.c_str());
+ });
+}
+
+void
+tailer::looper::host_tailer::load_preview(int64_t id, const std::string& path)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ send_packet(conn.ht_to_child.get(),
+ TPT_LOAD_PREVIEW,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_INT64,
+ id,
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot preview: %s",
+ path.c_str());
+
+ auto msg = fmt::format(FMT_STRING("error: disconnected from {}"),
+ this->ht_netloc);
+ isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != id) {
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .set_value(msg);
+ });
+ },
+ [&](const synced& s) { require(false); });
+}
+
+void
+tailer::looper::host_tailer::complete_path(const std::string& path)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ send_packet(conn.ht_to_child.get(),
+ TPT_COMPLETE_PATH,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot preview: %s",
+ path.c_str());
+ },
+ [&](const synced& s) { require(false); });
+}
+
+void
+tailer::looper::host_tailer::loop_body()
+{
+ const static uint64_t TOUCH_FREQ = 10000;
+
+ if (!this->ht_state.is<connected>()) {
+ return;
+ }
+
+ this->ht_cycle_count += 1;
+ if (this->ht_cycle_count % TOUCH_FREQ == 0) {
+ auto now
+ = ghc::filesystem::file_time_type{std::chrono::system_clock::now()};
+ ghc::filesystem::last_write_time(this->ht_local_path, now);
+ }
+
+ auto& conn = this->ht_state.get<connected>();
+
+ pollfd pfds[1];
+
+ pfds[0].fd = conn.ht_from_child.get();
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+
+ auto ready_count = poll(pfds, 1, 100);
+ if (ready_count > 0) {
+ auto read_res = tailer::read_packet(conn.ht_from_child);
+
+ if (read_res.isErr()) {
+ log_error("read error: %s", read_res.unwrapErr().c_str());
+ return;
+ }
+
+ auto packet = read_res.unwrap();
+ this->ht_state = packet.match(
+ [&](const tailer::packet_eof& te) {
+ log_debug("all done!");
+
+ auto finished_child = std::move(conn).close();
+ if (finished_child.exit_status() != 0
+ && !this->ht_error_queue.empty())
+ {
+ report_error(this->ht_netloc, this->ht_error_queue.back());
+ }
+
+ return state_v{disconnected()};
+ },
+ [&](const tailer::packet_announce& pa) {
+ update_tailer_description(
+ this->ht_netloc, conn.c_desired_paths, pa.pa_uname);
+ this->ht_uname = pa.pa_uname;
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_log& pl) {
+ log_debug("%s\n", pl.pl_msg.c_str());
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_error& pe) {
+ log_debug("Got an error: %s -- %s",
+ pe.pe_path.c_str(),
+ pe.pe_msg.c_str());
+
+ lnav_data.ld_active_files.fc_progress->writeAccess()
+ ->sp_tailers.erase(this->ht_netloc);
+
+ auto desired_iter = conn.c_desired_paths.find(pe.pe_path);
+ if (desired_iter != conn.c_desired_paths.end()) {
+ report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
+ if (!desired_iter->second.loo_tail) {
+ conn.c_desired_paths.erase(desired_iter);
+ }
+ } else {
+ auto child_iter = conn.c_child_paths.find(pe.pe_path);
+
+ if (child_iter != conn.c_child_paths.end()
+ && !child_iter->second.loo_tail)
+ {
+ conn.c_child_paths.erase(child_iter);
+ }
+ }
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pe.pe_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+
+ log_debug("removing %s", local_path.c_str());
+ this->ht_active_files.erase(local_path);
+ ghc::filesystem::remove_all(local_path);
+
+ if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ return state_v{synced{}};
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_offer_block& pob) {
+ log_debug("Got an offer: %s %lld - %lld",
+ pob.pob_path.c_str(),
+ pob.pob_offset,
+ pob.pob_length);
+
+ logfile_open_options_base loo;
+ if (pob.pob_path == pob.pob_root_path) {
+ auto root_iter = conn.c_desired_paths.find(pob.pob_path);
+
+ if (root_iter == conn.c_desired_paths.end()) {
+ log_warning("ignoring unknown root: %s",
+ pob.pob_root_path.c_str());
+ return std::move(this->ht_state);
+ }
+
+ loo = root_iter->second;
+ } else {
+ auto child_iter = conn.c_child_paths.find(pob.pob_path);
+ if (child_iter == conn.c_child_paths.end()) {
+ auto root_iter
+ = conn.c_desired_paths.find(pob.pob_root_path);
+
+ if (root_iter == conn.c_desired_paths.end()) {
+ log_warning("ignoring child of unknown root: %s",
+ pob.pob_root_path.c_str());
+ return std::move(this->ht_state);
+ }
+
+ conn.c_child_paths[pob.pob_path]
+ = std::move(root_iter->second);
+ child_iter = conn.c_child_paths.find(pob.pob_path);
+ }
+
+ loo = std::move(child_iter->second);
+ }
+
+ update_tailer_description(
+ this->ht_netloc, conn.c_desired_paths, this->ht_uname);
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pob.pob_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+ auto open_res
+ = lnav::filesystem::open_file(local_path, O_RDONLY);
+
+ if (this->ht_active_files.count(local_path) == 0) {
+ this->ht_active_files.insert(local_path);
+
+ auto custom_name = this->get_display_path(pob.pob_path);
+ isc::to<main_looper&, services::main_t>().send(
+ [local_path,
+ custom_name,
+ loo,
+ netloc = this->ht_netloc](auto& mlooper) {
+ auto& active_fc = lnav_data.ld_active_files;
+ auto lpath_str = local_path.string();
+
+ {
+ safe::WriteAccess<safe_scan_progress> sp(
+ *active_fc.fc_progress);
+
+ sp->sp_tailers.erase(netloc);
+ }
+ if (active_fc.fc_file_names.count(lpath_str) > 0) {
+ log_debug("already in fc_file_names");
+ return;
+ }
+ if (active_fc.fc_closed_files.count(custom_name)
+ > 0) {
+ log_debug("in closed");
+ return;
+ }
+
+ file_collection fc;
+
+ fc.fc_file_names[lpath_str]
+ .with_filename(custom_name)
+ .with_source(logfile_name_source::REMOTE)
+ .with_tail(loo.loo_tail)
+ .with_non_utf_visibility(false)
+ .with_visible_size_limit(256 * 1024);
+ update_active_files(fc);
+ });
+ }
+
+ if (open_res.isErr()) {
+ log_debug("file not found (%s), sending need block",
+ open_res.unwrapErr().c_str());
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ auto fd = open_res.unwrap();
+ struct stat st;
+
+ if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
+ log_debug("path changed, sending need block");
+ ghc::filesystem::remove_all(local_path);
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ if (st.st_size == pob.pob_offset) {
+ log_debug("local file is synced, sending need block");
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ constexpr int64_t BUFFER_SIZE = 4 * 1024 * 1024;
+ auto_mem<unsigned char> buffer;
+
+ buffer = (unsigned char*) malloc(BUFFER_SIZE);
+ auto remaining = pob.pob_length;
+ auto remaining_offset = pob.pob_offset;
+ tailer::hash_frag thf;
+ SHA256_CTX shactx;
+ sha256_init(&shactx);
+
+ log_debug("checking offer %s[%lld..+%lld]",
+ local_path.c_str(),
+ remaining_offset,
+ remaining);
+ while (remaining > 0) {
+ auto nbytes = std::min(remaining, BUFFER_SIZE);
+ auto bytes_read
+ = pread(fd, buffer, nbytes, remaining_offset);
+ if (bytes_read == -1) {
+ log_debug(
+ "unable to read file, sending need block -- %s",
+ strerror(errno));
+ ghc::filesystem::remove_all(local_path);
+ break;
+ }
+ if (bytes_read == 0) {
+ break;
+ }
+ sha256_update(&shactx, buffer.in(), bytes_read);
+ remaining -= bytes_read;
+ remaining_offset += bytes_read;
+ }
+
+ if (remaining == 0) {
+ sha256_final(&shactx, thf.thf_hash);
+
+ if (thf == pob.pob_hash) {
+ log_debug("local file block is same, sending ack");
+ send_packet(conn.ht_to_child.get(),
+ TPT_ACK_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_INT64,
+ pob.pob_offset,
+ TPPT_INT64,
+ pob.pob_length,
+ TPPT_INT64,
+ (int64_t) st.st_size,
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+ log_debug("local file is different, sending need block");
+ }
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_tail_block& ptb) {
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(ptb.ptb_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+
+ log_debug("writing tail to: %lld/%ld %s",
+ ptb.ptb_offset,
+ ptb.ptb_bits.size(),
+ local_path.c_str());
+ ghc::filesystem::create_directories(local_path.parent_path());
+ auto create_res = lnav::filesystem::create_file(
+ local_path, O_WRONLY | O_APPEND | O_CREAT, 0600);
+
+ if (create_res.isErr()) {
+ log_error("open: %s", create_res.unwrapErr().c_str());
+ } else {
+ auto fd = create_res.unwrap();
+ ftruncate(fd, ptb.ptb_offset);
+ pwrite(fd,
+ ptb.ptb_bits.data(),
+ ptb.ptb_bits.size(),
+ ptb.ptb_offset);
+ auto mtime = ghc::filesystem::file_time_type{
+ std::chrono::seconds{ptb.ptb_mtime}};
+ // XXX This isn't atomic with the write...
+ ghc::filesystem::last_write_time(local_path, mtime);
+ }
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_synced& ps) {
+ if (ps.ps_root_path == ps.ps_path) {
+ auto iter = conn.c_desired_paths.find(ps.ps_path);
+
+ if (iter != conn.c_desired_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_desired_paths.insert(ps.ps_path);
+ } else {
+ log_info("synced desired path: %s",
+ iter->first.c_str());
+ conn.c_desired_paths.erase(iter);
+ }
+ }
+ } else {
+ auto iter = conn.c_child_paths.find(ps.ps_path);
+
+ if (iter != conn.c_child_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_child_paths.insert(ps.ps_path);
+ } else {
+ log_info("synced child path: %s",
+ iter->first.c_str());
+ conn.c_child_paths.erase(iter);
+ }
+ }
+ }
+
+ if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ return state_v{synced{}};
+ } else if (!conn.c_initial_sync_done
+ && conn.c_desired_paths.size()
+ == conn.c_synced_desired_paths.size()
+ && conn.c_child_paths.size()
+ == conn.c_synced_child_paths.size())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ conn.c_initial_sync_done = true;
+
+ std::set<std::string> synced_files;
+ for (const auto& desired_pair : conn.c_desired_paths) {
+ synced_files.emplace(fmt::format(
+ FMT_STRING("{}{}"), ht_netloc, desired_pair.first));
+ }
+ isc::to<main_looper&, services::main_t>().send(
+ [file_set = std::move(synced_files)](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_synced_files = file_set;
+ update_active_files(fc);
+ });
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_link& pl) {
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pl.pl_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+ auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
+ std::string link_path;
+
+ if (remote_link_path.is_absolute()) {
+ auto local_link_path = this->ht_local_path
+ / remote_link_path.relative_path();
+
+ link_path = local_link_path.string();
+ } else {
+ link_path = remote_link_path.string();
+ }
+
+ log_debug("symlinking %s -> %s",
+ local_path.c_str(),
+ link_path.c_str());
+ ghc::filesystem::create_directories(local_path.parent_path());
+ ghc::filesystem::remove_all(local_path);
+ if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
+ log_error("symlink failed: %s", strerror(errno));
+ }
+
+ if (pl.pl_root_path == pl.pl_path) {
+ auto iter = conn.c_desired_paths.find(pl.pl_path);
+
+ if (iter != conn.c_desired_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_desired_paths.insert(pl.pl_path);
+ } else {
+ log_info("synced desired path: %s",
+ iter->first.c_str());
+ conn.c_desired_paths.erase(iter);
+ }
+ }
+ } else {
+ auto iter = conn.c_child_paths.find(pl.pl_path);
+
+ if (iter != conn.c_child_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_child_paths.insert(pl.pl_path);
+ } else {
+ log_info("synced child path: %s",
+ iter->first.c_str());
+ conn.c_child_paths.erase(iter);
+ }
+ }
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_preview_error& ppe) {
+ isc::to<main_looper&, services::main_t>().send(
+ [ppe](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != ppe.ppe_id) {
+ log_debug("preview ID mismatch: %lld != %lld",
+ lnav_data.ld_preview_generation,
+ ppe.ppe_id);
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .clear();
+ lnav_data.ld_preview_source.clear();
+ lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg);
+ });
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_preview_data& ppd) {
+ isc::to<main_looper&, services::main_t>().send(
+ [netloc = this->ht_netloc, ppd](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != ppd.ppd_id) {
+ log_debug("preview ID mismatch: %lld != %lld",
+ lnav_data.ld_preview_generation,
+ ppd.ppd_id);
+ return;
+ }
+ std::string str(ppd.ppd_bits.begin(),
+ ppd.ppd_bits.end());
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .set_value("For file: %s:%s",
+ netloc.c_str(),
+ ppd.ppd_path.c_str());
+ lnav_data.ld_preview_source.replace_with(str)
+ .set_text_format(detect_text_format(str));
+ });
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_possible_path& ppp) {
+ log_debug("possible path: %s", ppp.ppp_path.c_str());
+ auto full_path = fmt::format(
+ FMT_STRING("{}{}"), this->ht_netloc, ppp.ppp_path);
+
+ isc::to<main_looper&, services::main_t>().send(
+ [full_path](auto& mlooper) {
+ lnav_data.ld_rl_view->add_possibility(
+ ln_mode_t::COMMAND, "remote-path", full_path);
+ });
+ return std::move(this->ht_state);
+ });
+
+ if (!this->ht_state.is<connected>()) {
+ this->s_looping = false;
+ }
+ }
+}
+
+std::chrono::milliseconds
+tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
+{
+ return 0s;
+}
+
+void
+tailer::looper::host_tailer::stopped()
+{
+ if (this->ht_state.is<connected>()) {
+ this->ht_state = disconnected();
+ }
+ if (this->ht_error_reader.joinable()) {
+ this->ht_error_reader.join();
+ }
+}
+
+std::string
+tailer::looper::host_tailer::get_display_path(
+ const std::string& remote_path) const
+{
+ return fmt::format(FMT_STRING("{}{}"), this->ht_netloc, remote_path);
+}
+
+void*
+tailer::looper::host_tailer::run()
+{
+ log_set_thread_prefix(
+ fmt::format(FMT_STRING("tailer({})"), this->ht_netloc));
+
+ return service_base::run();
+}
+
+auto_pid<process_state::finished>
+tailer::looper::host_tailer::connected::close() &&
+{
+ this->ht_to_child.reset();
+ this->ht_from_child.reset();
+
+ return std::move(this->ht_child).wait_for_child();
+}
+
+void
+tailer::looper::child_finished(std::shared_ptr<service_base> child)
+{
+ auto child_tailer = std::static_pointer_cast<host_tailer>(child);
+
+ for (auto iter = this->l_remotes.begin(); iter != this->l_remotes.end();
+ ++iter)
+ {
+ if (iter->second != child_tailer) {
+ continue;
+ }
+
+ if (child_tailer->is_synced()) {
+ log_info("synced with netloc '%s', removing", iter->first.c_str());
+ auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
+
+ if (netloc_iter != this->l_netlocs_to_paths.end()) {
+ netloc_iter->second.send_synced_to_main(netloc_iter->first);
+ this->l_netlocs_to_paths.erase(netloc_iter);
+ }
+ }
+ lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
+ iter->first);
+ this->l_remotes.erase(iter);
+ return;
+ }
+}
+
+void
+tailer::looper::remote_path_queue::send_synced_to_main(
+ const std::string& netloc)
+{
+ std::set<std::string> synced_files;
+
+ for (const auto& pair : this->rpq_new_paths) {
+ if (!pair.second.loo_tail) {
+ synced_files.emplace(
+ fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
+ }
+ }
+ for (const auto& pair : this->rpq_existing_paths) {
+ if (!pair.second.loo_tail) {
+ synced_files.emplace(
+ fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
+ }
+ }
+
+ isc::to<main_looper&, services::main_t>().send(
+ [file_set = std::move(synced_files)](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_synced_files = file_set;
+ update_active_files(fc);
+ });
+}
+
+void
+tailer::looper::report_error(std::string path, std::string msg)
+{
+ log_error("reporting error: %s -- %s", path.c_str(), msg.c_str());
+ isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_name_to_errors.emplace(path,
+ file_error_info{
+ {},
+ msg,
+ });
+ update_active_files(fc);
+ lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
+ path);
+ });
+}
+
+void
+tailer::cleanup_cache()
+{
+ (void) std::async(std::launch::async, []() {
+ auto now = std::chrono::system_clock::now();
+ auto cache_path = remote_cache_path();
+ const auto& cfg = injector::get<const config&>();
+ std::vector<ghc::filesystem::path> to_remove;
+
+ log_debug("cache-ttl %d", cfg.c_cache_ttl.count());
+ for (const auto& entry :
+ ghc::filesystem::directory_iterator(cache_path))
+ {
+ auto mtime = ghc::filesystem::last_write_time(entry.path());
+ auto exp_time = mtime + cfg.c_cache_ttl;
+ if (now < exp_time) {
+ continue;
+ }
+
+ to_remove.emplace_back(entry.path());
+ }
+
+ for (auto& entry : to_remove) {
+ log_debug("removing cached remote: %s", entry.c_str());
+ ghc::filesystem::remove_all(entry);
+ }
+ });
+}