/** * Copyright (c) 2023, Timothy Stack * * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * Neither the name of Timothy Stack nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include "piper.looper.hh" #include #include #include "base/fs_util.hh" #include "base/injector.hh" #include "base/time_util.hh" #include "config.h" #include "hasher.hh" #include "line_buffer.hh" #include "pcrepp/pcre2pp.hh" #include "piper.looper.cfg.hh" using namespace std::chrono_literals; static ssize_t write_timestamp(int fd, log_level_t level, off_t woff) { char time_str[64]; struct timeval tv; gettimeofday(&tv, nullptr); auto fmt_res = fmt::format_to_n(time_str, sizeof(time_str), FMT_STRING("{: 12}.{:06}:{};"), tv.tv_sec, tv.tv_usec, level_names[level][0]); return pwrite(fd, time_str, fmt_res.size, woff); } extern char** environ; namespace lnav { namespace piper { const json_path_container header_env_handlers = { yajlpp::pattern_property_handler("(?.*)") .with_synopsis("") .for_field(&lnav::piper::header::h_env), }; const typed_json_path_container header_handlers = { yajlpp::property_handler("name").for_field(&lnav::piper::header::h_name), yajlpp::property_handler("ctime").for_field(&lnav::piper::header::h_ctime), yajlpp::property_handler("cwd").for_field(&lnav::piper::header::h_cwd), yajlpp::property_handler("env").with_children(header_env_handlers), }; static std::map environ_to_map() { static const auto SENSITIVE_VARS = lnav::pcre2pp::code::from_const(R"((?i)token|pass)"); std::map retval; for (size_t lpc = 0; environ[lpc]; lpc++) { auto full_sf = string_fragment::from_c_str(environ[lpc]); auto pair_opt = full_sf.split_pair(string_fragment::tag1{'='}); if (!pair_opt) { continue; } if (SENSITIVE_VARS.find_in(pair_opt->first).ignore_error()) { retval[pair_opt->first.to_string()] = "******"; } else { retval[pair_opt->first.to_string()] = pair_opt->second.to_string(); } } return retval; } looper::looper(std::string name, auto_fd stdout_fd, auto_fd stderr_fd) : l_name(std::move(name)), l_cwd(ghc::filesystem::current_path().string()), l_env(environ_to_map()), l_stdout(std::move(stdout_fd)), l_stderr(std::move(stderr_fd)) { size_t count = 0; do { this->l_out_dir = storage_path() / fmt::format( FMT_STRING("p-{}-{:03}"), hasher().update(getmstime()).update(l_name).to_string(), count); count += 1; } while (ghc::filesystem::exists(this->l_out_dir)); ghc::filesystem::create_directories(this->l_out_dir); this->l_future = std::async(std::launch::async, [this]() { this->loop(); }); } looper::~looper() { log_info("piper destructed, shutting down: %s", this->l_name.c_str()); this->l_looping = false; this->l_future.wait(); } enum class read_mode_t { binary, line, }; void looper::loop() { static const auto FORCE_MTIME_UPDATE_DURATION = 8h; const auto& cfg = injector::get(); struct pollfd pfd[2]; struct { line_buffer lb; file_range last_range; pollfd* pfd{nullptr}; log_level_t cf_level{LEVEL_INFO}; read_mode_t cf_read_mode{read_mode_t::line}; void reset_pfd() { this->pfd->fd = this->lb.get_fd(); this->pfd->events = POLLIN; this->pfd->revents = 0; } } captured_fds[2]; off_t woff = 0, last_woff = 0; auto_fd outfd; size_t rotate_count = 0; log_info("starting loop to capture: %s (%d %d)", this->l_name.c_str(), this->l_stdout.get(), this->l_stderr.get()); this->l_stdout.non_blocking(); captured_fds[0].lb.set_fd(this->l_stdout); if (this->l_stderr.has_value()) { this->l_stderr.non_blocking(); captured_fds[1].lb.set_fd(this->l_stderr); } captured_fds[1].cf_level = LEVEL_ERROR; auto last_write = std::chrono::system_clock::now(); do { static const auto TIMEOUT = std::chrono::duration_cast(1s).count(); size_t used_pfds = 0; for (auto& cap : captured_fds) { if (cap.lb.get_fd() != -1 && cap.lb.is_pipe() && !cap.lb.is_pipe_closed()) { cap.pfd = &pfd[used_pfds]; used_pfds += 1; cap.reset_pfd(); } else { cap.pfd = nullptr; } } if (used_pfds == 0) { log_info("inputs consumed, breaking loop: %s", this->l_name.c_str()); this->l_looping = false; break; } auto poll_rc = poll(pfd, used_pfds, TIMEOUT); if (poll_rc == 0) { // update the timestamp to keep the file alive from any // cleanup processes if (outfd.has_value()) { auto now = std::chrono::system_clock::now(); if ((now - last_write) >= FORCE_MTIME_UPDATE_DURATION) { last_write = now; log_perror(futimes(outfd.get(), nullptr)); } } continue; } else { last_write = std::chrono::system_clock::now(); } for (auto& cap : captured_fds) { while (this->l_looping) { if (cap.pfd == nullptr || !(cap.pfd->revents & (POLLIN | POLLHUP))) { break; } if (cap.cf_read_mode == read_mode_t::binary) { char buffer[8192]; auto read_rc = read(cap.lb.get_fd(), buffer, sizeof(buffer)); if (read_rc < 0) { if (errno == EAGAIN) { break; } log_error("failed to read next chunk: %s -- %s", this->l_name.c_str(), strerror(errno)); this->l_looping = false; } else if (read_rc == 0) { this->l_looping = false; } else { auto rc = write(outfd.get(), buffer, read_rc); if (rc != read_rc) { log_error( "failed to write to capture file: %s -- %s", this->l_name.c_str(), strerror(errno)); } } continue; } auto load_result = cap.lb.load_next_line(cap.last_range); if (load_result.isErr()) { log_error("failed to load next line: %s -- %s", this->l_name.c_str(), load_result.unwrapErr().c_str()); this->l_looping = false; break; } auto li = load_result.unwrap(); if (cap.last_range.fr_offset == 0 && !cap.lb.is_header_utf8()) { log_info("switching capture to binary mode: %s", this->l_name.c_str()); cap.cf_read_mode = read_mode_t::binary; auto out_path = this->l_out_dir / "out.0"; log_info("creating binary capture file: %s -- %s", this->l_name.c_str(), out_path.c_str()); auto create_res = lnav::filesystem::create_file( out_path, O_WRONLY | O_CLOEXEC | O_TRUNC, 0600); if (create_res.isErr()) { log_error("unable to open capture file: %s -- %s", this->l_name.c_str(), create_res.unwrapErr().c_str()); break; } auto hdr_path = this->l_out_dir / ".header"; auto hdr = header{ current_timeval(), this->l_name, this->l_cwd, this->l_env, }; auto write_hdr_res = lnav::filesystem::write_file( hdr_path, header_handlers.to_string(hdr)); if (write_hdr_res.isErr()) { log_error("unable to write header file: %s -- %s", hdr_path.c_str(), write_hdr_res.unwrapErr().c_str()); break; } outfd = create_res.unwrap(); auto header_avail = cap.lb.get_available(); auto read_res = cap.lb.read_range(header_avail); if (read_res.isOk()) { auto sbr = read_res.unwrap(); write(outfd.get(), sbr.get_data(), sbr.length()); } else { log_error("failed to get header data: %s -- %s", this->l_name.c_str(), read_res.unwrapErr().c_str()); } continue; } if (li.li_partial && !cap.lb.is_pipe_closed()) { break; } if (li.li_file_range.empty()) { break; } auto read_result = cap.lb.read_range(li.li_file_range); if (read_result.isErr()) { log_error("failed to read next line: %s -- %s", this->l_name.c_str(), read_result.unwrapErr().c_str()); this->l_looping = false; break; } auto sbr = read_result.unwrap(); if (woff > last_woff && woff >= cfg.c_max_size) { log_info( "capture file has reached max size, rotating: %s -- " "%lld", this->l_name.c_str(), woff); outfd.reset(); } if (!outfd.has_value()) { auto out_path = this->l_out_dir / fmt::format(FMT_STRING("out.{}"), rotate_count % cfg.c_rotations); log_info("creating capturing file: %s -- %s", this->l_name.c_str(), out_path.c_str()); auto create_res = lnav::filesystem::create_file( out_path, O_WRONLY | O_CLOEXEC | O_TRUNC, 0600); if (create_res.isErr()) { log_error("unable to open capture file: %s -- %s", this->l_name.c_str(), create_res.unwrapErr().c_str()); break; } outfd = create_res.unwrap(); rotate_count += 1; auto hdr = header{ current_timeval(), this->l_name, this->l_cwd, this->l_env, }; woff = 0; auto hdr_str = header_handlers.to_string(hdr); uint32_t meta_size = htonl(hdr_str.length()); auto prc = write( outfd.get(), HEADER_MAGIC, sizeof(HEADER_MAGIC)); if (prc < sizeof(HEADER_MAGIC)) { log_error("unable to write file header: %s -- %s", this->l_name.c_str(), strerror(errno)); break; } woff += prc; prc = write(outfd.get(), &meta_size, sizeof(meta_size)); if (prc < sizeof(meta_size)) { log_error("unable to write file header: %s -- %s", this->l_name.c_str(), strerror(errno)); break; } woff += prc; prc = write(outfd.get(), hdr_str.c_str(), hdr_str.size()); if (prc < hdr_str.size()) { log_error("unable to write file header: %s -- %s", this->l_name.c_str(), strerror(errno)); break; } woff += prc; } ssize_t wrc; last_woff = woff; wrc = write_timestamp(outfd.get(), cap.cf_level, woff); if (wrc == -1) { log_error("unable to write timestamp: %s -- %s", this->l_name.c_str(), strerror(errno)); this->l_looping = false; break; } woff += wrc; /* Need to do pwrite here since the fd is used by the main * lnav process as well. */ wrc = pwrite(outfd.get(), sbr.get_data(), sbr.length(), woff); if (wrc == -1) { log_error("unable to write captured data: %s -- %s", this->l_name.c_str(), strerror(errno)); this->l_looping = false; break; } woff += wrc; cap.last_range = li.li_file_range; if (li.li_partial && sbr.get_data()[sbr.length() - 1] != '\n' && (cap.last_range.next_offset() != cap.lb.get_file_size())) { woff = last_woff; } } } } while (this->l_looping); log_info("exiting loop to capture: %s", this->l_name.c_str()); } Result, std::string> create_looper(std::string name, auto_fd stdout_fd, auto_fd stderr_fd) { return Ok(handle(std::make_shared( name, std::move(stdout_fd), std::move(stderr_fd)))); } void cleanup() { (void) std::async(std::launch::async, []() { const auto& cfg = injector::get(); auto now = std::chrono::system_clock::now(); auto cache_path = storage_path(); std::vector to_remove; for (const auto& cache_subdir : ghc::filesystem::directory_iterator(cache_path)) { auto mtime = ghc::filesystem::last_write_time(cache_subdir.path()); auto exp_time = mtime + cfg.c_ttl; if (now < exp_time) { continue; } bool is_recent = false; for (const auto& entry : ghc::filesystem::directory_iterator(cache_subdir)) { auto mtime = ghc::filesystem::last_write_time(entry.path()); auto exp_time = mtime + cfg.c_ttl; if (now < exp_time) { is_recent = true; break; } } if (!is_recent) { to_remove.emplace_back(cache_subdir); } } for (auto& entry : to_remove) { log_debug("removing piper directory: %s", entry.c_str()); ghc::filesystem::remove_all(entry); } }); } } // namespace piper } // namespace lnav