summaryrefslogtreecommitdiffstats
path: root/src/file_collection.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/file_collection.cc462
1 files changed, 325 insertions, 137 deletions
diff --git a/src/file_collection.cc b/src/file_collection.cc
index 19e71c2..96ec69d 100644
--- a/src/file_collection.cc
+++ b/src/file_collection.cc
@@ -35,21 +35,29 @@
#include <glob.h>
+#include "base/fs_util.hh"
#include "base/humanize.network.hh"
#include "base/isc.hh"
#include "base/itertools.hh"
#include "base/opt_util.hh"
#include "base/string_util.hh"
#include "config.h"
-#include "lnav_util.hh"
+#include "file_converter_manager.hh"
#include "logfile.hh"
-#include "pcap_manager.hh"
#include "service_tags.hh"
#include "tailer/tailer.looper.hh"
static std::mutex REALPATH_CACHE_MUTEX;
static std::unordered_map<std::string, std::string> REALPATH_CACHE;
+void
+child_poller::send_sigint()
+{  // Sends SIGINT to the polled child; a no-op when cp_child tests false.
+ if (this->cp_child) {
+ kill(this->cp_child->in(), SIGINT);  // in() supplies the id given to kill() (presumably the child's pid -- confirm); kill()'s result is ignored
+ }
+}
+
child_poll_result_t
child_poller::poll(file_collection& fc)
{
@@ -72,6 +80,38 @@ child_poller::poll(file_collection& fc)
});
}
+file_collection::limits_t::limits_t()
+{  // Fills in l_fds (descriptor limit) and l_open_files (that limit minus a reserve), then logs both.
+ static constexpr rlim_t RESERVED_FDS = 32;
+
+ struct rlimit rl;
+ // Use the RLIMIT_NOFILE soft limit (rlim_cur); if getrlimit() fails, log the error and fall back to 8192.
+ if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
+ this->l_fds = rl.rlim_cur;
+ } else {
+ log_error("getrlimit() failed -- %s", strerror(errno));
+
+ this->l_fds = 8192;
+ }
+ // Hold back RESERVED_FDS descriptors; the subtraction is skipped when the limit is already below the reserve.
+ if (this->l_fds < RESERVED_FDS) {
+ this->l_open_files = this->l_fds;
+ } else {
+ this->l_open_files = this->l_fds - RESERVED_FDS;
+ }
+ // NOTE(review): %zu expects size_t arguments -- confirm the declared types of l_fds and l_open_files.
+ log_info(
+ "fd limit: %zu; open file limit: %zu", this->l_fds, this->l_open_files);
+}
+
+const file_collection::limits_t&
+file_collection::get_limits()
+{  // Returns a reference to a single function-local static limits_t.
+ static const limits_t INSTANCE;  // constructed the first time control reaches this declaration
+
+ return INSTANCE;
+}
+
void
file_collection::close_files(const std::vector<std::shared_ptr<logfile>>& files)
{
@@ -116,25 +156,28 @@ file_collection::regenerate_unique_file_names()
upg.generate();
this->fc_largest_path_length = 0;
- for (const auto& pair : this->fc_name_to_errors) {
- auto path = ghc::filesystem::path(pair.first).filename().string();
+ {
+ safe::ReadAccess<safe_name_to_errors> errs(*this->fc_name_to_errors);
+
+ for (const auto& pair : *errs) {
+ auto path = ghc::filesystem::path(pair.first).filename().string();
- if (path.length() > this->fc_largest_path_length) {
- this->fc_largest_path_length = path.length();
+ if (path.length() > this->fc_largest_path_length) {
+ this->fc_largest_path_length = path.length();
+ }
}
}
for (const auto& lf : this->fc_files) {
const auto& path = lf->get_unique_path();
- if (path.length() > this->fc_largest_path_length) {
- this->fc_largest_path_length = path.length();
+ if (path.native().length() > this->fc_largest_path_length) {
+ this->fc_largest_path_length = path.native().length();
}
}
for (const auto& pair : this->fc_other_files) {
switch (pair.second.ofd_format) {
case file_format_t::UNKNOWN:
case file_format_t::ARCHIVE:
- case file_format_t::PCAP:
case file_format_t::SQLITE_DB: {
auto bn = ghc::filesystem::path(pair.first).filename().string();
if (bn.length() > this->fc_largest_path_length) {
@@ -155,19 +198,31 @@ file_collection::regenerate_unique_file_names()
void
file_collection::merge(file_collection& other)
{
+ bool do_regen = !other.fc_files.empty() || !other.fc_other_files.empty()
+ || !other.fc_name_to_errors->readAccess()->empty();
+
this->fc_recursive = this->fc_recursive || other.fc_recursive;
this->fc_rotated = this->fc_rotated || other.fc_rotated;
this->fc_synced_files.insert(other.fc_synced_files.begin(),
other.fc_synced_files.end());
- this->fc_name_to_errors.insert(other.fc_name_to_errors.begin(),
- other.fc_name_to_errors.end());
- this->fc_file_names.insert(
- std::make_move_iterator(other.fc_file_names.begin()),
- std::make_move_iterator(other.fc_file_names.end()));
+
+ std::map<std::string, file_error_info> new_errors;
+ {
+ safe::ReadAccess<safe_name_to_errors> errs(*other.fc_name_to_errors);
+
+ new_errors.insert(errs->cbegin(), errs->cend());
+ }
+ {
+ safe::WriteAccess<safe_name_to_errors> errs(*this->fc_name_to_errors);
+
+ errs->insert(new_errors.begin(), new_errors.end());
+ }
+ this->fc_file_names.insert(other.fc_file_names.begin(),
+ other.fc_file_names.end());
if (!other.fc_files.empty()) {
for (const auto& lf : other.fc_files) {
- this->fc_name_to_errors.erase(lf->get_filename());
+ this->fc_name_to_errors->writeAccess()->erase(lf->get_filename());
}
this->fc_files.insert(
this->fc_files.end(), other.fc_files.begin(), other.fc_files.end());
@@ -187,13 +242,17 @@ file_collection::merge(file_collection& other)
std::make_move_iterator(other.fc_child_pollers.end()));
other.fc_child_pollers.clear();
}
+
+ if (do_regen) {
+ this->regenerate_unique_file_names();
+ }
}
/**
* Functor used to compare files based on their device and inode number.
*/
struct same_file {
- explicit same_file(const struct stat& stat) : sf_stat(stat){};
+ explicit same_file(const struct stat& stat) : sf_stat(stat) {}
/**
* Compare the given log file against the 'stat' given in the constructor.
@@ -203,7 +262,20 @@ struct same_file {
*/
bool operator()(const std::shared_ptr<logfile>& lf) const
{
- return !lf->is_closed() && this->sf_stat.st_dev == lf->get_stat().st_dev
+ if (lf->is_closed()) {
+ return false;
+ }
+
+ const auto& lf_loo = lf->get_open_options();
+
+ if (lf_loo.loo_temp_dev != 0
+ && this->sf_stat.st_dev == lf_loo.loo_temp_dev
+ && this->sf_stat.st_ino == lf_loo.loo_temp_ino)
+ {
+ return true;
+ }
+
+ return this->sf_stat.st_dev == lf->get_stat().st_dev
&& this->sf_stat.st_ino == lf->get_stat().st_ino;
}
@@ -219,33 +291,20 @@ struct same_file {
* @param fd An already-opened descriptor for 'filename'.
* @param required Specifies whether or not the file must exist and be valid.
*/
-std::future<file_collection>
+nonstd::optional<std::future<file_collection>>
file_collection::watch_logfile(const std::string& filename,
logfile_open_options& loo,
bool required)
{
- file_collection retval;
struct stat st;
int rc;
+ auto filename_key = loo.loo_filename.empty() ? filename : loo.loo_filename;
if (this->fc_closed_files.count(filename)) {
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
- if (loo.loo_fd != -1) {
- rc = fstat(loo.loo_fd, &st);
- if (rc == 0) {
- loo.with_stat_for_temp(st);
- }
- } else if (loo.loo_temp_file) {
- memset(&st, 0, sizeof(st));
- st.st_dev = loo.loo_temp_dev;
- st.st_ino = loo.loo_temp_ino;
- st.st_mode = S_IFREG;
- rc = 0;
- } else {
- rc = stat(filename.c_str(), &st);
- }
+ rc = stat(filename.c_str(), &st);
if (rc == 0) {
if (S_ISDIR(st.st_mode) && this->fc_recursive) {
@@ -253,34 +312,52 @@ file_collection::watch_logfile(const std::string& filename,
if (this->fc_file_names.find(wilddir) == this->fc_file_names.end())
{
- retval.fc_file_names.emplace(wilddir, logfile_open_options());
+ file_collection retval;
+
+ retval.fc_file_names.emplace(
+ wilddir,
+ logfile_open_options()
+ .with_non_utf_visibility(false)
+ .with_visible_size_limit(256 * 1024));
+ return lnav::futures::make_ready_future(std::move(retval));
}
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
if (!S_ISREG(st.st_mode)) {
if (required) {
rc = -1;
errno = EINVAL;
} else {
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
}
- auto err_iter = this->fc_name_to_errors.find(filename);
- if (err_iter != this->fc_name_to_errors.end()) {
- if (err_iter->second.fei_mtime != st.st_mtime) {
- this->fc_name_to_errors.erase(err_iter);
+ {
+ safe::WriteAccess<safe_name_to_errors> errs(
+ *this->fc_name_to_errors);
+
+ auto err_iter = errs->find(filename_key);
+ if (err_iter != errs->end()) {
+ if (err_iter->second.fei_mtime != st.st_mtime) {
+ errs->erase(err_iter);
+ }
}
}
}
if (rc == -1) {
if (required) {
- retval.fc_name_to_errors.emplace(filename,
- file_error_info{
- time(nullptr),
- std::string(strerror(errno)),
- });
+ log_error("failed to open required file: %s -- %s",
+ filename.c_str(),
+ strerror(errno));
+ file_collection retval;
+ retval.fc_name_to_errors->writeAccess()->emplace(
+ filename,
+ file_error_info{
+ time(nullptr),
+ std::string(strerror(errno)),
+ });
+ return lnav::futures::make_ready_future(std::move(retval));
}
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
if (this->fc_new_stats | lnav::itertools::find_if([&st](const auto& elem) {
@@ -289,7 +366,7 @@ file_collection::watch_logfile(const std::string& filename,
{
// this file is probably a link that we have already scanned in this
// pass.
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
this->fc_new_stats.emplace_back(st);
@@ -299,92 +376,41 @@ file_collection::watch_logfile(const std::string& filename,
if (file_iter == this->fc_files.end()) {
if (this->fc_other_files.find(filename) != this->fc_other_files.end()) {
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
require(this->fc_progress.get() != nullptr);
auto func = [filename,
st,
- loo2 = std::move(loo),
+ loo,
prog = this->fc_progress,
errs = this->fc_name_to_errors]() mutable {
file_collection retval;
- if (errs.find(filename) != errs.end()) {
- // The file is broken, no reason to try and reopen
- return retval;
+ {
+ safe::ReadAccess<safe_name_to_errors> errs_inner(*errs);
+
+ if (errs_inner->find(filename) != errs_inner->end()) {
+ // The file is broken, no reason to try and reopen
+ return retval;
+ }
}
- auto ff = loo2.loo_temp_file ? file_format_t::UNKNOWN
- : detect_file_format(filename);
+ auto ff = detect_file_format(filename);
- loo2.loo_file_format = ff;
+ loo.loo_file_format = ff;
switch (ff) {
case file_format_t::SQLITE_DB:
retval.fc_other_files[filename].ofd_format = ff;
break;
- case file_format_t::PCAP: {
- auto res = pcap_manager::convert(filename);
-
- if (res.isOk()) {
- auto convert_res = res.unwrap();
-
- loo2.with_fd(std::move(convert_res.cr_destination));
- retval.fc_child_pollers.emplace_back(child_poller{
- std::move(convert_res.cr_child),
- [filename,
- st,
- error_queue = convert_res.cr_error_queue](
- auto& fc, auto& child) {
- if (child.was_normal_exit()
- && child.exit_status() == EXIT_SUCCESS)
- {
- log_info("pcap[%d] exited normally",
- child.in());
- return;
- }
- log_error("pcap[%d] exited with %d",
- child.in(),
- child.status());
- fc.fc_name_to_errors.emplace(
- filename,
- file_error_info{
- st.st_mtime,
- fmt::format(
- FMT_STRING("{}"),
- fmt::join(*error_queue, "\n")),
- });
- },
- });
- auto open_res = logfile::open(filename, loo2);
- if (open_res.isOk()) {
- retval.fc_files.push_back(open_res.unwrap());
- } else {
- retval.fc_name_to_errors.emplace(
- filename,
- file_error_info{
- st.st_mtime,
- open_res.unwrapErr(),
- });
- }
- } else {
- retval.fc_name_to_errors.emplace(filename,
- file_error_info{
- st.st_mtime,
- res.unwrapErr(),
- });
- }
- break;
- }
-
case file_format_t::ARCHIVE: {
nonstd::optional<
std::list<archive_manager::extract_progress>::iterator>
prog_iter_opt;
- if (loo2.loo_source == logfile_name_source::ARCHIVE) {
+ if (loo.loo_source == logfile_name_source::ARCHIVE) {
// Don't try to open nested archives
return retval;
}
@@ -431,11 +457,12 @@ file_collection::watch_logfile(const std::string& filename,
log_error("archive extraction failed: %s",
res.unwrapErr().c_str());
retval.clear();
- retval.fc_name_to_errors.emplace(filename,
- file_error_info{
- st.st_mtime,
- res.unwrapErr(),
- });
+ retval.fc_name_to_errors->writeAccess()->emplace(
+ filename,
+ file_error_info{
+ st.st_mtime,
+ res.unwrapErr(),
+ });
} else {
retval.fc_other_files[filename] = ff;
}
@@ -448,14 +475,66 @@ file_collection::watch_logfile(const std::string& filename,
break;
}
- default:
+ default: {
+ auto filename_to_open = filename;
+
+ auto eff = detect_mime_type(filename);
+
+ if (eff) {
+ auto cr = file_converter_manager::convert(eff.value(),
+ filename);
+
+ if (cr.isErr()) {
+ retval.fc_name_to_errors->writeAccess()->emplace(
+ filename,
+ file_error_info{
+ st.st_mtime,
+ cr.unwrapErr(),
+ });
+ break;
+ }
+
+ auto convert_res = cr.unwrap();
+ retval.fc_child_pollers.emplace_back(child_poller{
+ filename,
+ std::move(convert_res.cr_child),
+ [filename,
+ st,
+ error_queue = convert_res.cr_error_queue](
+ auto& fc, auto& child) {
+ if (child.was_normal_exit()
+ && child.exit_status() == EXIT_SUCCESS)
+ {
+ log_info("converter[%d] exited normally",
+ child.in());
+ return;
+ }
+ log_error("converter[%d] exited with %d",
+ child.in(),
+ child.status());
+ fc.fc_name_to_errors->writeAccess()->emplace(
+ filename,
+ file_error_info{
+ st.st_mtime,
+ fmt::format(
+ FMT_STRING("{}"),
+ fmt::join(*error_queue, "\n")),
+ });
+ },
+ });
+ loo.with_filename(filename);
+ loo.with_stat_for_temp(st);
+ loo.loo_format_name = eff->eff_format_name;
+ filename_to_open = convert_res.cr_destination;
+ }
+
log_info("loading new file: filename=%s", filename.c_str());
- auto open_res = logfile::open(filename, loo2);
+ auto open_res = logfile::open(filename_to_open, loo);
if (open_res.isOk()) {
retval.fc_files.push_back(open_res.unwrap());
} else {
- retval.fc_name_to_errors.emplace(
+ retval.fc_name_to_errors->writeAccess()->emplace(
filename,
file_error_info{
st.st_mtime,
@@ -463,6 +542,7 @@ file_collection::watch_logfile(const std::string& filename,
});
}
break;
+ }
}
return retval;
@@ -477,10 +557,13 @@ file_collection::watch_logfile(const std::string& filename,
/* The file is already loaded, but has been found under a different
* name. We just need to update the stored file name.
*/
+ file_collection retval;
+
retval.fc_renamed_files.emplace_back(lf, filename);
+ return lnav::futures::make_ready_future(std::move(retval));
}
- return lnav::futures::make_ready_future(std::move(retval));
+ return nonstd::nullopt;
}
/**
@@ -506,11 +589,13 @@ file_collection::expand_filename(
}
}
- if (is_url(path.c_str())) {
+ if (is_url(path)) {
return;
}
- if (glob(path.c_str(), GLOB_NOCHECK, nullptr, gl.inout()) == 0) {
+ auto filename_key = loo.loo_filename.empty() ? path : loo.loo_filename;
+ auto glob_flags = lnav::filesystem::is_glob(path) ? 0 : GLOB_NOCHECK;
+ if (glob(path.c_str(), glob_flags, nullptr, gl.inout()) == 0) {
int lpc;
if (gl->gl_pathc == 1 /*&& gl.gl_matchc == 0*/) {
@@ -579,17 +664,30 @@ file_collection::expand_filename(
file_collection retval;
if (gl->gl_pathc == 1) {
- retval.fc_name_to_errors.emplace(path,
- file_error_info{
- time(nullptr),
- errmsg,
- });
+ if (this->fc_name_to_errors->readAccess()->count(
+ filename_key)
+ == 0)
+ {
+ log_error("failed to find path: %s -- %s",
+ filename_key.c_str(),
+ errmsg);
+ retval.fc_name_to_errors->writeAccess()
+ ->emplace(filename_key,
+ file_error_info{
+ time(nullptr),
+ errmsg,
+ });
+ }
} else {
- retval.fc_name_to_errors.emplace(path_str,
- file_error_info{
- time(nullptr),
- errmsg,
- });
+ log_error("failed to find path: %s -- %s",
+ path_str.c_str(),
+ errmsg);
+ retval.fc_name_to_errors->writeAccess()->emplace(
+ path_str,
+ file_error_info{
+ time(nullptr),
+ errmsg,
+ });
}
fq.push_back(lnav::futures::make_ready_future(
std::move(retval)));
@@ -603,7 +701,16 @@ file_collection::expand_filename(
}
if (required || access(iter->second.c_str(), R_OK) == 0) {
- fq.push_back(watch_logfile(iter->second, loo, required));
+ auto future_opt = watch_logfile(iter->second, loo, required);
+ if (future_opt) {
+ auto fut = std::move(future_opt.value());
+ if (fq.push_back(std::move(fut))
+ == lnav::futures::future_queue<
+ file_collection>::processor_result_t::interrupt)
+ {
+ break;
+ }
+ }
}
}
}
@@ -614,18 +721,48 @@ file_collection::rescan_files(bool required)
{
file_collection retval;
lnav::futures::future_queue<file_collection> fq(
- [&retval](auto& fc) { retval.merge(fc); });
+ [this, &retval](std::future<file_collection>& fc) {
+ try {
+ auto v = fc.get();
+ retval.merge(v);
+ } catch (const std::exception& e) {
+ log_error("rescan future exception: %s", e.what());
+ } catch (...) {
+ log_error("unknown exception thrown by rescan future");
+ }
+
+ if (retval.fc_files.size() < 100
+ && this->fc_files.size() + retval.fc_files.size()
+ < get_limits().l_open_files)
+ {
+ return lnav::futures::future_queue<
+ file_collection>::processor_result_t::ok;
+ }
+ return lnav::futures::future_queue<
+ file_collection>::processor_result_t::interrupt;
+ });
for (auto& pair : this->fc_file_names) {
- if (!pair.second.loo_temp_file) {
+ if (this->fc_files.size() + retval.fc_files.size()
+ >= get_limits().l_open_files)
+ {
+ log_debug("too many files open, breaking...");
+ break;
+ }
+
+ if (pair.second.loo_piper) {
+ this->expand_filename(
+ fq,
+ pair.second.loo_piper->get_out_pattern().string(),
+ pair.second,
+ required);
+ } else {
this->expand_filename(fq, pair.first, pair.second, required);
if (this->fc_rotated) {
std::string path = pair.first + ".*";
this->expand_filename(fq, path, pair.second, false);
}
- } else if (pair.second.loo_fd.get() != -1) {
- fq.push_back(watch_logfile(pair.first, pair.second, required));
}
if (retval.fc_files.size() >= 100) {
@@ -647,3 +784,54 @@ file_collection::request_close(const std::shared_ptr<logfile>& lf)
lf->close();
this->fc_files_generation += 1;
}
+
+size_t
+file_collection::active_pipers() const
+{  // Returns the number of fc_file_names entries that have a piper whose is_finished() is false.
+ size_t retval = 0;
+ for (const auto& pair : this->fc_file_names) {
+ if (pair.second.loo_piper && !pair.second.loo_piper->is_finished()) {  // entries without a piper are not counted
+ retval += 1;
+ }
+ }
+
+ return retval;
+}
+
+size_t
+file_collection::finished_pipers()
+{  // Returns the sum of consume_finished() over every fc_file_names entry that has a piper.
+ size_t retval = 0;
+ // Non-const iteration: consume_finished() is called on each piper (presumably it updates piper state -- confirm).
+ for (auto& pair : this->fc_file_names) {
+ if (pair.second.loo_piper) {
+ retval += pair.second.loo_piper->consume_finished();
+ }
+ }
+
+ return retval;
+}
+
+file_collection
+file_collection::copy()
+{  // Builds a new collection by merging *this into an empty one, then shares the fc_progress pointer.
+ file_collection retval;
+ // NOTE(review): merge() move-inserts and clears the source's fc_child_pollers, so pollers appear to leave *this -- confirm this is intended.
+ retval.merge(*this);
+ retval.fc_progress = this->fc_progress;
+ return retval;
+}
+
+size_t
+file_collection::other_file_format_count(file_format_t ff) const
+{  // Returns how many fc_other_files entries have ofd_format equal to ff.
+ size_t retval = 0;
+
+ for (const auto& pair : this->fc_other_files) {
+ if (pair.second.ofd_format == ff) {
+ retval += 1;
+ }
+ }
+
+ return retval;
+}