diff options
Diffstat (limited to '')
-rw-r--r-- | src/piper.looper.cc | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/src/piper.looper.cc b/src/piper.looper.cc new file mode 100644 index 0000000..91cc19f --- /dev/null +++ b/src/piper.looper.cc @@ -0,0 +1,485 @@ +/** + * Copyright (c) 2023, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <chrono> + +#include "piper.looper.hh" + +#include <arpa/inet.h> +#include <poll.h> + +#include "base/fs_util.hh" +#include "base/injector.hh" +#include "base/time_util.hh" +#include "config.h" +#include "hasher.hh" +#include "line_buffer.hh" +#include "pcrepp/pcre2pp.hh" +#include "piper.looper.cfg.hh" + +using namespace std::chrono_literals; + +static ssize_t +write_timestamp(int fd, log_level_t level, off_t woff) +{ + char time_str[64]; + struct timeval tv; + + gettimeofday(&tv, nullptr); + auto fmt_res = fmt::format_to_n(time_str, + sizeof(time_str), + FMT_STRING("{: 12}.{:06}:{};"), + tv.tv_sec, + tv.tv_usec, + level_names[level][0]); + + return pwrite(fd, time_str, fmt_res.size, woff); +} + +extern char** environ; + +namespace lnav { +namespace piper { + +const json_path_container header_env_handlers = { + yajlpp::pattern_property_handler("(?<name>.*)") + .with_synopsis("<name>") + .for_field(&lnav::piper::header::h_env), +}; + +const typed_json_path_container<lnav::piper::header> header_handlers = { + yajlpp::property_handler("name").for_field(&lnav::piper::header::h_name), + yajlpp::property_handler("ctime").for_field(&lnav::piper::header::h_ctime), + yajlpp::property_handler("cwd").for_field(&lnav::piper::header::h_cwd), + yajlpp::property_handler("env").with_children(header_env_handlers), +}; + +static std::map<std::string, std::string> +environ_to_map() +{ + static const auto SENSITIVE_VARS + = lnav::pcre2pp::code::from_const(R"((?i)token|pass)"); + + std::map<std::string, std::string> retval; + + for (size_t lpc = 0; environ[lpc]; lpc++) { + auto full_sf = string_fragment::from_c_str(environ[lpc]); + auto pair_opt = full_sf.split_pair(string_fragment::tag1{'='}); + + if (!pair_opt) { + continue; + } + if (SENSITIVE_VARS.find_in(pair_opt->first).ignore_error()) { + retval[pair_opt->first.to_string()] = "******"; + } else { + retval[pair_opt->first.to_string()] = pair_opt->second.to_string(); + } + } + + return retval; +} + +looper::looper(std::string name, auto_fd stdout_fd, auto_fd stderr_fd) + : l_name(std::move(name)), l_cwd(ghc::filesystem::current_path().string()), + l_env(environ_to_map()), l_stdout(std::move(stdout_fd)), + l_stderr(std::move(stderr_fd)) +{ + size_t count = 0; + do { + this->l_out_dir + = storage_path() + / fmt::format( + FMT_STRING("p-{}-{:03}"), + hasher().update(getmstime()).update(l_name).to_string(), + count); + count += 1; + } while (ghc::filesystem::exists(this->l_out_dir)); + ghc::filesystem::create_directories(this->l_out_dir); + this->l_future = std::async(std::launch::async, [this]() { this->loop(); }); +} + +looper::~looper() +{ + log_info("piper destructed, shutting down: %s", this->l_name.c_str()); + this->l_looping = false; + this->l_future.wait(); +} + +enum class read_mode_t { + binary, + line, +}; + +void +looper::loop() +{ + static const auto FORCE_MTIME_UPDATE_DURATION = 8h; + + const auto& cfg = injector::get<const config&>(); + struct pollfd pfd[2]; + struct { + line_buffer lb; + file_range last_range; + pollfd* pfd{nullptr}; + log_level_t cf_level{LEVEL_INFO}; + read_mode_t cf_read_mode{read_mode_t::line}; + + void reset_pfd() + { + this->pfd->fd = this->lb.get_fd(); + this->pfd->events = POLLIN; + this->pfd->revents = 0; + } + } captured_fds[2]; + off_t woff = 0, last_woff = 0; + auto_fd outfd; + size_t rotate_count = 0; + + log_info("starting loop to capture: %s (%d %d)", + this->l_name.c_str(), + this->l_stdout.get(), + this->l_stderr.get()); + this->l_stdout.non_blocking(); + captured_fds[0].lb.set_fd(this->l_stdout); + if (this->l_stderr.has_value()) { + this->l_stderr.non_blocking(); + captured_fds[1].lb.set_fd(this->l_stderr); + } + captured_fds[1].cf_level = LEVEL_ERROR; + auto last_write = std::chrono::system_clock::now(); + do { + static const auto TIMEOUT + = std::chrono::duration_cast<std::chrono::milliseconds>(1s).count(); + + size_t used_pfds = 0; + for (auto& cap : captured_fds) { + if (cap.lb.get_fd() != -1 && cap.lb.is_pipe() + && !cap.lb.is_pipe_closed()) + { + cap.pfd = &pfd[used_pfds]; + used_pfds += 1; + cap.reset_pfd(); + } else { + cap.pfd = nullptr; + } + } + + if (used_pfds == 0) { + log_info("inputs consumed, breaking loop: %s", + this->l_name.c_str()); + this->l_looping = false; + break; + } + + auto poll_rc = poll(pfd, used_pfds, TIMEOUT); + if (poll_rc == 0) { + // update the timestamp to keep the file alive from any + // cleanup processes + if (outfd.has_value()) { + auto now = std::chrono::system_clock::now(); + + if ((now - last_write) >= FORCE_MTIME_UPDATE_DURATION) { + last_write = now; + log_perror(futimes(outfd.get(), nullptr)); + } + } + continue; + } else { + last_write = std::chrono::system_clock::now(); + } + for (auto& cap : captured_fds) { + while (this->l_looping) { + if (cap.pfd == nullptr + || !(cap.pfd->revents & (POLLIN | POLLHUP))) + { + break; + } + + if (cap.cf_read_mode == read_mode_t::binary) { + char buffer[8192]; + auto read_rc + = read(cap.lb.get_fd(), buffer, sizeof(buffer)); + + if (read_rc < 0) { + if (errno == EAGAIN) { + break; + } + log_error("failed to read next chunk: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + this->l_looping = false; + } else if (read_rc == 0) { + this->l_looping = false; + } else { + auto rc = write(outfd.get(), buffer, read_rc); + if (rc != read_rc) { + log_error( + "failed to write to capture file: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + } + } + continue; + } + + auto load_result = cap.lb.load_next_line(cap.last_range); + + if (load_result.isErr()) { + log_error("failed to load next line: %s -- %s", + this->l_name.c_str(), + load_result.unwrapErr().c_str()); + this->l_looping = false; + break; + } + + auto li = load_result.unwrap(); + + if (cap.last_range.fr_offset == 0 && !cap.lb.is_header_utf8()) { + log_info("switching capture to binary mode: %s", + this->l_name.c_str()); + cap.cf_read_mode = read_mode_t::binary; + + auto out_path = this->l_out_dir / "out.0"; + log_info("creating binary capture file: %s -- %s", + this->l_name.c_str(), + out_path.c_str()); + auto create_res = lnav::filesystem::create_file( + out_path, O_WRONLY | O_CLOEXEC | O_TRUNC, 0600); + if (create_res.isErr()) { + log_error("unable to open capture file: %s -- %s", + this->l_name.c_str(), + create_res.unwrapErr().c_str()); + break; + } + + auto hdr_path = this->l_out_dir / ".header"; + auto hdr = header{ + current_timeval(), + this->l_name, + this->l_cwd, + this->l_env, + }; + auto write_hdr_res = lnav::filesystem::write_file( + hdr_path, header_handlers.to_string(hdr)); + if (write_hdr_res.isErr()) { + log_error("unable to write header file: %s -- %s", + hdr_path.c_str(), + write_hdr_res.unwrapErr().c_str()); + break; + } + + outfd = create_res.unwrap(); + auto header_avail = cap.lb.get_available(); + auto read_res = cap.lb.read_range(header_avail); + if (read_res.isOk()) { + auto sbr = read_res.unwrap(); + write(outfd.get(), sbr.get_data(), sbr.length()); + } else { + log_error("failed to get header data: %s -- %s", + this->l_name.c_str(), + read_res.unwrapErr().c_str()); + } + continue; + } + + if (li.li_partial && !cap.lb.is_pipe_closed()) { + break; + } + + if (li.li_file_range.empty()) { + break; + } + + auto read_result = cap.lb.read_range(li.li_file_range); + + if (read_result.isErr()) { + log_error("failed to read next line: %s -- %s", + this->l_name.c_str(), + read_result.unwrapErr().c_str()); + this->l_looping = false; + break; + } + + auto sbr = read_result.unwrap(); + + if (woff > last_woff && woff >= cfg.c_max_size) { + log_info( + "capture file has reached max size, rotating: %s -- " + "%lld", + this->l_name.c_str(), + woff); + outfd.reset(); + } + + if (!outfd.has_value()) { + auto out_path = this->l_out_dir + / fmt::format(FMT_STRING("out.{}"), + rotate_count % cfg.c_rotations); + log_info("creating capturing file: %s -- %s", + this->l_name.c_str(), + out_path.c_str()); + auto create_res = lnav::filesystem::create_file( + out_path, O_WRONLY | O_CLOEXEC | O_TRUNC, 0600); + if (create_res.isErr()) { + log_error("unable to open capture file: %s -- %s", + this->l_name.c_str(), + create_res.unwrapErr().c_str()); + break; + } + + outfd = create_res.unwrap(); + rotate_count += 1; + + auto hdr = header{ + current_timeval(), + this->l_name, + this->l_cwd, + this->l_env, + }; + + woff = 0; + auto hdr_str = header_handlers.to_string(hdr); + uint32_t meta_size = htonl(hdr_str.length()); + auto prc = write( + outfd.get(), HEADER_MAGIC, sizeof(HEADER_MAGIC)); + if (prc < sizeof(HEADER_MAGIC)) { + log_error("unable to write file header: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + break; + } + woff += prc; + prc = write(outfd.get(), &meta_size, sizeof(meta_size)); + if (prc < sizeof(meta_size)) { + log_error("unable to write file header: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + break; + } + woff += prc; + prc = write(outfd.get(), hdr_str.c_str(), hdr_str.size()); + if (prc < hdr_str.size()) { + log_error("unable to write file header: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + break; + } + woff += prc; + } + + ssize_t wrc; + + last_woff = woff; + wrc = write_timestamp(outfd.get(), cap.cf_level, woff); + if (wrc == -1) { + log_error("unable to write timestamp: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + this->l_looping = false; + break; + } + woff += wrc; + + /* Need to do pwrite here since the fd is used by the main + * lnav process as well. + */ + wrc = pwrite(outfd.get(), sbr.get_data(), sbr.length(), woff); + if (wrc == -1) { + log_error("unable to write captured data: %s -- %s", + this->l_name.c_str(), + strerror(errno)); + this->l_looping = false; + break; + } + woff += wrc; + + cap.last_range = li.li_file_range; + if (li.li_partial && sbr.get_data()[sbr.length() - 1] != '\n' + && (cap.last_range.next_offset() != cap.lb.get_file_size())) + { + woff = last_woff; + } + } + } + } while (this->l_looping); + + log_info("exiting loop to capture: %s", this->l_name.c_str()); +} + +Result<handle<state::running>, std::string> +create_looper(std::string name, auto_fd stdout_fd, auto_fd stderr_fd) +{ + return Ok(handle<state::running>(std::make_shared<looper>( + name, std::move(stdout_fd), std::move(stderr_fd)))); +} + +void +cleanup() +{ + (void) std::async(std::launch::async, []() { + const auto& cfg = injector::get<const config&>(); + auto now = std::chrono::system_clock::now(); + auto cache_path = storage_path(); + std::vector<ghc::filesystem::path> to_remove; + + for (const auto& cache_subdir : + ghc::filesystem::directory_iterator(cache_path)) + { + auto mtime = ghc::filesystem::last_write_time(cache_subdir.path()); + auto exp_time = mtime + cfg.c_ttl; + if (now < exp_time) { + continue; + } + + bool is_recent = false; + + for (const auto& entry : + ghc::filesystem::directory_iterator(cache_subdir)) + { + auto mtime = ghc::filesystem::last_write_time(entry.path()); + auto exp_time = mtime + cfg.c_ttl; + if (now < exp_time) { + is_recent = true; + break; + } + } + if (!is_recent) { + to_remove.emplace_back(cache_subdir); + } + } + + for (auto& entry : to_remove) { + log_debug("removing piper directory: %s", entry.c_str()); + ghc::filesystem::remove_all(entry); + } + }); +} + +} // namespace piper +} // namespace lnav |