diff options
Diffstat (limited to '')
-rw-r--r-- | src/file_collection.cc | 462 |
1 files changed, 325 insertions, 137 deletions
diff --git a/src/file_collection.cc b/src/file_collection.cc index 19e71c2..96ec69d 100644 --- a/src/file_collection.cc +++ b/src/file_collection.cc @@ -35,21 +35,29 @@ #include <glob.h> +#include "base/fs_util.hh" #include "base/humanize.network.hh" #include "base/isc.hh" #include "base/itertools.hh" #include "base/opt_util.hh" #include "base/string_util.hh" #include "config.h" -#include "lnav_util.hh" +#include "file_converter_manager.hh" #include "logfile.hh" -#include "pcap_manager.hh" #include "service_tags.hh" #include "tailer/tailer.looper.hh" static std::mutex REALPATH_CACHE_MUTEX; static std::unordered_map<std::string, std::string> REALPATH_CACHE; +void +child_poller::send_sigint() +{ + if (this->cp_child) { + kill(this->cp_child->in(), SIGINT); + } +} + child_poll_result_t child_poller::poll(file_collection& fc) { @@ -72,6 +80,38 @@ child_poller::poll(file_collection& fc) }); } +file_collection::limits_t::limits_t() +{ + static constexpr rlim_t RESERVED_FDS = 32; + + struct rlimit rl; + + if (getrlimit(RLIMIT_NOFILE, &rl) == 0) { + this->l_fds = rl.rlim_cur; + } else { + log_error("getrlimit() failed -- %s", strerror(errno)); + + this->l_fds = 8192; + } + + if (this->l_fds < RESERVED_FDS) { + this->l_open_files = this->l_fds; + } else { + this->l_open_files = this->l_fds - RESERVED_FDS; + } + + log_info( + "fd limit: %zu; open file limit: %zu", this->l_fds, this->l_open_files); +} + +const file_collection::limits_t& +file_collection::get_limits() +{ + static const limits_t INSTANCE; + + return INSTANCE; +} + void file_collection::close_files(const std::vector<std::shared_ptr<logfile>>& files) { @@ -116,25 +156,28 @@ file_collection::regenerate_unique_file_names() upg.generate(); this->fc_largest_path_length = 0; - for (const auto& pair : this->fc_name_to_errors) { - auto path = ghc::filesystem::path(pair.first).filename().string(); + { + safe::ReadAccess<safe_name_to_errors> errs(*this->fc_name_to_errors); + + for (const auto& pair : *errs) { + auto path = ghc::filesystem::path(pair.first).filename().string(); - if (path.length() > this->fc_largest_path_length) { - this->fc_largest_path_length = path.length(); + if (path.length() > this->fc_largest_path_length) { + this->fc_largest_path_length = path.length(); + } } } for (const auto& lf : this->fc_files) { const auto& path = lf->get_unique_path(); - if (path.length() > this->fc_largest_path_length) { - this->fc_largest_path_length = path.length(); + if (path.native().length() > this->fc_largest_path_length) { + this->fc_largest_path_length = path.native().length(); } } for (const auto& pair : this->fc_other_files) { switch (pair.second.ofd_format) { case file_format_t::UNKNOWN: case file_format_t::ARCHIVE: - case file_format_t::PCAP: case file_format_t::SQLITE_DB: { auto bn = ghc::filesystem::path(pair.first).filename().string(); if (bn.length() > this->fc_largest_path_length) { @@ -155,19 +198,31 @@ file_collection::regenerate_unique_file_names() void file_collection::merge(file_collection& other) { + bool do_regen = !other.fc_files.empty() || !other.fc_other_files.empty() + || !other.fc_name_to_errors->readAccess()->empty(); + this->fc_recursive = this->fc_recursive || other.fc_recursive; this->fc_rotated = this->fc_rotated || other.fc_rotated; this->fc_synced_files.insert(other.fc_synced_files.begin(), other.fc_synced_files.end()); - this->fc_name_to_errors.insert(other.fc_name_to_errors.begin(), - other.fc_name_to_errors.end()); - this->fc_file_names.insert( - std::make_move_iterator(other.fc_file_names.begin()), - std::make_move_iterator(other.fc_file_names.end())); + + std::map<std::string, file_error_info> new_errors; + { + safe::ReadAccess<safe_name_to_errors> errs(*other.fc_name_to_errors); + + new_errors.insert(errs->cbegin(), errs->cend()); + } + { + safe::WriteAccess<safe_name_to_errors> errs(*this->fc_name_to_errors); + + errs->insert(new_errors.begin(), new_errors.end()); + } + this->fc_file_names.insert(other.fc_file_names.begin(), + other.fc_file_names.end()); if (!other.fc_files.empty()) { for (const auto& lf : other.fc_files) { - this->fc_name_to_errors.erase(lf->get_filename()); + this->fc_name_to_errors->writeAccess()->erase(lf->get_filename()); } this->fc_files.insert( this->fc_files.end(), other.fc_files.begin(), other.fc_files.end()); @@ -187,13 +242,17 @@ file_collection::merge(file_collection& other) std::make_move_iterator(other.fc_child_pollers.end())); other.fc_child_pollers.clear(); } + + if (do_regen) { + this->regenerate_unique_file_names(); + } } /** * Functor used to compare files based on their device and inode number. */ struct same_file { - explicit same_file(const struct stat& stat) : sf_stat(stat){}; + explicit same_file(const struct stat& stat) : sf_stat(stat) {} /** * Compare the given log file against the 'stat' given in the constructor. @@ -203,7 +262,20 @@ struct same_file { */ bool operator()(const std::shared_ptr<logfile>& lf) const { - return !lf->is_closed() && this->sf_stat.st_dev == lf->get_stat().st_dev + if (lf->is_closed()) { + return false; + } + + const auto& lf_loo = lf->get_open_options(); + + if (lf_loo.loo_temp_dev != 0 + && this->sf_stat.st_dev == lf_loo.loo_temp_dev + && this->sf_stat.st_ino == lf_loo.loo_temp_ino) + { + return true; + } + + return this->sf_stat.st_dev == lf->get_stat().st_dev && this->sf_stat.st_ino == lf->get_stat().st_ino; } @@ -219,33 +291,20 @@ struct same_file { * @param fd An already-opened descriptor for 'filename'. * @param required Specifies whether or not the file must exist and be valid. */ -std::future<file_collection> +nonstd::optional<std::future<file_collection>> file_collection::watch_logfile(const std::string& filename, logfile_open_options& loo, bool required) { - file_collection retval; struct stat st; int rc; + auto filename_key = loo.loo_filename.empty() ? filename : loo.loo_filename; if (this->fc_closed_files.count(filename)) { - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } - if (loo.loo_fd != -1) { - rc = fstat(loo.loo_fd, &st); - if (rc == 0) { - loo.with_stat_for_temp(st); - } - } else if (loo.loo_temp_file) { - memset(&st, 0, sizeof(st)); - st.st_dev = loo.loo_temp_dev; - st.st_ino = loo.loo_temp_ino; - st.st_mode = S_IFREG; - rc = 0; - } else { - rc = stat(filename.c_str(), &st); - } + rc = stat(filename.c_str(), &st); if (rc == 0) { if (S_ISDIR(st.st_mode) && this->fc_recursive) { @@ -253,34 +312,52 @@ file_collection::watch_logfile(const std::string& filename, if (this->fc_file_names.find(wilddir) == this->fc_file_names.end()) { - retval.fc_file_names.emplace(wilddir, logfile_open_options()); + file_collection retval; + + retval.fc_file_names.emplace( + wilddir, + logfile_open_options() + .with_non_utf_visibility(false) + .with_visible_size_limit(256 * 1024)); + return lnav::futures::make_ready_future(std::move(retval)); } - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } if (!S_ISREG(st.st_mode)) { if (required) { rc = -1; errno = EINVAL; } else { - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } } - auto err_iter = this->fc_name_to_errors.find(filename); - if (err_iter != this->fc_name_to_errors.end()) { - if (err_iter->second.fei_mtime != st.st_mtime) { - this->fc_name_to_errors.erase(err_iter); + { + safe::WriteAccess<safe_name_to_errors> errs( + *this->fc_name_to_errors); + + auto err_iter = errs->find(filename_key); + if (err_iter != errs->end()) { + if (err_iter->second.fei_mtime != st.st_mtime) { + errs->erase(err_iter); + } } } } if (rc == -1) { if (required) { - retval.fc_name_to_errors.emplace(filename, - file_error_info{ - time(nullptr), - std::string(strerror(errno)), - }); + log_error("failed to open required file: %s -- %s", + filename.c_str(), + strerror(errno)); + file_collection retval; + retval.fc_name_to_errors->writeAccess()->emplace( + filename, + file_error_info{ + time(nullptr), + std::string(strerror(errno)), + }); + return lnav::futures::make_ready_future(std::move(retval)); } - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } if (this->fc_new_stats | lnav::itertools::find_if([&st](const auto& elem) { @@ -289,7 +366,7 @@ file_collection::watch_logfile(const std::string& filename, { // this file is probably a link that we have already scanned in this // pass. - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } this->fc_new_stats.emplace_back(st); @@ -299,92 +376,41 @@ file_collection::watch_logfile(const std::string& filename, if (file_iter == this->fc_files.end()) { if (this->fc_other_files.find(filename) != this->fc_other_files.end()) { - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } require(this->fc_progress.get() != nullptr); auto func = [filename, st, - loo2 = std::move(loo), + loo, prog = this->fc_progress, errs = this->fc_name_to_errors]() mutable { file_collection retval; - if (errs.find(filename) != errs.end()) { - // The file is broken, no reason to try and reopen - return retval; + { + safe::ReadAccess<safe_name_to_errors> errs_inner(*errs); + + if (errs_inner->find(filename) != errs_inner->end()) { + // The file is broken, no reason to try and reopen + return retval; + } } - auto ff = loo2.loo_temp_file ? file_format_t::UNKNOWN - : detect_file_format(filename); + auto ff = detect_file_format(filename); - loo2.loo_file_format = ff; + loo.loo_file_format = ff; switch (ff) { case file_format_t::SQLITE_DB: retval.fc_other_files[filename].ofd_format = ff; break; - case file_format_t::PCAP: { - auto res = pcap_manager::convert(filename); - - if (res.isOk()) { - auto convert_res = res.unwrap(); - - loo2.with_fd(std::move(convert_res.cr_destination)); - retval.fc_child_pollers.emplace_back(child_poller{ - std::move(convert_res.cr_child), - [filename, - st, - error_queue = convert_res.cr_error_queue]( - auto& fc, auto& child) { - if (child.was_normal_exit() - && child.exit_status() == EXIT_SUCCESS) - { - log_info("pcap[%d] exited normally", - child.in()); - return; - } - log_error("pcap[%d] exited with %d", - child.in(), - child.status()); - fc.fc_name_to_errors.emplace( - filename, - file_error_info{ - st.st_mtime, - fmt::format( - FMT_STRING("{}"), - fmt::join(*error_queue, "\n")), - }); - }, - }); - auto open_res = logfile::open(filename, loo2); - if (open_res.isOk()) { - retval.fc_files.push_back(open_res.unwrap()); - } else { - retval.fc_name_to_errors.emplace( - filename, - file_error_info{ - st.st_mtime, - open_res.unwrapErr(), - }); - } - } else { - retval.fc_name_to_errors.emplace(filename, - file_error_info{ - st.st_mtime, - res.unwrapErr(), - }); - } - break; - } - case file_format_t::ARCHIVE: { nonstd::optional< std::list<archive_manager::extract_progress>::iterator> prog_iter_opt; - if (loo2.loo_source == logfile_name_source::ARCHIVE) { + if (loo.loo_source == logfile_name_source::ARCHIVE) { // Don't try to open nested archives return retval; } @@ -431,11 +457,12 @@ file_collection::watch_logfile(const std::string& filename, log_error("archive extraction failed: %s", res.unwrapErr().c_str()); retval.clear(); - retval.fc_name_to_errors.emplace(filename, - file_error_info{ - st.st_mtime, - res.unwrapErr(), - }); + retval.fc_name_to_errors->writeAccess()->emplace( + filename, + file_error_info{ + st.st_mtime, + res.unwrapErr(), + }); } else { retval.fc_other_files[filename] = ff; } @@ -448,14 +475,66 @@ file_collection::watch_logfile(const std::string& filename, break; } - default: + default: { + auto filename_to_open = filename; + + auto eff = detect_mime_type(filename); + + if (eff) { + auto cr = file_converter_manager::convert(eff.value(), + filename); + + if (cr.isErr()) { + retval.fc_name_to_errors->writeAccess()->emplace( + filename, + file_error_info{ + st.st_mtime, + cr.unwrapErr(), + }); + break; + } + + auto convert_res = cr.unwrap(); + retval.fc_child_pollers.emplace_back(child_poller{ + filename, + std::move(convert_res.cr_child), + [filename, + st, + error_queue = convert_res.cr_error_queue]( + auto& fc, auto& child) { + if (child.was_normal_exit() + && child.exit_status() == EXIT_SUCCESS) + { + log_info("converter[%d] exited normally", + child.in()); + return; + } + log_error("converter[%d] exited with %d", + child.in(), + child.status()); + fc.fc_name_to_errors->writeAccess()->emplace( + filename, + file_error_info{ + st.st_mtime, + fmt::format( + FMT_STRING("{}"), + fmt::join(*error_queue, "\n")), + }); + }, + }); + loo.with_filename(filename); + loo.with_stat_for_temp(st); + loo.loo_format_name = eff->eff_format_name; + filename_to_open = convert_res.cr_destination; + } + log_info("loading new file: filename=%s", filename.c_str()); - auto open_res = logfile::open(filename, loo2); + auto open_res = logfile::open(filename_to_open, loo); if (open_res.isOk()) { retval.fc_files.push_back(open_res.unwrap()); } else { - retval.fc_name_to_errors.emplace( + retval.fc_name_to_errors->writeAccess()->emplace( filename, file_error_info{ st.st_mtime, @@ -463,6 +542,7 @@ file_collection::watch_logfile(const std::string& filename, }); } break; + } } return retval; @@ -477,10 +557,13 @@ file_collection::watch_logfile(const std::string& filename, /* The file is already loaded, but has been found under a different * name. We just need to update the stored file name. */ + file_collection retval; + retval.fc_renamed_files.emplace_back(lf, filename); + return lnav::futures::make_ready_future(std::move(retval)); } - return lnav::futures::make_ready_future(std::move(retval)); + return nonstd::nullopt; } /** @@ -506,11 +589,13 @@ file_collection::expand_filename( } } - if (is_url(path.c_str())) { + if (is_url(path)) { return; } - if (glob(path.c_str(), GLOB_NOCHECK, nullptr, gl.inout()) == 0) { + auto filename_key = loo.loo_filename.empty() ? path : loo.loo_filename; + auto glob_flags = lnav::filesystem::is_glob(path) ? 0 : GLOB_NOCHECK; + if (glob(path.c_str(), glob_flags, nullptr, gl.inout()) == 0) { int lpc; if (gl->gl_pathc == 1 /*&& gl.gl_matchc == 0*/) { @@ -579,17 +664,30 @@ file_collection::expand_filename( file_collection retval; if (gl->gl_pathc == 1) { - retval.fc_name_to_errors.emplace(path, - file_error_info{ - time(nullptr), - errmsg, - }); + if (this->fc_name_to_errors->readAccess()->count( + filename_key) + == 0) + { + log_error("failed to find path: %s -- %s", + filename_key.c_str(), + errmsg); + retval.fc_name_to_errors->writeAccess() + ->emplace(filename_key, + file_error_info{ + time(nullptr), + errmsg, + }); + } } else { - retval.fc_name_to_errors.emplace(path_str, - file_error_info{ - time(nullptr), - errmsg, - }); + log_error("failed to find path: %s -- %s", + path_str.c_str(), + errmsg); + retval.fc_name_to_errors->writeAccess()->emplace( + path_str, + file_error_info{ + time(nullptr), + errmsg, + }); } fq.push_back(lnav::futures::make_ready_future( std::move(retval))); @@ -603,7 +701,16 @@ file_collection::expand_filename( } if (required || access(iter->second.c_str(), R_OK) == 0) { - fq.push_back(watch_logfile(iter->second, loo, required)); + auto future_opt = watch_logfile(iter->second, loo, required); + if (future_opt) { + auto fut = std::move(future_opt.value()); + if (fq.push_back(std::move(fut)) + == lnav::futures::future_queue< + file_collection>::processor_result_t::interrupt) + { + break; + } + } } } } @@ -614,18 +721,48 @@ file_collection::rescan_files(bool required) { file_collection retval; lnav::futures::future_queue<file_collection> fq( - [&retval](auto& fc) { retval.merge(fc); }); + [this, &retval](std::future<file_collection>& fc) { + try { + auto v = fc.get(); + retval.merge(v); + } catch (const std::exception& e) { + log_error("rescan future exception: %s", e.what()); + } catch (...) { + log_error("unknown exception thrown by rescan future"); + } + + if (retval.fc_files.size() < 100 + && this->fc_files.size() + retval.fc_files.size() + < get_limits().l_open_files) + { + return lnav::futures::future_queue< + file_collection>::processor_result_t::ok; + } + return lnav::futures::future_queue< + file_collection>::processor_result_t::interrupt; + }); for (auto& pair : this->fc_file_names) { - if (!pair.second.loo_temp_file) { + if (this->fc_files.size() + retval.fc_files.size() + >= get_limits().l_open_files) + { + log_debug("too many files open, breaking..."); + break; + } + + if (pair.second.loo_piper) { + this->expand_filename( + fq, + pair.second.loo_piper->get_out_pattern().string(), + pair.second, + required); + } else { this->expand_filename(fq, pair.first, pair.second, required); if (this->fc_rotated) { std::string path = pair.first + ".*"; this->expand_filename(fq, path, pair.second, false); } - } else if (pair.second.loo_fd.get() != -1) { - fq.push_back(watch_logfile(pair.first, pair.second, required)); } if (retval.fc_files.size() >= 100) { @@ -647,3 +784,54 @@ file_collection::request_close(const std::shared_ptr<logfile>& lf) lf->close(); this->fc_files_generation += 1; } + +size_t +file_collection::active_pipers() const +{ + size_t retval = 0; + for (const auto& pair : this->fc_file_names) { + if (pair.second.loo_piper && !pair.second.loo_piper->is_finished()) { + retval += 1; + } + } + + return retval; +} + +size_t +file_collection::finished_pipers() +{ + size_t retval = 0; + + for (auto& pair : this->fc_file_names) { + if (pair.second.loo_piper) { + retval += pair.second.loo_piper->consume_finished(); + } + } + + return retval; +} + +file_collection +file_collection::copy() +{ + file_collection retval; + + retval.merge(*this); + retval.fc_progress = this->fc_progress; + return retval; +} + +size_t +file_collection::other_file_format_count(file_format_t ff) const +{ + size_t retval = 0; + + for (const auto& pair : this->fc_other_files) { + if (pair.second.ofd_format == ff) { + retval += 1; + } + } + + return retval; +} |