diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:44:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:44:55 +0000 |
commit | 5068d34c08f951a7ea6257d305a1627b09a95817 (patch) | |
tree | 08213e2be853396a3b07ce15dbe222644dcd9a89 /src/line_buffer.cc | |
parent | Initial commit. (diff) | |
download | lnav-upstream.tar.xz lnav-upstream.zip |
Adding upstream version 0.11.1.upstream/0.11.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/line_buffer.cc | 1441 |
1 files changed, 1441 insertions, 0 deletions
diff --git a/src/line_buffer.cc b/src/line_buffer.cc new file mode 100644 index 0000000..f604b6e --- /dev/null +++ b/src/line_buffer.cc @@ -0,0 +1,1441 @@ +/** + * Copyright (c) 2007-2012, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @file line_buffer.cc + */ + +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include "config.h" + +#ifdef HAVE_BZLIB_H +# include <bzlib.h> +#endif + +#include <algorithm> +#include <set> + +#ifdef HAVE_X86INTRIN_H +# include "simdutf8check.h" +#endif + +#include "base/auto_pid.hh" +#include "base/fs_util.hh" +#include "base/injector.bind.hh" +#include "base/injector.hh" +#include "base/is_utf8.hh" +#include "base/isc.hh" +#include "base/math_util.hh" +#include "base/paths.hh" +#include "fmtlib/fmt/format.h" +#include "line_buffer.hh" +#include "lnav_util.hh" + +using namespace std::chrono_literals; + +static const ssize_t INITIAL_REQUEST_SIZE = 16 * 1024; +static const ssize_t DEFAULT_INCREMENT = 128 * 1024; +static const ssize_t INITIAL_COMPRESSED_BUFFER_SIZE = 5 * 1024 * 1024; +static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024; + +const ssize_t line_buffer::DEFAULT_LINE_BUFFER_SIZE = 256 * 1024; +const ssize_t line_buffer::MAX_LINE_BUFFER_SIZE + = 4 * 4 * line_buffer::DEFAULT_LINE_BUFFER_SIZE; + +class io_looper : public isc::service<io_looper> {}; + +struct io_looper_tag {}; + +static auto bound_io = injector::bind_multiple<isc::service_base>() + .add_singleton<io_looper, io_looper_tag>(); + +namespace injector { +template<> +void +force_linking(io_looper_tag anno) +{ +} +} // namespace injector + +/* + * XXX REMOVE ME + * + * The stock bzipped file code does not use pread, so we need to use a lock to + * get exclusive access to the file. In the future, we should just rewrite + * the bzipped file code to use pread. + */ +class lock_hack { +public: + class guard { + public: + guard() : g_lock(lock_hack::singleton()) { this->g_lock.lock(); } + + ~guard() { this->g_lock.unlock(); } + + private: + lock_hack& g_lock; + }; + + static lock_hack& singleton() + { + static lock_hack retval; + + return retval; + } + + void lock() { lockf(this->lh_fd, F_LOCK, 0); } + + void unlock() { lockf(this->lh_fd, F_ULOCK, 0); } + +private: + lock_hack() + { + char lockname[64]; + + snprintf(lockname, sizeof(lockname), "/tmp/lnav.%d.lck", getpid()); + this->lh_fd = open(lockname, O_CREAT | O_RDWR, 0600); + log_perror(fcntl(this->lh_fd, F_SETFD, FD_CLOEXEC)); + unlink(lockname); + } + + auto_fd lh_fd; +}; +/* XXX END */ + +#define Z_BUFSIZE 65536U +#define SYNCPOINT_SIZE (1024 * 1024) +line_buffer::gz_indexed::gz_indexed() +{ + if ((this->inbuf = (Bytef*) malloc(Z_BUFSIZE)) == NULL) { + throw std::bad_alloc(); + } +} + +void +line_buffer::gz_indexed::close() +{ + // Release old stream, if we were open + if (*this) { + inflateEnd(&this->strm); + ::close(this->gz_fd); + this->syncpoints.clear(); + this->gz_fd = -1; + } +} + +void +line_buffer::gz_indexed::init_stream() +{ + if (*this) { + inflateEnd(&this->strm); + } + + // initialize inflate struct + this->strm.zalloc = Z_NULL; + this->strm.zfree = Z_NULL; + this->strm.opaque = Z_NULL; + this->strm.avail_in = 0; + this->strm.next_in = Z_NULL; + this->strm.avail_out = 0; + int rc = inflateInit2(&strm, GZ_HEADER_MODE); + if (rc != Z_OK) { + throw(rc); // FIXME: exception wrapper + } +} + +void +line_buffer::gz_indexed::continue_stream() +{ + // Save our position and output buffer + auto total_in = this->strm.total_in; + auto total_out = this->strm.total_out; + auto avail_out = this->strm.avail_out; + auto next_out = this->strm.next_out; + + init_stream(); + + // Restore position and output buffer + this->strm.total_in = total_in; + this->strm.total_out = total_out; + this->strm.avail_out = avail_out; + this->strm.next_out = next_out; +} + +void +line_buffer::gz_indexed::open(int fd, header_data& hd) +{ + this->close(); + this->init_stream(); + this->gz_fd = fd; + + unsigned char name[1024]; + unsigned char comment[4096]; + + name[0] = '\0'; + comment[0] = '\0'; + + gz_header gz_hd; + memset(&gz_hd, 0, sizeof(gz_hd)); + gz_hd.name = name; + gz_hd.name_max = sizeof(name); + gz_hd.comment = comment; + gz_hd.comm_max = sizeof(comment); + + Bytef inbuf[8192]; + Bytef outbuf[8192]; + this->strm.next_out = outbuf; + this->strm.total_out = 0; + this->strm.avail_out = sizeof(outbuf); + this->strm.next_in = inbuf; + this->strm.total_in = 0; + + if (inflateGetHeader(&this->strm, &gz_hd) == Z_OK) { + auto rc = pread(fd, inbuf, sizeof(inbuf), 0); + if (rc >= 0) { + this->strm.avail_in = rc; + + inflate(&this->strm, Z_BLOCK); + inflateEnd(&this->strm); + + this->strm.next_out = Z_NULL; + this->strm.next_in = Z_NULL; + this->strm.next_in = Z_NULL; + this->strm.total_in = 0; + this->strm.avail_in = 0; + this->init_stream(); + + switch (gz_hd.done) { + case 0: + log_debug("%d: no gzip header data", fd); + break; + case 1: + hd.hd_mtime.tv_sec = gz_hd.time; + hd.hd_name = std::string((char*) name); + hd.hd_comment = std::string((char*) comment); + break; + default: + log_error("%d: failed to read gzip header data", fd); + break; + } + } else { + log_error("%d: failed to read gzip header from file: %s", + fd, + strerror(errno)); + } + } else { + log_error("%d: unable to get gzip header", fd); + } +} + +int +line_buffer::gz_indexed::stream_data(void* buf, size_t size) +{ + this->strm.avail_out = size; + this->strm.next_out = (unsigned char*) buf; + + size_t last = this->syncpoints.empty() ? 0 : this->syncpoints.back().in; + while (this->strm.avail_out) { + if (!this->strm.avail_in) { + int rc = ::pread( + this->gz_fd, &this->inbuf[0], Z_BUFSIZE, this->strm.total_in); + if (rc < 0) { + return rc; + } + this->strm.next_in = this->inbuf; + this->strm.avail_in = rc; + } + if (this->strm.avail_in) { + int flush = last > this->strm.total_in ? Z_SYNC_FLUSH : Z_BLOCK; + auto err = inflate(&this->strm, flush); + if (err == Z_STREAM_END) { + // Reached end of stream; re-init for a possible subsequent + // stream + continue_stream(); + } else if (err != Z_OK) { + log_error(" inflate-error: %d %s", + (int) err, + this->strm.msg ? this->strm.msg : ""); + break; + } + + if (this->strm.total_in >= last + SYNCPOINT_SIZE + && size > this->strm.avail_out + GZ_WINSIZE + && (this->strm.data_type & GZ_END_OF_BLOCK_MASK) + && !(this->strm.data_type & GZ_END_OF_FILE_MASK)) + { + this->syncpoints.emplace_back(this->strm, size); + last = this->strm.total_out; + } + } else if (this->strm.avail_out) { + // Processed all the gz file data but didn't fill + // the output buffer. We're done, even though we + // produced fewer bytes than requested. + break; + } + } + return size - this->strm.avail_out; +} + +void +line_buffer::gz_indexed::seek(off_t offset) +{ + if ((size_t) offset == this->strm.total_out) { + return; + } + + indexDict* dict = nullptr; + // Find highest syncpoint not past offset + // FIXME: Make this a binary-tree search + for (auto& d : this->syncpoints) { + if (d.out <= offset) { + dict = &d; + } else { + break; + } + } + + // Choose highest available syncpoint, or keep current offset if it's ok + if ((size_t) offset < this->strm.total_out + || (dict && this->strm.total_out < (size_t) dict->out)) + { + // Release the old z_stream + inflateEnd(&this->strm); + if (dict) { + dict->apply(&this->strm); + } else { + init_stream(); + } + } + + // Stream from compressed file until we reach our offset + unsigned char dummy[Z_BUFSIZE]; + while ((size_t) offset > this->strm.total_out) { + size_t to_copy + = std::min(static_cast<size_t>(Z_BUFSIZE), + static_cast<size_t>(offset - this->strm.total_out)); + auto bytes = stream_data(dummy, to_copy); + if (bytes <= 0) { + break; + } + } +} + +int +line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size) +{ + if (offset != this->strm.total_out) { + // log_debug("doing seek! %d %d", offset, this->strm.total_out); + this->seek(offset); + } + + int bytes = stream_data(buf, size); + + return bytes; +} + +line_buffer::line_buffer() +{ + ensure(this->invariant()); +} + +line_buffer::~line_buffer() +{ + auto empty_fd = auto_fd(); + + // Make sure any shared refs take ownership of the data. + this->lb_share_manager.invalidate_refs(); + this->set_fd(empty_fd); +} + +void +line_buffer::set_fd(auto_fd& fd) +{ + file_off_t newoff = 0; + + { + safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file); + + if (*gi) { + gi->close(); + } + } + + if (this->lb_bz_file) { + this->lb_bz_file = false; + } + + if (fd != -1) { + /* Sync the fd's offset with the object. */ + newoff = lseek(fd, 0, SEEK_CUR); + if (newoff == -1) { + if (errno != ESPIPE) { + throw error(errno); + } + + /* It's a pipe, start with a zero offset. */ + newoff = 0; + this->lb_seekable = false; + } else { + char gz_id[2 + 1 + 1 + 4]; + + if (pread(fd, gz_id, sizeof(gz_id), 0) == sizeof(gz_id)) { + if (gz_id[0] == '\037' && gz_id[1] == '\213') { + int gzfd = dup(fd); + + log_perror(fcntl(gzfd, F_SETFD, FD_CLOEXEC)); + if (lseek(fd, 0, SEEK_SET) < 0) { + close(gzfd); + throw error(errno); + } + this->lb_gz_file.writeAccess()->open(gzfd, this->lb_header); + this->lb_compressed = true; + this->lb_file_time = this->lb_header.hd_mtime.tv_sec; + if (this->lb_file_time < 0) { + this->lb_file_time = 0; + } + this->lb_compressed_offset + = lseek(this->lb_fd, 0, SEEK_CUR); + this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE); + } +#ifdef HAVE_BZLIB_H + else if (gz_id[0] == 'B' && gz_id[1] == 'Z') + { + if (lseek(fd, 0, SEEK_SET) < 0) { + throw error(errno); + } + this->lb_bz_file = true; + this->lb_compressed = true; + + /* + * Loading data from a bzip2 file is pretty slow, so we try + * to keep as much in memory as possible. + */ + this->resize_buffer(INITIAL_COMPRESSED_BUFFER_SIZE); + + this->lb_compressed_offset = 0; + } +#endif + } + this->lb_seekable = true; + } + } + this->lb_file_offset = newoff; + this->lb_buffer.clear(); + this->lb_fd = std::move(fd); + + ensure(this->invariant()); +} + +void +line_buffer::resize_buffer(size_t new_max) +{ + if (new_max <= MAX_LINE_BUFFER_SIZE + && new_max > (size_t) this->lb_buffer.capacity()) + { + /* Still need more space, try a realloc. */ + this->lb_share_manager.invalidate_refs(); + this->lb_buffer.expand_to(new_max); + } +} + +void +line_buffer::ensure_available(file_off_t start, ssize_t max_length) +{ + ssize_t prefill, available; + + require(this->lb_compressed || max_length <= MAX_LINE_BUFFER_SIZE); + + // log_debug("ensure avail %d %d", start, max_length); + + if (this->lb_file_size != -1) { + if (start + (file_off_t) max_length > this->lb_file_size) { + max_length = (this->lb_file_size - start); + } + } + + /* + * Check to see if the start is inside the cached range or immediately + * after. + */ + if (start < this->lb_file_offset + || start > (file_off_t) (this->lb_file_offset + this->lb_buffer.size())) + { + /* + * The request is outside the cached range, need to reload the + * whole thing. + */ + this->lb_share_manager.invalidate_refs(); + prefill = 0; + this->lb_buffer.clear(); + if ((this->lb_file_size != (ssize_t) -1) + && (start + this->lb_buffer.capacity() > this->lb_file_size)) + { + require(start <= this->lb_file_size); + /* + * If the start is near the end of the file, move the offset back a + * bit so we can get more of the file in the cache. + */ + this->lb_file_offset = this->lb_file_size + - std::min(this->lb_file_size, + (file_ssize_t) this->lb_buffer.capacity()); + } else { + this->lb_file_offset = start; + } + } else { + /* The request is in the cached range. Record how much extra data is in + * the buffer before the requested range. + */ + prefill = start - this->lb_file_offset; + } + require(this->lb_file_offset <= start); + require(prefill <= this->lb_buffer.size()); + + available = this->lb_buffer.capacity() - (start - this->lb_file_offset); + require(available <= this->lb_buffer.capacity()); + + if (max_length > available) { + // log_debug("need more space!"); + /* + * Need more space, move any existing data to the front of the + * buffer. + */ + this->lb_share_manager.invalidate_refs(); + + this->lb_buffer.resize_by(-prefill); + this->lb_file_offset += prefill; + // log_debug("adjust file offset for prefill %d", this->lb_file_offset); + memmove(this->lb_buffer.at(0), + this->lb_buffer.at(prefill), + this->lb_buffer.size()); + + available = this->lb_buffer.capacity() - (start - this->lb_file_offset); + if (max_length > available) { + this->resize_buffer(roundup_size(max_length, DEFAULT_INCREMENT)); + } + } + this->lb_line_starts.clear(); + this->lb_line_is_utf.clear(); +} + +bool +line_buffer::load_next_buffer() +{ + // log_debug("loader here!"); + auto retval = false; + auto start = this->lb_loader_file_offset.value(); + ssize_t rc = 0; + safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file); + + // log_debug("BEGIN preload read"); + /* ... read in the new data. */ + if (!this->lb_cached_fd && *gi) { + if (this->lb_file_size != (ssize_t) -1 && this->in_range(start) + && this->in_range(this->lb_file_size - 1)) + { + rc = 0; + } else { + // log_debug("async decomp start"); + rc = gi->read(this->lb_alt_buffer.value().end(), + start + this->lb_alt_buffer.value().size(), + this->lb_alt_buffer.value().available()); + this->lb_compressed_offset = gi->get_source_offset(); + if (rc != -1 && (rc < this->lb_alt_buffer.value().available()) + && (start + this->lb_alt_buffer.value().size() + rc + > this->lb_file_size)) + { + this->lb_file_size + = (start + this->lb_alt_buffer.value().size() + rc); + } +#if 0 + log_debug("async decomp end %d+%d:%d", + this->lb_alt_buffer->size(), + rc, + this->lb_alt_buffer->capacity()); +#endif + } + } +#ifdef HAVE_BZLIB_H + else if (!this->lb_cached_fd && this->lb_bz_file) + { + if (this->lb_file_size != (ssize_t) -1 + && (((ssize_t) start >= this->lb_file_size) + || (this->in_range(start) + && this->in_range(this->lb_file_size - 1)))) + { + rc = 0; + } else { + lock_hack::guard guard; + char scratch[32 * 1024]; + BZFILE* bz_file; + file_off_t seek_to; + int bzfd; + + /* + * Unfortunately, there is no bzseek, so we need to reopen the + * file every time we want to do a read. + */ + bzfd = dup(this->lb_fd); + if (lseek(this->lb_fd, 0, SEEK_SET) < 0) { + close(bzfd); + throw error(errno); + } + if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) { + close(bzfd); + if (errno == 0) { + throw std::bad_alloc(); + } else { + throw error(errno); + } + } + + seek_to = start + this->lb_alt_buffer.value().size(); + while (seek_to > 0) { + int count; + + count = BZ2_bzread(bz_file, + scratch, + std::min((size_t) seek_to, sizeof(scratch))); + seek_to -= count; + } + rc = BZ2_bzread(bz_file, + this->lb_alt_buffer->end(), + this->lb_alt_buffer->available()); + this->lb_compressed_offset = lseek(bzfd, 0, SEEK_SET); + BZ2_bzclose(bz_file); + + if (rc != -1 && (rc < (this->lb_alt_buffer.value().available())) + && (start + this->lb_alt_buffer.value().size() + rc + > this->lb_file_size)) + { + this->lb_file_size + = (start + this->lb_alt_buffer.value().size() + rc); + } + } + } +#endif + else + { + rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get() + : this->lb_fd.get(), + this->lb_alt_buffer.value().end(), + this->lb_alt_buffer.value().available(), + start + this->lb_alt_buffer.value().size()); + } + // XXX For some reason, cygwin is giving us a bogus return value when + // up to the end of the file. + if (rc > (this->lb_alt_buffer.value().available())) { + rc = -1; +#ifdef ENODATA + errno = ENODATA; +#else + errno = EAGAIN; +#endif + } + switch (rc) { + case 0: + if (start < (file_off_t) this->lb_file_size) { + retval = true; + } + break; + + case (ssize_t) -1: + switch (errno) { +#ifdef ENODATA + /* Cygwin seems to return this when pread reaches the end of + * the file. */ + case ENODATA: +#endif + case EINTR: + case EAGAIN: + break; + + default: + throw error(errno); + } + break; + + default: + this->lb_alt_buffer.value().resize_by(rc); + retval = true; + break; + } + // log_debug("END preload read"); + + if (start > this->lb_last_line_offset) { + auto* line_start = this->lb_alt_buffer.value().begin(); + + do { + const char* msg = nullptr; + int faulty_bytes = 0; + bool valid_utf = true; + char* lf = nullptr; + + auto before = line_start - this->lb_alt_buffer->begin(); + auto remaining = this->lb_alt_buffer.value().size() - before; + auto utf_scan_res = is_utf8((unsigned char*) line_start, + remaining, + &msg, + &faulty_bytes, + '\n'); + if (msg != nullptr) { + lf = (char*) memchr(line_start, '\n', remaining); + utf_scan_res.usr_end = lf - line_start; + valid_utf = false; + } + if (utf_scan_res.usr_end >= 0) { + lf = line_start + utf_scan_res.usr_end; + } + this->lb_alt_line_starts.emplace_back(before); + this->lb_alt_line_is_utf.emplace_back(valid_utf); + this->lb_alt_line_has_ansi.emplace_back(utf_scan_res.usr_has_ansi); + + if (lf != nullptr) { + line_start = lf + 1; + } else { + line_start = nullptr; + } + } while (line_start != nullptr + && line_start < this->lb_alt_buffer->end()); + } + + return retval; +} + +bool +line_buffer::fill_range(file_off_t start, ssize_t max_length) +{ + bool retval = false; + + require(start >= 0); + + // log_debug("fill range %d %d", start, max_length); +#if 0 + log_debug("(%p) fill range %d %d (%d) %d", + this, + start, + max_length, + this->lb_file_offset, + this->lb_buffer.size()); +#endif + if (!lnav::pid::in_child && this->lb_loader_future.valid() + && start >= this->lb_loader_file_offset.value()) + { +#if 0 + log_debug("getting preload! %d %d", + start, + this->lb_loader_file_offset.value()); +#endif + nonstd::optional<std::chrono::system_clock::time_point> wait_start; + + if (this->lb_loader_future.wait_for(std::chrono::seconds(0)) + != std::future_status::ready) + { + wait_start + = nonstd::make_optional(std::chrono::system_clock::now()); + } + retval = this->lb_loader_future.get(); + if (false && wait_start) { + auto diff = std::chrono::system_clock::now() - wait_start.value(); + log_debug("wait done! %d", diff.count()); + } + // log_debug("got preload"); + this->lb_loader_future = {}; + this->lb_share_manager.invalidate_refs(); + this->lb_file_offset = this->lb_loader_file_offset.value(); + this->lb_loader_file_offset = nonstd::nullopt; + this->lb_buffer.swap(this->lb_alt_buffer.value()); + this->lb_alt_buffer.value().clear(); + this->lb_line_starts = std::move(this->lb_alt_line_starts); + this->lb_alt_line_starts.clear(); + this->lb_line_is_utf = std::move(this->lb_alt_line_is_utf); + this->lb_alt_line_is_utf.clear(); + this->lb_line_has_ansi = std::move(this->lb_alt_line_has_ansi); + this->lb_alt_line_has_ansi.clear(); + this->lb_stats.s_used_preloads += 1; + } + if (this->in_range(start) && this->in_range(start + max_length - 1)) { + /* Cache already has the data, nothing to do. */ + retval = true; + if (!lnav::pid::in_child && this->lb_seekable && this->lb_buffer.full() + && !this->lb_loader_file_offset) + { + // log_debug("loader available start=%d", start); + auto last_lf_iter = std::find( + this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n'); + if (last_lf_iter != this->lb_buffer.rend()) { + auto usable_size + = std::distance(last_lf_iter, this->lb_buffer.rend()); + // log_debug("found linefeed %d", usable_size); + if (!this->lb_alt_buffer) { + // log_debug("allocating new buffer!"); + this->lb_alt_buffer + = auto_buffer::alloc(this->lb_buffer.capacity()); + } + this->lb_alt_buffer->resize(this->lb_buffer.size() + - usable_size); + memcpy(this->lb_alt_buffer.value().begin(), + this->lb_buffer.at(usable_size), + this->lb_alt_buffer->size()); + this->lb_loader_file_offset + = this->lb_file_offset + usable_size; +#if 0 + log_debug("load offset %d", + this->lb_loader_file_offset.value()); + log_debug("launch loader"); +#endif + auto prom = std::make_shared<std::promise<bool>>(); + this->lb_loader_future = prom->get_future(); + this->lb_stats.s_requested_preloads += 1; + isc::to<io_looper&, io_looper_tag>().send( + [this, prom](auto& ioloop) mutable { + prom->set_value(this->load_next_buffer()); + }); + } + } + } else if (this->lb_fd != -1) { + ssize_t rc; + + /* Make sure there is enough space, then */ + this->ensure_available(start, max_length); + + safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file); + + /* ... read in the new data. */ + if (!this->lb_cached_fd && *gi) { + // log_debug("old decomp start"); + if (this->lb_file_size != (ssize_t) -1 && this->in_range(start) + && this->in_range(this->lb_file_size - 1)) + { + rc = 0; + } else { + this->lb_stats.s_decompressions += 1; + if (false && this->lb_last_line_offset > 0) { + this->lb_stats.s_hist[(this->lb_file_offset * 10) + / this->lb_last_line_offset] + += 1; + } + rc = gi->read(this->lb_buffer.end(), + this->lb_file_offset + this->lb_buffer.size(), + this->lb_buffer.available()); + this->lb_compressed_offset = gi->get_source_offset(); + if (rc != -1 && (rc < this->lb_buffer.available())) { + this->lb_file_size + = (this->lb_file_offset + this->lb_buffer.size() + rc); + } + } +#if 0 + log_debug("old decomp end -- %d+%d:%d", + this->lb_buffer.size(), + rc, + this->lb_buffer.capacity()); +#endif + } +#ifdef HAVE_BZLIB_H + else if (!this->lb_cached_fd && this->lb_bz_file) + { + if (this->lb_file_size != (ssize_t) -1 + && (((ssize_t) start >= this->lb_file_size) + || (this->in_range(start) + && this->in_range(this->lb_file_size - 1)))) + { + rc = 0; + } else { + lock_hack::guard guard; + char scratch[32 * 1024]; + BZFILE* bz_file; + file_off_t seek_to; + int bzfd; + + /* + * Unfortunately, there is no bzseek, so we need to reopen the + * file every time we want to do a read. + */ + bzfd = dup(this->lb_fd); + if (lseek(this->lb_fd, 0, SEEK_SET) < 0) { + close(bzfd); + throw error(errno); + } + if ((bz_file = BZ2_bzdopen(bzfd, "r")) == NULL) { + close(bzfd); + if (errno == 0) { + throw std::bad_alloc(); + } else { + throw error(errno); + } + } + + seek_to = this->lb_file_offset + this->lb_buffer.size(); + while (seek_to > 0) { + int count; + + count = BZ2_bzread( + bz_file, + scratch, + std::min((size_t) seek_to, sizeof(scratch))); + seek_to -= count; + } + rc = BZ2_bzread(bz_file, + this->lb_buffer.end(), + this->lb_buffer.available()); + this->lb_compressed_offset = lseek(bzfd, 0, SEEK_SET); + BZ2_bzclose(bz_file); + + if (rc != -1 && (rc < (this->lb_buffer.available()))) { + this->lb_file_size + = (this->lb_file_offset + this->lb_buffer.size() + rc); + } + } + } +#endif + else if (this->lb_seekable) + { + this->lb_stats.s_preads += 1; + if (false && this->lb_last_line_offset > 0) { + this->lb_stats.s_hist[(this->lb_file_offset * 10) + / this->lb_last_line_offset] + += 1; + } +#if 0 + log_debug("%d: pread %lld", + this->lb_fd.get(), + this->lb_file_offset + this->lb_buffer.size()); +#endif + rc = pread(this->lb_cached_fd ? this->lb_cached_fd.value().get() + : this->lb_fd.get(), + this->lb_buffer.end(), + this->lb_buffer.available(), + this->lb_file_offset + this->lb_buffer.size()); + // log_debug("pread rc %d", rc); + } else { + rc = read(this->lb_fd, + this->lb_buffer.end(), + this->lb_buffer.available()); + } + // XXX For some reason, cygwin is giving us a bogus return value when + // up to the end of the file. + if (rc > (this->lb_buffer.available())) { + rc = -1; +#ifdef ENODATA + errno = ENODATA; +#else + errno = EAGAIN; +#endif + } + switch (rc) { + case 0: + if (!this->lb_seekable) { + this->lb_file_size + = this->lb_file_offset + this->lb_buffer.size(); + } + if (start < (file_off_t) this->lb_file_size) { + retval = true; + } + + if (this->lb_compressed) { + /* + * For compressed files, increase the buffer size so we + * don't have to spend as much time uncompressing the data. + */ + this->resize_buffer(MAX_COMPRESSED_BUFFER_SIZE); + } + break; + + case (ssize_t) -1: + switch (errno) { +#ifdef ENODATA + /* Cygwin seems to return this when pread reaches the end of + * the */ + /* file. */ + case ENODATA: +#endif + case EINTR: + case EAGAIN: + break; + + default: + throw error(errno); + } + break; + + default: + this->lb_buffer.resize_by(rc); + retval = true; + break; + } + + if (!lnav::pid::in_child && this->lb_seekable && this->lb_buffer.full() + && !this->lb_loader_file_offset) + { + // log_debug("loader available2 start=%d", start); + auto last_lf_iter = std::find( + this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n'); + if (last_lf_iter != this->lb_buffer.rend()) { + auto usable_size + = std::distance(last_lf_iter, this->lb_buffer.rend()); + // log_debug("found linefeed %d", usable_size); + if (!this->lb_alt_buffer) { + // log_debug("allocating new buffer!"); + this->lb_alt_buffer + = auto_buffer::alloc(this->lb_buffer.capacity()); + } else if (this->lb_alt_buffer->capacity() + < this->lb_buffer.capacity()) + { + this->lb_alt_buffer->expand_to(this->lb_buffer.capacity()); + } + this->lb_alt_buffer->resize(this->lb_buffer.size() + - usable_size); + memcpy(this->lb_alt_buffer->begin(), + this->lb_buffer.at(usable_size), + this->lb_alt_buffer->size()); + this->lb_loader_file_offset + = this->lb_file_offset + usable_size; +#if 0 + log_debug("load offset %d", + this->lb_loader_file_offset.value()); + log_debug("launch loader"); +#endif + auto prom = std::make_shared<std::promise<bool>>(); + this->lb_loader_future = prom->get_future(); + this->lb_stats.s_requested_preloads += 1; + isc::to<io_looper&, io_looper_tag>().send( + [this, prom](auto& ioloop) mutable { + prom->set_value(this->load_next_buffer()); + }); + } + } + ensure(this->lb_buffer.size() <= this->lb_buffer.capacity()); + } + + return retval; +} + +Result<line_info, std::string> +line_buffer::load_next_line(file_range prev_line) +{ + bool done = false; + line_info retval; + + require(this->lb_fd != -1); + + auto offset = prev_line.next_offset(); + ssize_t request_size = INITIAL_REQUEST_SIZE; + retval.li_file_range.fr_offset = offset; + if (this->lb_buffer.empty() || !this->in_range(offset)) { + this->fill_range(offset, this->lb_buffer.capacity()); + } else if (offset == this->lb_file_offset + this->lb_buffer.size()) { + if (!this->fill_range(offset, INITIAL_REQUEST_SIZE)) { + retval.li_file_range.fr_offset = offset; + retval.li_file_range.fr_size = 0; + if (this->is_pipe()) { + retval.li_partial = !this->is_pipe_closed(); + } else { + retval.li_partial = true; + } + return Ok(retval); + } + } + while (!done) { + auto old_retval_size = retval.li_file_range.fr_size; + const char *line_start, *lf; + + /* Find the data in the cache and */ + line_start = this->get_range(offset, retval.li_file_range.fr_size); + /* ... look for the end-of-line or end-of-file. */ + ssize_t utf8_end = -1; + + bool found_in_cache = false; + if (!this->lb_line_starts.empty()) { + auto buffer_offset = offset - this->lb_file_offset; + + auto start_iter = std::lower_bound(this->lb_line_starts.begin(), + this->lb_line_starts.end(), + buffer_offset); + if (start_iter != this->lb_line_starts.end()) { + auto next_line_iter = start_iter + 1; + + // log_debug("found offset %d %d", buffer_offset, *start_iter); + if (next_line_iter != this->lb_line_starts.end()) { + utf8_end = *next_line_iter - 1 - *start_iter; + found_in_cache = true; + } else { + // log_debug("no next iter"); + } + } else { + // log_debug("no buffer_offset found"); + } + } + + if (!found_in_cache) { + const char* msg; + int faulty_bytes; + + auto scan_res = is_utf8((unsigned char*) line_start, + retval.li_file_range.fr_size, + &msg, + &faulty_bytes, + '\n'); + if (msg != nullptr) { + lf = (char*) memchr( + line_start, '\n', retval.li_file_range.fr_size); + utf8_end = lf - line_start; + retval.li_valid_utf = false; + } else { + utf8_end = scan_res.usr_end; + } + retval.li_has_ansi = scan_res.usr_has_ansi; + } + + if (utf8_end >= 0) { + lf = line_start + utf8_end; + } else { + lf = nullptr; + } + + auto got_new_data = old_retval_size != retval.li_file_range.fr_size; +#if 0 + log_debug("load next loop %p reqsize %d lsize %d", + lf, + request_size, + retval.li_file_range.fr_size); +#endif + if (lf != nullptr + || (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) + || (request_size >= MAX_LINE_BUFFER_SIZE) + || (!got_new_data + && (!this->is_pipe() || request_size > DEFAULT_INCREMENT))) + { + if ((lf != nullptr) + && ((size_t) (lf - line_start) >= MAX_LINE_BUFFER_SIZE - 1)) + { + lf = nullptr; + } + if (lf != nullptr) { + retval.li_partial = false; + retval.li_file_range.fr_size = lf - line_start; + // delim + retval.li_file_range.fr_size += 1; + if (offset >= this->lb_last_line_offset) { + this->lb_last_line_offset + = offset + retval.li_file_range.fr_size; + } + } else { + if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) { + log_warning("Line exceeded max size: offset=%d", offset); + retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1; + retval.li_partial = false; + } else { + retval.li_partial = true; + } + this->ensure_available(offset, retval.li_file_range.fr_size); + + if (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE) { + retval.li_file_range.fr_size = MAX_LINE_BUFFER_SIZE - 1; + } + if (retval.li_partial) { + /* + * Since no delimiter was seen, we need to remember the + * offset of the last line in the file so we don't + * mistakenly return two partial lines to the caller. + * + * 1. read_line() - returns partial line + * 2. file is written + * 3. read_line() - returns the middle of partial line. + */ + this->lb_last_line_offset = offset; + } else if (offset >= this->lb_last_line_offset) { + this->lb_last_line_offset + = offset + retval.li_file_range.fr_size; + } + } + + offset += retval.li_file_range.fr_size; + + done = true; + } else { + if (!this->is_pipe() || !this->is_pipe_closed()) { + retval.li_partial = true; + } + request_size + = std::min<ssize_t>(this->lb_buffer.size() + DEFAULT_INCREMENT, + MAX_LINE_BUFFER_SIZE); + } + + if (!done + && !this->fill_range( + offset, + std::max(request_size, (ssize_t) this->lb_buffer.available()))) + { + break; + } + } + + ensure(retval.li_file_range.fr_size <= this->lb_buffer.size()); + ensure(this->invariant()); +#if 0 + log_debug("got line part %d %d", + retval.li_file_range.fr_offset, + (int) retval.li_partial); +#endif + return Ok(retval); +} + +Result<shared_buffer_ref, std::string> +line_buffer::read_range(const file_range fr) +{ + shared_buffer_ref retval; + const char* line_start; + file_ssize_t avail; + + if (this->lb_last_line_offset != -1 + && fr.fr_offset > this->lb_last_line_offset) + { + /* + * Don't return anything past the last known line. The caller needs + * to try reading at the offset of the last line again. + */ + return Err( + fmt::format(FMT_STRING("attempt to read past the known end of the " + "file: read-offset={}; last_line_offset={}"), + fr.fr_offset, + this->lb_last_line_offset)); + } + + if (!(this->in_range(fr.fr_offset) + && this->in_range(fr.fr_offset + fr.fr_size - 1))) + { + if (!this->fill_range(fr.fr_offset, fr.fr_size)) { + return Err(std::string("unable to read file")); + } + } + line_start = this->get_range(fr.fr_offset, avail); + + if (fr.fr_size > avail) { + return Err(fmt::format( + FMT_STRING("short-read (need: {}; avail: {})"), fr.fr_size, avail)); + } + retval.share(this->lb_share_manager, line_start, fr.fr_size); + retval.get_metadata() = fr.fr_metadata; + + return Ok(std::move(retval)); +} + +file_range +line_buffer::get_available() +{ + return {this->lb_file_offset, + static_cast<file_ssize_t>(this->lb_buffer.size())}; +} + +line_buffer::gz_indexed::indexDict::indexDict(const z_stream& s, + const file_size_t size) +{ + assert((s.data_type & GZ_END_OF_BLOCK_MASK)); + assert(!(s.data_type & GZ_END_OF_FILE_MASK)); + assert(size >= s.avail_out + GZ_WINSIZE); + this->bits = s.data_type & GZ_BORROW_BITS_MASK; + this->in = s.total_in; + this->out = s.total_out; + auto last_byte_in = s.next_in[-1]; + this->in_bits = last_byte_in >> (8 - this->bits); + // Copy the last 32k uncompressed data (sliding window) to our + // index + memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE); +} + +int +line_buffer::gz_indexed::indexDict::apply(z_streamp s) +{ + s->zalloc = Z_NULL; + s->zfree = Z_NULL; + s->opaque = Z_NULL; + s->avail_in = 0; + s->next_in = Z_NULL; + auto ret = inflateInit2(s, GZ_RAW_MODE); + if (ret != Z_OK) { + return ret; + } + if (this->bits) { + inflatePrime(s, this->bits, this->in_bits); + } + s->total_in = this->in; + s->total_out = this->out; + inflateSetDictionary(s, this->index, GZ_WINSIZE); + return ret; +} + +bool +line_buffer::is_likely_to_flush(file_range prev_line) +{ + auto avail = this->get_available(); + + if (prev_line.fr_offset < avail.fr_offset) { + return true; + } + auto prev_line_end = prev_line.fr_offset + prev_line.fr_size; + auto avail_end = avail.fr_offset + avail.fr_size; + if (avail_end < prev_line_end) { + return true; + } + auto remaining = avail_end - prev_line_end; + return remaining < INITIAL_REQUEST_SIZE; +} + +void +line_buffer::quiesce() +{ + if (this->lb_loader_future.valid()) { + this->lb_loader_future.wait(); + } +} + +static ghc::filesystem::path +line_buffer_cache_path() +{ + return lnav::paths::workdir() / "buffer-cache"; +} + +void +line_buffer::enable_cache() +{ + if (!this->lb_compressed || this->lb_cached_fd) { + log_info("%d: skipping cache request (compressed=%d already-cached=%d)", + this->lb_fd.get(), + this->lb_compressed, + (bool) this->lb_cached_fd); + return; + } + + struct stat st; + + if (fstat(this->lb_fd, &st) == -1) { + log_error("failed to fstat(%d) - %d", this->lb_fd.get(), errno); + return; + } + + auto cached_base_name = hasher() + .update(st.st_dev) + .update(st.st_ino) + .update(st.st_size) + .to_string(); + auto cache_dir = line_buffer_cache_path() / cached_base_name.substr(0, 2); + + ghc::filesystem::create_directories(cache_dir); + + auto cached_file_name = fmt::format(FMT_STRING("{}.bin"), cached_base_name); + auto cached_file_path = cache_dir / cached_file_name; + auto cached_done_path + = cache_dir / fmt::format(FMT_STRING("{}.done"), cached_base_name); + + log_info( + "%d:cache file path: %s", this->lb_fd.get(), cached_file_path.c_str()); + + auto fl = lnav::filesystem::file_lock(cached_file_path); + auto guard = lnav::filesystem::file_lock::guard(&fl); + + if (ghc::filesystem::exists(cached_done_path)) { + log_info("%d:using existing cache file"); + auto open_res = lnav::filesystem::open_file(cached_file_path, O_RDWR); + if (open_res.isOk()) { + this->lb_cached_fd = open_res.unwrap(); + return; + } + ghc::filesystem::remove(cached_done_path); + } + + auto create_res = lnav::filesystem::create_file( + cached_file_path, O_RDWR | O_TRUNC, 0600); + if (create_res.isErr()) { + log_error("failed to create cache file: %s -- %s", + cached_file_path.c_str(), + create_res.unwrapErr().c_str()); + return; + } + + auto write_fd = create_res.unwrap(); + auto done = false; + + static const ssize_t FILL_LENGTH = 1024 * 1024; + auto off = file_off_t{0}; + while (!done) { + log_debug("%d: caching file content at %d", this->lb_fd.get(), off); + if (!this->fill_range(off, FILL_LENGTH)) { + log_debug("%d: caching finished", this->lb_fd.get()); + done = true; + } else { + file_ssize_t avail; + + const auto* data = this->get_range(off, avail); + auto rc = write(write_fd, data, avail); + if (rc != avail) { + log_error("%d: short write!", this->lb_fd.get()); + return; + } + + off += avail; + } + } + + lnav::filesystem::create_file(cached_done_path, O_WRONLY, 0600); + + this->lb_cached_fd = std::move(write_fd); +} + +void +line_buffer::cleanup_cache() +{ + (void) std::async(std::launch::async, []() { + auto now = std::chrono::system_clock::now(); + auto cache_path = line_buffer_cache_path(); + std::vector<ghc::filesystem::path> to_remove; + + for (const auto& cache_subdir : + ghc::filesystem::directory_iterator(cache_path)) + { + for (const auto& entry : + ghc::filesystem::directory_iterator(cache_subdir)) + { + auto mtime = ghc::filesystem::last_write_time(entry.path()); + auto exp_time = mtime + 1h; + if (now < exp_time) { + continue; + } + + to_remove.emplace_back(entry.path()); + } + } + + for (auto& entry : to_remove) { + log_debug("removing compressed file cache: %s", entry.c_str()); + ghc::filesystem::remove_all(entry); + } + }); +} |