diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:44:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:44:55 +0000 |
commit | 5068d34c08f951a7ea6257d305a1627b09a95817 (patch) | |
tree | 08213e2be853396a3b07ce15dbe222644dcd9a89 /src/grep_proc.cc | |
parent | Initial commit. (diff) | |
download | lnav-5068d34c08f951a7ea6257d305a1627b09a95817.tar.xz lnav-5068d34c08f951a7ea6257d305a1627b09a95817.zip |
Adding upstream version 0.11.1.upstream/0.11.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/grep_proc.cc')
-rw-r--r-- | src/grep_proc.cc | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/src/grep_proc.cc b/src/grep_proc.cc new file mode 100644 index 0000000..e67ae2b --- /dev/null +++ b/src/grep_proc.cc @@ -0,0 +1,426 @@ +/** + * Copyright (c) 2007-2012, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @file grep_proc.cc + */ + +#include "grep_proc.hh" + +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <sys/wait.h> +#include <unistd.h> + +#include "base/auto_pid.hh" +#include "base/lnav_log.hh" +#include "base/opt_util.hh" +#include "base/string_util.hh" +#include "config.h" +#include "lnav_util.hh" +#include "vis_line.hh" + +template<typename LineType> +grep_proc<LineType>::grep_proc(std::shared_ptr<lnav::pcre2pp::code> code, + grep_proc_source<LineType>& gps, + std::shared_ptr<pollable_supervisor> ps) + : pollable(ps, pollable::category::background), gp_pcre(code), + gp_source(gps) +{ + require(this->invariant()); + + gps.register_proc(this); +} + +template<typename LineType> +grep_proc<LineType>::~grep_proc() +{ + this->invalidate(); +} + +template<typename LineType> +void +grep_proc<LineType>::handle_match( + int line, std::string& line_value, int off, int* matches, int count) +{ + int lpc; + + if (off == 0) { + fprintf(stdout, "%d\n", line); + } + fprintf(stdout, "[%d:%d]\n", matches[0], matches[1]); + for (lpc = 1; lpc < count; lpc++) { + fprintf(stdout, "(%d:%d)", matches[lpc * 2], matches[lpc * 2 + 1]); + fwrite(&(line_value.c_str()[matches[lpc * 2]]), + 1, + matches[lpc * 2 + 1] - matches[lpc * 2], + stdout); + fputc('\n', stdout); + } +} + +template<typename LineType> +void +grep_proc<LineType>::start() +{ + require(this->invariant()); + + if (this->gp_sink) { + // XXX hack to make sure threads used by line_buffer are not active + // before the fork. + this->gp_sink->grep_quiesce(); + } + + log_debug("grep_proc(%p): start", this); + if (this->gp_child_started || this->gp_queue.empty()) { + log_debug("grep_proc(%p): nothing to do?", this); + return; + } + + auto_pipe in_pipe(STDIN_FILENO); + auto_pipe out_pipe(STDOUT_FILENO); + auto_pipe err_pipe(STDERR_FILENO); + + /* Get ahold of some pipes for stdout and stderr. */ + if (out_pipe.open() < 0) { + throw error(errno); + } + + if (err_pipe.open() < 0) { + throw error(errno); + } + + if ((this->gp_child = fork()) < 0) { + throw error(errno); + } + + in_pipe.after_fork(this->gp_child); + out_pipe.after_fork(this->gp_child); + err_pipe.after_fork(this->gp_child); + + if (this->gp_child != 0) { + log_perror(fcntl(out_pipe.read_end(), F_SETFL, O_NONBLOCK)); + log_perror(fcntl(out_pipe.read_end(), F_SETFD, 1)); + this->gp_line_buffer.set_fd(out_pipe.read_end()); + + log_perror(fcntl(err_pipe.read_end(), F_SETFL, O_NONBLOCK)); + log_perror(fcntl(err_pipe.read_end(), F_SETFD, 1)); + require(this->gp_err_pipe.get() == -1); + this->gp_err_pipe = std::move(err_pipe.read_end()); + this->gp_child_started = true; + this->gp_child_queue_size = this->gp_queue.size(); + + this->gp_queue.clear(); + + log_debug("grep_proc(%p): started child %d", this, this->gp_child); + return; + } + + /* In the child... */ + lnav::pid::in_child = true; + + /* + * Restore the default signal handlers so we don't hang around + * forever if there is a problem. + */ + signal(SIGINT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + + this->child_init(); + + this->child_loop(); + + _exit(0); +} + +template<typename LineType> +void +grep_proc<LineType>::child_loop() +{ + char outbuf[BUFSIZ * 2]; + std::string line_value; + + /* Make sure buffering is on, not sure of the state in the parent. */ + if (setvbuf(stdout, outbuf, _IOFBF, BUFSIZ * 2) < 0) { + perror("setvbuf"); + } + lnav_log_file + = make_optional_from_nullable(fopen("/tmp/lnav.grep.err", "a")); + line_value.reserve(BUFSIZ * 2); + while (!this->gp_queue.empty()) { + LineType start_line = this->gp_queue.front().first; + LineType stop_line = this->gp_queue.front().second; + bool done = false; + LineType line; + + this->gp_queue.pop_front(); + for (line = this->gp_source.grep_initial_line(start_line, + this->gp_highest_line); + line != -1 && (stop_line == -1 || line < stop_line) && !done; + this->gp_source.grep_next_line(line)) + { + line_value.clear(); + done = !this->gp_source.grep_value_for_line(line, line_value); + if (!done) { + this->gp_pcre->capture_from(line_value) + .for_each([&](lnav::pcre2pp::match_data& md) { + if (md.leading().sf_begin == 0) { + fprintf(stdout, "%d\n", (int) line); + } + fprintf(stdout, + "[%d:%d]\n", + md[0]->sf_begin, + md[0]->sf_end); + for (int lpc = 1; lpc < md.get_count(); lpc++) { + if (!md[lpc]) { + continue; + } + fprintf(stdout, + "(%d:%d)", + md[lpc]->sf_begin, + md[lpc]->sf_end); + + fwrite( + md[lpc]->data(), 1, md[lpc]->length(), stdout); + fputc('\n', stdout); + } + fprintf(stdout, "/\n"); + }); + } + + if (((line + 1) % 10000) == 0) { + /* Periodically flush the buffer so the parent sees progress */ + this->child_batch(); + } + } + + if (stop_line == -1) { + // When scanning to the end of the source, we need to return the + // highest line that was seen so that the next request that + // continues from the end works properly. + fprintf(stdout, "h%d\n", line - 1); + } + this->gp_highest_line = line - 1_vl; + this->child_term(); + } +} + +template<typename LineType> +void +grep_proc<LineType>::cleanup() +{ + if (this->gp_child != -1 && this->gp_child != 0) { + int status = 0; + + kill(this->gp_child, SIGTERM); + while (waitpid(this->gp_child, &status, 0) < 0 && (errno == EINTR)) { + ; + } + require(!WIFSIGNALED(status) || WTERMSIG(status) != SIGABRT); + this->gp_child = -1; + this->gp_child_started = false; + + if (this->gp_sink) { + for (size_t lpc = 0; lpc < this->gp_child_queue_size; lpc++) { + this->gp_sink->grep_end(*this); + } + } + } + + if (this->gp_err_pipe != -1) { + this->gp_err_pipe.reset(); + } + + this->gp_pipe_range.clear(); + this->gp_line_buffer.reset(); + + ensure(this->invariant()); + + if (!this->gp_queue.empty()) { + this->start(); + } +} + +template<typename LineType> +void +grep_proc<LineType>::dispatch_line(char* line) +{ + int start, end, capture_start; + + require(line != nullptr); + + if (sscanf(line, "h%d", this->gp_highest_line.out()) == 1) { + } else if (sscanf(line, "%d", this->gp_last_line.out()) == 1) { + /* Starting a new line with matches. */ + ensure(this->gp_last_line >= 0); + } else if (sscanf(line, "[%d:%d]", &start, &end) == 2) { + require(start >= 0); + require(end >= 0); + + /* Pass the match offsets to the sink delegate. */ + if (this->gp_sink != nullptr) { + this->gp_sink->grep_match(*this, this->gp_last_line, start, end); + } + } else if (sscanf(line, "(%d:%d)%n", &start, &end, &capture_start) == 2) { + require(start == -1 || start >= 0); + require(end >= 0); + + /* Pass the captured strings to the sink delegate. */ + if (this->gp_sink != nullptr) { + this->gp_sink->grep_capture( + *this, + this->gp_last_line, + start, + end, + start < 0 ? nullptr : &line[capture_start]); + } + } else if (line[0] == '/') { + if (this->gp_sink != nullptr) { + this->gp_sink->grep_match_end(*this, this->gp_last_line); + } + } else { + log_error("bad line from child -- %s", line); + } +} + +template<typename LineType> +void +grep_proc<LineType>::check_poll_set(const std::vector<struct pollfd>& pollfds) +{ + require(this->invariant()); + + if (this->gp_err_pipe != -1 && pollfd_ready(pollfds, this->gp_err_pipe)) { + char buffer[1024 + 1]; + ssize_t rc; + + rc = read(this->gp_err_pipe, buffer, sizeof(buffer) - 1); + if (rc > 0) { + static const char* PREFIX = ": "; + + buffer[rc] = '\0'; + if (strncmp(buffer, PREFIX, strlen(PREFIX)) == 0) { + char* lf; + + if ((lf = strchr(buffer, '\n')) != nullptr) { + *lf = '\0'; + } + if (this->gp_control != nullptr) { + this->gp_control->grep_error(&buffer[strlen(PREFIX)]); + } + } + } else if (rc == 0) { + this->gp_err_pipe.reset(); + } + } + + if (this->gp_line_buffer.get_fd() != -1 + && pollfd_ready(pollfds, this->gp_line_buffer.get_fd())) + { + try { + static const int MAX_LOOPS = 100; + + int loop_count = 0; + bool drained = false; + + while (loop_count < MAX_LOOPS) { + auto load_result + = this->gp_line_buffer.load_next_line(this->gp_pipe_range); + + if (load_result.isErr()) { + log_error("failed to read from grep_proc child: %s", + load_result.unwrapErr().c_str()); + break; + } + + auto li = load_result.unwrap(); + + if (li.li_file_range.empty()) { + drained = true; + break; + } + + this->gp_pipe_range = li.li_file_range; + this->gp_line_buffer.read_range(li.li_file_range) + .then([this](auto sbr) { + auto_mem<char> buf; + + buf = (char*) malloc(sbr.length() + 1); + sbr.rtrim(is_line_ending); + memcpy(buf, sbr.get_data(), sbr.length()); + buf[sbr.length()] = '\0'; + this->dispatch_line(buf); + }); + + loop_count += 1; + } + + if (this->gp_sink != nullptr) { + this->gp_sink->grep_end_batch(*this); + } + + if (drained && this->gp_line_buffer.is_pipe_closed()) { + this->cleanup(); + } + } catch (line_buffer::error& e) { + this->cleanup(); + } + } + + ensure(this->invariant()); +} + +template<typename LineType> +grep_proc<LineType>& +grep_proc<LineType>::invalidate() +{ + if (this->gp_sink) { + for (size_t lpc = 0; lpc < this->gp_queue.size(); lpc++) { + this->gp_sink->grep_end(*this); + } + } + this->gp_queue.clear(); + this->cleanup(); + return *this; +} + +template<typename LineType> +void +grep_proc<LineType>::update_poll_set(std::vector<struct pollfd>& pollfds) +{ + if (this->gp_line_buffer.get_fd() != -1) { + pollfds.push_back( + (struct pollfd){this->gp_line_buffer.get_fd(), POLLIN, 0}); + } + if (this->gp_err_pipe.get() != -1) { + pollfds.push_back((struct pollfd){this->gp_err_pipe, POLLIN, 0}); + } +} + +template class grep_proc<vis_line_t>; |