summaryrefslogtreecommitdiffstats
path: root/src/tailer
diff options
context:
space:
mode:
Diffstat (limited to 'src/tailer')
-rw-r--r--src/tailer/CMakeLists.txt24
-rw-r--r--src/tailer/Makefile.am111
-rw-r--r--src/tailer/README.md35
-rw-r--r--src/tailer/drive_tailer.cc288
-rw-r--r--src/tailer/sha-256.c161
-rw-r--r--src/tailer/sha-256.h44
-rwxr-xr-xsrc/tailer/tailer.apebin0 -> 147456 bytes
-rw-r--r--src/tailer/tailer.c100
-rw-r--r--src/tailer/tailer.h77
-rw-r--r--src/tailer/tailer.looper.cc1192
-rw-r--r--src/tailer/tailer.looper.cfg.hh53
-rw-r--r--src/tailer/tailer.looper.hh158
-rw-r--r--src/tailer/tailer.main.c1072
-rw-r--r--src/tailer/tailerpp.cc146
-rw-r--r--src/tailer/tailerpp.hh315
-rwxr-xr-xsrc/tailer/test_tailer.sh50
16 files changed, 3826 insertions, 0 deletions
diff --git a/src/tailer/CMakeLists.txt b/src/tailer/CMakeLists.txt
new file mode 100644
index 0000000..1d1fb15
--- /dev/null
+++ b/src/tailer/CMakeLists.txt
@@ -0,0 +1,24 @@
+add_library(tailercommon sha-256.c sha-256.h tailer.c tailer.h)
+
+add_executable(tailer tailer.main.c)
+
+target_link_libraries(tailer tailercommon)
+
+add_library(tailerpp tailerpp.hh tailerpp.cc)
+target_link_libraries(tailerpp base)
+
+add_custom_command(
+ OUTPUT tailerbin.h tailerbin.cc
+ COMMAND bin2c -n tailer_bin tailerbin tailer
+ DEPENDS bin2c tailer)
+
+add_library(tailerservice tailer.looper.hh tailer.looper.cc
+ tailer.looper.cfg.hh tailerbin.h tailerbin.cc)
+target_include_directories(tailerservice PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
+target_link_libraries(tailerservice base)
+
+add_executable(drive_tailer drive_tailer.cc)
+
+target_include_directories(drive_tailer PUBLIC . .. ../fmtlib
+ ${CMAKE_CURRENT_BINARY_DIR}/..)
+target_link_libraries(drive_tailer base tailercommon tailerpp ZLIB::ZLIB)
diff --git a/src/tailer/Makefile.am b/src/tailer/Makefile.am
new file mode 100644
index 0000000..bb8a39a
--- /dev/null
+++ b/src/tailer/Makefile.am
@@ -0,0 +1,111 @@
+
+include $(top_srcdir)/aminclude_static.am
+
+TESTS_ENVIRONMENT = $(SHELL) $(top_builddir)/TESTS_ENVIRONMENT
+LOG_COMPILER = $(SHELL) $(top_builddir)/TESTS_ENVIRONMENT
+
+BUILT_SOURCES = tailerbin.cc
+
+AM_CPPFLAGS = \
+ -Wall \
+ $(CODE_COVERAGE_CPPFLAGS) \
+ $(LIBARCHIVE_CFLAGS) \
+ $(READLINE_CFLAGS) \
+ $(SQLITE3_CFLAGS) \
+ $(PCRE_CFLAGS) \
+ $(LIBCURL_CPPFLAGS)
+
+AM_LIBS = $(CODE_COVERAGE_LIBS)
+AM_CFLAGS = $(CODE_COVERAGE_CFLAGS)
+AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
+
+dist_noinst_DATA = \
+ tailer.ape
+
+noinst_LIBRARIES = \
+ libtailercommon.a \
+ libtailerpp.a \
+ libtailerservice.a
+
+noinst_HEADERS = \
+ sha-256.h \
+ tailer.h \
+ tailer.looper.hh \
+ tailer.looper.cfg.hh \
+ tailerpp.hh
+
+libtailercommon_a_SOURCES = \
+ sha-256.c \
+ tailer.c
+
+libtailerpp_a_CPPFLAGS = \
+ $(AM_CPPFLAGS) \
+ -I$(srcdir)/.. \
+ -I$(srcdir)/../fmtlib \
+ -I$(srcdir)/../third-party \
+ -I$(top_srcdir)/src/third-party/scnlib/include
+
+libtailerpp_a_SOURCES = \
+ tailerpp.cc
+
+tailerbin.cc: tailer.ape ../../tools/bin2c$(BUILD_EXEEXT)
+ ../../tools/bin2c$(BUILD_EXEEXT) -n tailer_bin tailerbin $(srcdir)/tailer.ape
+
+libtailerservice_a_CPPFLAGS = \
+ $(AM_CPPFLAGS) \
+ -I$(srcdir)/.. \
+ -I$(srcdir)/../fmtlib \
+ -I$(srcdir)/../third-party \
+ -I$(top_srcdir)/src/third-party/scnlib/include
+
+libtailerservice_a_SOURCES = \
+ tailerbin.cc \
+ tailer.looper.cc
+
+check_PROGRAMS = \
+ drive_tailer \
+ tailer
+
+tailer_SOURCES = \
+ tailer.main.c
+
+tailer_LDADD = libtailercommon.a
+
+drive_tailer_CPPFLAGS = \
+ -I$(srcdir)/.. \
+ -I$(srcdir)/../fmtlib \
+ -I$(top_srcdir)/src/third-party/scnlib/include
+
+drive_tailer_SOURCES = \
+ drive_tailer.cc
+
+drive_tailer_LDADD = \
+ libtailercommon.a \
+ libtailerpp.a \
+ ../base/libbase.a \
+ ../fmtlib/libcppfmt.a
+
+dist_noinst_SCRIPTS = \
+ test_tailer.sh
+
+TESTS = \
+ test_tailer.sh
+
+DISTCLEANFILES = \
+ *.cmd \
+ *.dat \
+ *.out \
+ *.err \
+ *.db \
+ *.dpt \
+ *.diff \
+ *.index \
+ *.tmp \
+ *.outbak \
+ *.errbak \
+ *.tmpbak \
+ tailerbin.h \
+ tailerbin.cc
+
+distclean-local:
+ $(RM_V)rm -f foo
diff --git a/src/tailer/README.md b/src/tailer/README.md
new file mode 100644
index 0000000..74660f9
--- /dev/null
+++ b/src/tailer/README.md
@@ -0,0 +1,35 @@
+# Tailer
+
+This directory contains the functionality for monitoring
+[remote files](https://docs.lnav.org/en/latest/usage.html#remote-files). The
+name "tailer" refers to the binary that is transferred to the remote host that
+takes care of tailing files and sending the contents back to the host that is
+running the main lnav binary. To ease integration with lnav's existing
+functionality, the remote files are mirrored locally. The tailer also
+supports interactive use by providing previews of file contents and
+TAB-completion possibilities.
+
+## Files
+
+The important files in this directory are:
+
+- [tailer.main.c](tailer.main.c) - The main() implementation for the tailer.
+- [tailer.looper.hh](tailer.looper.hh) - The service in the main lnav binary
+ that transfers tailers to hosts and communicates with them.
+- [tailer.h](tailer.h) and [tailerpp.hh](tailerpp.hh) - Utility libraries for
+ the tailer protocol.
+- tailer.ape - The [αcτµαlly pδrταblε εxεcµταblε](https://justine.lol/ape.html)
+ build of the tailer. This binary is produced by a GitHub Action and checked
+ in so the build process doesn't need to be supported on lots of platforms.
+
+## Flow
+
+When a remote-path is passed to lnav, the
+[file_collection.hh](../file_collection.hh) logic forwards the request to the
+`tailer::looper` service. This service makes two connections to the remote
+host using the `ssh` command so that the user's custom configurations will be
+used. The first connection is used to transfer the "tailer.ape" binary and
+make it executable. The second connection starts the tailer and uses
+stdin/stdout for a binary protocol and stderr for logging. The tailer then
+waits for requests to open files, preview files, and get possible paths for
+TAB-completions.
diff --git a/src/tailer/drive_tailer.cc b/src/tailer/drive_tailer.cc
new file mode 100644
index 0000000..18ead43
--- /dev/null
+++ b/src/tailer/drive_tailer.cc
@@ -0,0 +1,288 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <thread>
+
+#include <unistd.h>
+
+#include "base/auto_fd.hh"
+#include "base/auto_pid.hh"
+#include "config.h"
+#include "ghc/filesystem.hpp"
+#include "line_buffer.hh"
+#include "tailerpp.hh"
+
+static void
+read_err_pipe(auto_fd& err, std::string& eq)
+{
+ while (true) {
+ char buffer[1024];
+ auto rc = read(err.get(), buffer, sizeof(buffer));
+
+ if (rc <= 0) {
+ break;
+ }
+
+ eq.append(buffer, rc);
+ }
+}
+
+int
+main(int argc, char* const* argv)
+{
+ if (argc != 3) {
+ fprintf(stderr, "usage: %s <cmd> <path>\n", argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ auto in_pipe_res = auto_pipe::for_child_fd(STDIN_FILENO);
+ if (in_pipe_res.isErr()) {
+ fprintf(stderr,
+ "cannot open stdin pipe for child: %s\n",
+ in_pipe_res.unwrapErr().c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ auto out_pipe_res = auto_pipe::for_child_fd(STDOUT_FILENO);
+ if (out_pipe_res.isErr()) {
+ fprintf(stderr,
+ "cannot open stdout pipe for child: %s\n",
+ out_pipe_res.unwrapErr().c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ auto err_pipe_res = auto_pipe::for_child_fd(STDERR_FILENO);
+ if (err_pipe_res.isErr()) {
+ fprintf(stderr,
+ "cannot open stderr pipe for child: %s\n",
+ err_pipe_res.unwrapErr().c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ auto fork_res = lnav::pid::from_fork();
+ if (fork_res.isErr()) {
+ fprintf(
+ stderr, "cannot start tailer: %s\n", fork_res.unwrapErr().c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ auto in_pipe = in_pipe_res.unwrap();
+ auto out_pipe = out_pipe_res.unwrap();
+ auto err_pipe = err_pipe_res.unwrap();
+ auto child = fork_res.unwrap();
+
+ in_pipe.after_fork(child.in());
+ out_pipe.after_fork(child.in());
+ err_pipe.after_fork(child.in());
+
+ if (child.in_child()) {
+ auto this_exe = ghc::filesystem::path(argv[0]);
+ auto exe_dir = this_exe.parent_path();
+ auto tailer_exe = exe_dir / "tailer";
+
+ execlp(tailer_exe.c_str(), tailer_exe.c_str(), "-k", nullptr);
+ exit(EXIT_FAILURE);
+ }
+
+ std::string error_queue;
+ std::thread err_reader(
+ [err = std::move(err_pipe.read_end()), &error_queue]() mutable {
+ read_err_pipe(err, error_queue);
+ });
+
+ auto& to_child = in_pipe.write_end();
+ auto& from_child = out_pipe.read_end();
+ auto cmd = std::string(argv[1]);
+
+ if (cmd == "open") {
+ send_packet(
+ to_child.get(), TPT_OPEN_PATH, TPPT_STRING, argv[2], TPPT_DONE);
+ } else if (cmd == "preview") {
+ send_packet(to_child.get(),
+ TPT_LOAD_PREVIEW,
+ TPPT_STRING,
+ argv[2],
+ TPPT_INT64,
+ int64_t{1234},
+ TPPT_DONE);
+ } else if (cmd == "possible") {
+ send_packet(
+ to_child.get(), TPT_COMPLETE_PATH, TPPT_STRING, argv[2], TPPT_DONE);
+ } else {
+ fprintf(stderr, "error: unknown command -- %s\n", cmd.c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ close(to_child.get());
+
+ bool done = false;
+ while (!done) {
+ auto read_res = tailer::read_packet(from_child);
+
+ if (read_res.isErr()) {
+ fprintf(stderr, "read error: %s\n", read_res.unwrapErr().c_str());
+ exit(EXIT_FAILURE);
+ }
+
+ auto packet = read_res.unwrap();
+ packet.match(
+ [&](const tailer::packet_eof& te) {
+ printf("all done!\n");
+ done = true;
+ },
+ [&](const tailer::packet_announce& pa) {},
+ [&](const tailer::packet_log& te) {
+ printf("log: %s\n", te.pl_msg.c_str());
+ },
+ [&](const tailer::packet_error& pe) {
+ printf("Got an error: %s -- %s\n",
+ pe.pe_path.c_str(),
+ pe.pe_msg.c_str());
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pe.pe_path))
+ .relative_path();
+
+ printf("removing %s\n", remote_path.c_str());
+ },
+ [&](const tailer::packet_offer_block& pob) {
+ printf("Got an offer: %s %lld - %lld\n",
+ pob.pob_path.c_str(),
+ pob.pob_offset,
+ pob.pob_length);
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pob.pob_path))
+ .relative_path();
+#if 0
+ auto local_path = tmppath / remote_path;
+ auto fd = auto_fd(open(local_path.c_str(), O_RDONLY));
+
+ if (fd == -1) {
+ printf("sending need block\n");
+ send_packet(to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING, pob.pob_path.c_str(),
+ TPPT_DONE);
+ return;
+ }
+
+ struct stat st;
+
+ if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
+ ghc::filesystem::remove_all(local_path);
+ send_packet(to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING, pob.pob_path.c_str(),
+ TPPT_DONE);
+ return;
+ }
+ auto_mem<char> buffer;
+
+ buffer = (char *) malloc(pob.pob_length);
+ auto bytes_read = pread(fd, buffer, pob.pob_length,
+ pob.pob_offset);
+
+ // fprintf(stderr, "debug: bytes_read %ld\n", bytes_read);
+ if (bytes_read == pob.pob_length) {
+ tailer::hash_frag thf;
+ calc_sha_256(thf.thf_hash, buffer, bytes_read);
+
+ if (thf == pob.pob_hash) {
+ send_packet(to_child.get(),
+ TPT_ACK_BLOCK,
+ TPPT_STRING, pob.pob_path.c_str(),
+ TPPT_DONE);
+ return;
+ }
+ } else if (bytes_read == -1) {
+ ghc::filesystem::remove_all(local_path);
+ }
+ send_packet(to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING, pob.pob_path.c_str(),
+ TPPT_DONE);
+#endif
+ },
+ [&](const tailer::packet_tail_block& ptb) {
+#if 0
+ //printf("got a tail: %s %lld %ld\n", ptb.ptb_path.c_str(),
+ // ptb.ptb_offset, ptb.ptb_bits.size());
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(ptb.ptb_path)).relative_path();
+ auto local_path = tmppath / remote_path;
+
+ ghc::filesystem::create_directories(local_path.parent_path());
+ auto fd = auto_fd(
+ open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT,
+ 0600));
+
+ if (fd == -1) {
+ perror("open");
+ } else {
+ ftruncate(fd, ptb.ptb_offset);
+ pwrite(fd, ptb.ptb_bits.data(), ptb.ptb_bits.size(), ptb.ptb_offset);
+ }
+#endif
+ },
+ [&](const tailer::packet_synced& ps) {
+
+ },
+ [&](const tailer::packet_link& pl) {
+ printf("link value: %s -> %s\n",
+ pl.pl_path.c_str(),
+ pl.pl_link_value.c_str());
+ },
+ [&](const tailer::packet_preview_error& ppe) {
+ fprintf(stderr,
+ "preview error: %s -- %s\n",
+ ppe.ppe_path.c_str(),
+ ppe.ppe_msg.c_str());
+ },
+ [&](const tailer::packet_preview_data& ppd) {
+ printf("preview of file: %s\n%.*s\n",
+ ppd.ppd_path.c_str(),
+ (int) ppd.ppd_bits.size(),
+ ppd.ppd_bits.data());
+ },
+ [&](const tailer::packet_possible_path& ppp) {
+ printf("possible path: %s\n", ppp.ppp_path.c_str());
+ });
+ }
+
+ auto finished_child = std::move(child).wait_for_child();
+ if (!finished_child.was_normal_exit()) {
+ fprintf(stderr, "error: child exited abnormally\n");
+ }
+
+ err_reader.join();
+
+ printf("tailer stderr:\n%s", error_queue.c_str());
+ fprintf(stderr, "tailer stderr:\n%s", error_queue.c_str());
+}
diff --git a/src/tailer/sha-256.c b/src/tailer/sha-256.c
new file mode 100644
index 0000000..8e390f8
--- /dev/null
+++ b/src/tailer/sha-256.c
@@ -0,0 +1,161 @@
+/*********************************************************************
+* Filename: sha256.c
+* Author: Brad Conte (brad AT bradconte.com)
+* Copyright:
+* Disclaimer: This code is presented "as is" without any guarantees.
+* Details: Implementation of the SHA-256 hashing algorithm.
+ SHA-256 is one of the three algorithms in the SHA2
+ specification. The others, SHA-384 and SHA-512, are not
+ offered in this implementation.
+ Algorithm specification can be found here:
+ * http://csrc.nist.gov/publications/fips/fips180-2/fips180-2withchangenotice.pdf
+ This implementation uses little endian byte order.
+*********************************************************************/
+
+/*************************** HEADER FILES ***************************/
+#ifndef __COSMOPOLITAN__
+#include <stdlib.h>
+#include <memory.h>
+#endif
+
+#include "sha-256.h"
+
+/****************************** MACROS ******************************/
+#define ROTLEFT(a,b) (((a) << (b)) | ((a) >> (32-(b))))
+#define ROTRIGHT(a,b) (((a) >> (b)) | ((a) << (32-(b))))
+
+#define CH(x,y,z) (((x) & (y)) ^ (~(x) & (z)))
+#define MAJ(x,y,z) (((x) & (y)) ^ ((x) & (z)) ^ ((y) & (z)))
+#define EP0(x) (ROTRIGHT(x,2) ^ ROTRIGHT(x,13) ^ ROTRIGHT(x,22))
+#define EP1(x) (ROTRIGHT(x,6) ^ ROTRIGHT(x,11) ^ ROTRIGHT(x,25))
+#define SIG0(x) (ROTRIGHT(x,7) ^ ROTRIGHT(x,18) ^ ((x) >> 3))
+#define SIG1(x) (ROTRIGHT(x,17) ^ ROTRIGHT(x,19) ^ ((x) >> 10))
+
+/**************************** VARIABLES *****************************/
+static const WORD k[64] = {
+ 0x428a2f98,0x71374491,0xb5c0fbcf,0xe9b5dba5,0x3956c25b,0x59f111f1,0x923f82a4,0xab1c5ed5,
+ 0xd807aa98,0x12835b01,0x243185be,0x550c7dc3,0x72be5d74,0x80deb1fe,0x9bdc06a7,0xc19bf174,
+ 0xe49b69c1,0xefbe4786,0x0fc19dc6,0x240ca1cc,0x2de92c6f,0x4a7484aa,0x5cb0a9dc,0x76f988da,
+ 0x983e5152,0xa831c66d,0xb00327c8,0xbf597fc7,0xc6e00bf3,0xd5a79147,0x06ca6351,0x14292967,
+ 0x27b70a85,0x2e1b2138,0x4d2c6dfc,0x53380d13,0x650a7354,0x766a0abb,0x81c2c92e,0x92722c85,
+ 0xa2bfe8a1,0xa81a664b,0xc24b8b70,0xc76c51a3,0xd192e819,0xd6990624,0xf40e3585,0x106aa070,
+ 0x19a4c116,0x1e376c08,0x2748774c,0x34b0bcb5,0x391c0cb3,0x4ed8aa4a,0x5b9cca4f,0x682e6ff3,
+ 0x748f82ee,0x78a5636f,0x84c87814,0x8cc70208,0x90befffa,0xa4506ceb,0xbef9a3f7,0xc67178f2
+};
+
+/*********************** FUNCTION DEFINITIONS ***********************/
+void sha256_transform(SHA256_CTX *ctx, const BYTE data[])
+{
+ WORD a, b, c, d, e, f, g, h, i, j, t1, t2, m[64];
+
+ for (i = 0, j = 0; i < 16; ++i, j += 4)
+ m[i] = (data[j] << 24) | (data[j + 1] << 16) | (data[j + 2] << 8) | (data[j + 3]);
+ for ( ; i < 64; ++i)
+ m[i] = SIG1(m[i - 2]) + m[i - 7] + SIG0(m[i - 15]) + m[i - 16];
+
+ a = ctx->state[0];
+ b = ctx->state[1];
+ c = ctx->state[2];
+ d = ctx->state[3];
+ e = ctx->state[4];
+ f = ctx->state[5];
+ g = ctx->state[6];
+ h = ctx->state[7];
+
+ for (i = 0; i < 64; ++i) {
+ t1 = h + EP1(e) + CH(e,f,g) + k[i] + m[i];
+ t2 = EP0(a) + MAJ(a,b,c);
+ h = g;
+ g = f;
+ f = e;
+ e = d + t1;
+ d = c;
+ c = b;
+ b = a;
+ a = t1 + t2;
+ }
+
+ ctx->state[0] += a;
+ ctx->state[1] += b;
+ ctx->state[2] += c;
+ ctx->state[3] += d;
+ ctx->state[4] += e;
+ ctx->state[5] += f;
+ ctx->state[6] += g;
+ ctx->state[7] += h;
+}
+
+void sha256_init(SHA256_CTX *ctx)
+{
+ ctx->datalen = 0;
+ ctx->bitlen = 0;
+ ctx->state[0] = 0x6a09e667;
+ ctx->state[1] = 0xbb67ae85;
+ ctx->state[2] = 0x3c6ef372;
+ ctx->state[3] = 0xa54ff53a;
+ ctx->state[4] = 0x510e527f;
+ ctx->state[5] = 0x9b05688c;
+ ctx->state[6] = 0x1f83d9ab;
+ ctx->state[7] = 0x5be0cd19;
+}
+
+void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len)
+{
+ WORD i;
+
+ for (i = 0; i < len; ++i) {
+ ctx->data[ctx->datalen] = data[i];
+ ctx->datalen++;
+ if (ctx->datalen == 64) {
+ sha256_transform(ctx, ctx->data);
+ ctx->bitlen += 512;
+ ctx->datalen = 0;
+ }
+ }
+}
+
+void sha256_final(SHA256_CTX *ctx, BYTE hash[])
+{
+ WORD i;
+
+ i = ctx->datalen;
+
+ // Pad whatever data is left in the buffer.
+ if (ctx->datalen < 56) {
+ ctx->data[i++] = 0x80;
+ while (i < 56)
+ ctx->data[i++] = 0x00;
+ }
+ else {
+ ctx->data[i++] = 0x80;
+ while (i < 64)
+ ctx->data[i++] = 0x00;
+ sha256_transform(ctx, ctx->data);
+ memset(ctx->data, 0, 56);
+ }
+
+ // Append to the padding the total message's length in bits and transform.
+ ctx->bitlen += ctx->datalen * 8;
+ ctx->data[63] = ctx->bitlen;
+ ctx->data[62] = ctx->bitlen >> 8;
+ ctx->data[61] = ctx->bitlen >> 16;
+ ctx->data[60] = ctx->bitlen >> 24;
+ ctx->data[59] = ctx->bitlen >> 32;
+ ctx->data[58] = ctx->bitlen >> 40;
+ ctx->data[57] = ctx->bitlen >> 48;
+ ctx->data[56] = ctx->bitlen >> 56;
+ sha256_transform(ctx, ctx->data);
+
+ // Since this implementation uses little endian byte ordering and SHA uses big endian,
+ // reverse all the bytes when copying the final state to the output hash.
+ for (i = 0; i < 4; ++i) {
+ hash[i] = (ctx->state[0] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 4] = (ctx->state[1] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 8] = (ctx->state[2] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 12] = (ctx->state[3] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 16] = (ctx->state[4] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 20] = (ctx->state[5] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 24] = (ctx->state[6] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 28] = (ctx->state[7] >> (24 - i * 8)) & 0x000000ff;
+ }
+}
diff --git a/src/tailer/sha-256.h b/src/tailer/sha-256.h
new file mode 100644
index 0000000..028fcfb
--- /dev/null
+++ b/src/tailer/sha-256.h
@@ -0,0 +1,44 @@
+/*********************************************************************
+* Filename: sha256.h
+* Author: Brad Conte (brad AT bradconte.com)
+* Copyright:
+* Disclaimer: This code is presented "as is" without any guarantees.
+* Details: Defines the API for the corresponding SHA1 implementation.
+*********************************************************************/
+
+#ifndef SHA256_H
+#define SHA256_H
+
+/*************************** HEADER FILES ***************************/
+#ifndef __COSMOPOLITAN__
+#include <stddef.h>
+#endif
+
+/****************************** MACROS ******************************/
+#define SHA256_BLOCK_SIZE 32 // SHA256 outputs a 32 byte digest
+
+/**************************** DATA TYPES ****************************/
+typedef unsigned char BYTE; // 8-bit byte
+typedef unsigned int WORD; // 32-bit word, change to "long" for 16-bit machines
+
+typedef struct {
+ BYTE data[64];
+ WORD datalen;
+ unsigned long long bitlen;
+ WORD state[8];
+} SHA256_CTX;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*********************** FUNCTION DECLARATIONS **********************/
+void sha256_init(SHA256_CTX *ctx);
+void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len);
+void sha256_final(SHA256_CTX *ctx, BYTE hash[]);
+
+#ifdef __cplusplus
+};
+#endif
+
+#endif // SHA256_H \ No newline at end of file
diff --git a/src/tailer/tailer.ape b/src/tailer/tailer.ape
new file mode 100755
index 0000000..dd8e894
--- /dev/null
+++ b/src/tailer/tailer.ape
Binary files differ
diff --git a/src/tailer/tailer.c b/src/tailer/tailer.c
new file mode 100644
index 0000000..c8fbfbf
--- /dev/null
+++ b/src/tailer/tailer.c
@@ -0,0 +1,100 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __COSMOPOLITAN__
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdint.h>
+#endif
+
+#include "sha-256.h"
+#include "tailer.h"
+
+ssize_t send_packet(int fd,
+ tailer_packet_type_t tpt,
+ tailer_packet_payload_type_t payload_type,
+ ...)
+{
+ va_list args;
+ int done = 0;
+
+ va_start(args, payload_type);
+ write(fd, &tpt, sizeof(tpt));
+ do {
+ write(fd, &payload_type, sizeof(payload_type));
+ switch (payload_type) {
+ case TPPT_STRING: {
+ char *str = va_arg(args, char *);
+ uint32_t length = strlen(str);
+
+ write(fd, &length, sizeof(length));
+ write(fd, str, length);
+ break;
+ }
+ case TPPT_HASH: {
+ const char *hash = va_arg(args, const char *);
+
+ write(fd, hash, SHA256_BLOCK_SIZE);
+ break;
+ }
+ case TPPT_INT64: {
+ int64_t i = va_arg(args, int64_t);
+
+ write(fd, &i, sizeof(i));
+ break;
+ }
+ case TPPT_BITS: {
+ int32_t length = va_arg(args, int32_t);
+ const char *bits = va_arg(args, const char *);
+
+ write(fd, &length, sizeof(length));
+ write(fd, bits, length);
+ break;
+ }
+ case TPPT_DONE: {
+ done = 1;
+ break;
+ }
+ default: {
+ assert(0);
+ break;
+ }
+ }
+
+ if (!done) {
+ payload_type = va_arg(args, tailer_packet_payload_type_t);
+ }
+ } while (!done);
+ va_end(args);
+
+ return 0;
+}
diff --git a/src/tailer/tailer.h b/src/tailer/tailer.h
new file mode 100644
index 0000000..b864137
--- /dev/null
+++ b/src/tailer/tailer.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef lnav_tailer_h
+#define lnav_tailer_h
+
+#ifndef __COSMOPOLITAN__
+#include <sys/types.h>
+#endif
+
+typedef enum {
+ TPPT_DONE,
+ TPPT_STRING,
+ TPPT_HASH,
+ TPPT_INT64,
+ TPPT_BITS,
+} tailer_packet_payload_type_t;
+
+typedef enum {
+ TPT_ERROR,
+ TPT_OPEN_PATH,
+ TPT_CLOSE_PATH,
+ TPT_OFFER_BLOCK,
+ TPT_NEED_BLOCK,
+ TPT_ACK_BLOCK,
+ TPT_TAIL_BLOCK,
+ TPT_LINK_BLOCK,
+ TPT_SYNCED,
+ TPT_LOG,
+ TPT_LOAD_PREVIEW,
+ TPT_PREVIEW_ERROR,
+ TPT_PREVIEW_DATA,
+ TPT_COMPLETE_PATH,
+ TPT_POSSIBLE_PATH,
+ TPT_ANNOUNCE,
+} tailer_packet_type_t;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ssize_t send_packet(int fd,
+ tailer_packet_type_t tpt,
+ tailer_packet_payload_type_t payload_type,
+ ...);
+
+#ifdef __cplusplus
+};
+#endif
+
+#endif
diff --git a/src/tailer/tailer.looper.cc b/src/tailer/tailer.looper.cc
new file mode 100644
index 0000000..82a9fdc
--- /dev/null
+++ b/src/tailer/tailer.looper.cc
@@ -0,0 +1,1192 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <regex>
+
+#include "tailer.looper.hh"
+
+#include "base/fs_util.hh"
+#include "base/humanize.network.hh"
+#include "base/lnav_log.hh"
+#include "base/paths.hh"
+#include "config.h"
+#include "line_buffer.hh"
+#include "lnav.hh"
+#include "lnav.indexing.hh"
+#include "service_tags.hh"
+#include "tailer.h"
+#include "tailer.looper.cfg.hh"
+#include "tailerbin.h"
+#include "tailerpp.hh"
+
+using namespace std::chrono_literals;
+
+static const auto HOST_RETRY_DELAY = 1min;
+
+static void
+read_err_pipe(const std::string& netloc,
+ auto_fd& err,
+ std::vector<std::string>& eq)
+{
+ line_buffer lb;
+ file_range pipe_range;
+ bool done = false;
+
+ log_info("stderr reader started...");
+ lb.set_fd(err);
+ while (!done) {
+ auto load_res = lb.load_next_line(pipe_range);
+
+ if (load_res.isErr()) {
+ done = true;
+ } else {
+ auto li = load_res.unwrap();
+
+ pipe_range = li.li_file_range;
+ if (li.li_file_range.empty()) {
+ done = true;
+ } else {
+ lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
+ auto line_str
+ = string_fragment(sbr.get_data(), 0, sbr.length())
+ .trim("\n");
+ if (eq.size() < 10) {
+ eq.template emplace_back(line_str.to_string());
+ }
+
+ auto level = line_str.startswith("error:")
+ ? lnav_log_level_t::ERROR
+ : line_str.startswith("warning:")
+ ? lnav_log_level_t::WARNING
+ : line_str.startswith("info:")
+ ? lnav_log_level_t::INFO
+ : lnav_log_level_t::DEBUG;
+ log_msg_wrapper(level,
+ "tailer[%s] %.*s",
+ netloc.c_str(),
+ line_str.length(),
+ line_str.data());
+ });
+ }
+ }
+ }
+}
+
+static void
+update_tailer_progress(const std::string& netloc, const std::string& msg)
+{
+ lnav_data.ld_active_files.fc_progress->writeAccess()
+ ->sp_tailers[netloc]
+ .tp_message
+ = msg;
+}
+
+static void
+update_tailer_description(
+ const std::string& netloc,
+ const std::map<std::string, logfile_open_options_base>& desired_paths,
+ const std::string& remote_uname)
+{
+ std::vector<std::string> paths;
+
+ for (const auto& des_pair : desired_paths) {
+ paths.emplace_back(
+ fmt::format(FMT_STRING("{}{}"), netloc, des_pair.first));
+ }
+ isc::to<main_looper&, services::main_t>().send(
+ [netloc, paths, remote_uname](auto& mlooper) {
+ auto& fc = lnav_data.ld_active_files;
+
+ for (const auto& path : paths) {
+ auto iter = fc.fc_other_files.find(path);
+
+ if (iter == fc.fc_other_files.end()) {
+ continue;
+ }
+
+ iter->second.ofd_description = remote_uname;
+ }
+ fc.fc_name_to_errors.erase(netloc);
+ });
+}
+
+void
+tailer::looper::loop_body()
+{
+ auto now = std::chrono::steady_clock::now();
+ std::vector<std::string> to_erase;
+
+ for (auto& qpair : this->l_netlocs_to_paths) {
+ auto& netloc = qpair.first;
+ auto& rpq = qpair.second;
+
+ if (now < rpq.rpq_next_attempt_time) {
+ continue;
+ }
+ if (this->l_remotes.count(netloc) == 0) {
+ auto create_res = host_tailer::for_host(netloc);
+
+ if (create_res.isErr()) {
+ report_error(netloc, create_res.unwrapErr());
+ if (std::any_of(
+ rpq.rpq_new_paths.begin(),
+ rpq.rpq_new_paths.end(),
+ [](const auto& pair) { return !pair.second.loo_tail; }))
+ {
+ rpq.send_synced_to_main(netloc);
+ to_erase.push_back(netloc);
+ } else {
+ rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
+ }
+ continue;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc] = ht;
+ this->s_children.add_child_service(ht);
+
+ rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
+ rpq.rpq_existing_paths.end());
+ rpq.rpq_existing_paths.clear();
+ }
+
+ if (!rpq.rpq_new_paths.empty()) {
+ log_debug("%s: new paths to monitor -- %s",
+ netloc.c_str(),
+ rpq.rpq_new_paths.begin()->first.c_str());
+ this->l_remotes[netloc]->send(
+ [paths = rpq.rpq_new_paths](auto& ht) {
+ for (const auto& pair : paths) {
+ log_debug("adding path to tailer -- %s",
+ pair.first.c_str());
+ ht.open_remote_path(pair.first, std::move(pair.second));
+ }
+ });
+
+ rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
+ rpq.rpq_new_paths.end());
+ rpq.rpq_new_paths.clear();
+ }
+ }
+
+ for (const auto& netloc : to_erase) {
+ this->l_netlocs_to_paths.erase(netloc);
+ }
+}
+
+void
+tailer::looper::add_remote(const network::path& path,
+ logfile_open_options_base options)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path]
+ = std::move(options);
+}
+
+void
+tailer::looper::load_preview(int64_t id, const network::path& path)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ auto iter = this->l_remotes.find(netloc_str);
+
+ if (iter == this->l_remotes.end()) {
+ auto create_res = host_tailer::for_host(netloc_str);
+
+ if (create_res.isErr()) {
+ auto msg = create_res.unwrapErr();
+ isc::to<main_looper&, services::main_t>().send(
+ [id, msg](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != id) {
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .clear();
+ lnav_data.ld_preview_source.clear();
+ lnav_data.ld_bottom_source.grep_error(msg);
+ });
+ return;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc_str] = ht;
+ this->s_children.add_child_service(ht);
+ }
+
+ this->l_remotes[netloc_str]->send([id, file_path = path.p_path](auto& ht) {
+ ht.load_preview(id, file_path);
+ });
+}
+
+void
+tailer::looper::complete_path(const network::path& path)
+{
+ auto netloc_str = fmt::to_string(path.home());
+ auto iter = this->l_remotes.find(netloc_str);
+
+ if (iter == this->l_remotes.end()) {
+ auto create_res = host_tailer::for_host(netloc_str);
+
+ if (create_res.isErr()) {
+ return;
+ }
+
+ auto ht = create_res.unwrap();
+ this->l_remotes[netloc_str] = ht;
+ this->s_children.add_child_service(ht);
+ }
+
+ this->l_remotes[netloc_str]->send(
+ [file_path = path.p_path](auto& ht) { ht.complete_path(file_path); });
+}
+
+static std::vector<std::string>
+create_ssh_args_from_config(const std::string& dest)
+{
+ const auto& cfg = injector::get<const tailer::config&>();
+ std::vector<std::string> retval;
+
+ retval.emplace_back(cfg.c_ssh_cmd);
+ if (!cfg.c_ssh_flags.empty()) {
+ if (startswith(cfg.c_ssh_flags, "-")) {
+ retval.emplace_back(cfg.c_ssh_flags);
+ } else {
+ retval.emplace_back(
+ fmt::format(FMT_STRING("-{}"), cfg.c_ssh_flags));
+ }
+ }
+ for (const auto& pair : cfg.c_ssh_options) {
+ if (pair.second.empty()) {
+ continue;
+ }
+ retval.emplace_back(fmt::format(FMT_STRING("-{}"), pair.first));
+ retval.emplace_back(pair.second);
+ }
+ for (const auto& pair : cfg.c_ssh_config) {
+ if (pair.second.empty()) {
+ continue;
+ }
+ retval.emplace_back(
+ fmt::format(FMT_STRING("-o{}={}"), pair.first, pair.second));
+ }
+ retval.emplace_back(dest);
+
+ return retval;
+}
+
+Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
+tailer::looper::host_tailer::for_host(const std::string& netloc)
+{
+ log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
+
+ update_tailer_progress(netloc, "Transferring tailer...");
+
+ auto& cfg = injector::get<const tailer::config&>();
+ auto tailer_bin_name = fmt::format(FMT_STRING("tailer.bin.{}"), getpid());
+
+ auto rp = humanize::network::path::from_str(netloc).value();
+ auto ssh_dest = rp.p_locality.l_hostname;
+ if (rp.p_locality.l_username.has_value()) {
+ ssh_dest = fmt::format(FMT_STRING("{}@{}"),
+ rp.p_locality.l_username.value(),
+ rp.p_locality.l_hostname);
+ }
+
+ {
+ auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
+ auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
+ auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
+ auto child = TRY(lnav::pid::from_fork());
+
+ in_pipe.after_fork(child.in());
+ out_pipe.after_fork(child.in());
+ err_pipe.after_fork(child.in());
+
+ if (child.in_child()) {
+ auto arg_strs = create_ssh_args_from_config(ssh_dest);
+ std::vector<char*> args;
+
+ arg_strs.emplace_back(
+ fmt::format(cfg.c_transfer_cmd, tailer_bin_name));
+
+ fmt::print(stderr,
+ "tailer({}): executing -- {}\n",
+ netloc,
+ fmt::join(arg_strs, " "));
+ for (const auto& arg : arg_strs) {
+ args.push_back((char*) arg.data());
+ }
+ args.push_back(nullptr);
+
+ execvp(cfg.c_ssh_cmd.c_str(), args.data());
+ _exit(EXIT_FAILURE);
+ }
+
+ std::vector<std::string> error_queue;
+ log_debug("tailer(%s): starting err reader", netloc.c_str());
+ std::thread err_reader([netloc,
+ err = std::move(err_pipe.read_end()),
+ &error_queue]() mutable {
+ log_set_thread_prefix(
+ fmt::format(FMT_STRING("tailer({})"), netloc));
+ read_err_pipe(netloc, err, error_queue);
+ });
+
+ log_debug("tailer(%s): writing to child", netloc.c_str());
+ auto sf = tailer_bin[0].to_string_fragment();
+ ssize_t total_bytes = 0;
+ bool write_failed = false;
+
+ while (total_bytes < sf.length()) {
+ log_debug("attempting to write %d", sf.length() - total_bytes);
+ auto rc = write(
+ in_pipe.write_end(), sf.data(), sf.length() - total_bytes);
+
+ if (rc < 0) {
+ log_error(" tailer(%s): write failed -- %s",
+ netloc.c_str(),
+ strerror(errno));
+ write_failed = true;
+ break;
+ }
+ log_debug(" wrote %d", rc);
+ total_bytes += rc;
+ }
+
+ in_pipe.write_end().reset();
+
+ while (!write_failed) {
+ char buffer[1024];
+
+ auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
+ if (rc < 0) {
+ break;
+ }
+ if (rc == 0) {
+ break;
+ }
+ log_debug("tailer(%s): transfer output -- %.*s",
+ netloc.c_str(),
+ rc,
+ buffer);
+ }
+
+ auto finished_child = std::move(child).wait_for_child();
+
+ err_reader.join();
+ if (!finished_child.was_normal_exit()
+ || finished_child.exit_status() != EXIT_SUCCESS)
+ {
+ auto error_msg = error_queue.empty() ? "unknown"
+ : error_queue.back();
+ return Err(fmt::format(FMT_STRING("failed to ssh to host: {}"),
+ error_msg));
+ }
+ }
+
+ update_tailer_progress(netloc, "Starting tailer...");
+
+ auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
+ auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
+ auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
+ auto child = TRY(lnav::pid::from_fork());
+
+ in_pipe.after_fork(child.in());
+ out_pipe.after_fork(child.in());
+ err_pipe.after_fork(child.in());
+
+ if (child.in_child()) {
+ auto arg_strs = create_ssh_args_from_config(ssh_dest);
+ std::vector<char*> args;
+
+ arg_strs.emplace_back(fmt::format(cfg.c_start_cmd, tailer_bin_name));
+
+ fmt::print(stderr,
+ FMT_STRING("tailer({}): executing -- {}\n"),
+ netloc,
+ fmt::join(arg_strs, " "));
+ for (const auto& arg : arg_strs) {
+ args.push_back((char*) arg.data());
+ }
+ args.push_back(nullptr);
+
+ execvp(cfg.c_ssh_cmd.c_str(), args.data());
+ _exit(EXIT_FAILURE);
+ }
+
+ return Ok(std::make_shared<host_tailer>(netloc,
+ std::move(child),
+ std::move(in_pipe.write_end()),
+ std::move(out_pipe.read_end()),
+ std::move(err_pipe.read_end())));
+}
+
+static ghc::filesystem::path
+remote_cache_path()
+{
+ return lnav::paths::workdir() / "remotes";
+}
+
+ghc::filesystem::path
+tailer::looper::host_tailer::tmp_path()
+{
+ auto local_path = remote_cache_path();
+
+ ghc::filesystem::create_directories(local_path);
+ auto_mem<char> resolved_path;
+
+ resolved_path = realpath(local_path.c_str(), nullptr);
+ if (resolved_path.in() == nullptr) {
+ return local_path;
+ }
+
+ return resolved_path.in();
+}
+
+static std::string
+scrub_netloc(const std::string& netloc)
+{
+ const static std::regex TO_SCRUB(R"([^\w\.\@])");
+
+ return std::regex_replace(netloc, TO_SCRUB, "_");
+}
+
+tailer::looper::host_tailer::host_tailer(const std::string& netloc,
+ auto_pid<process_state::running> child,
+ auto_fd to_child,
+ auto_fd from_child,
+ auto_fd err_from_child)
+ : isc::service<host_tailer>(netloc), ht_netloc(netloc),
+ ht_local_path(tmp_path() / scrub_netloc(netloc)),
+ ht_error_reader([netloc,
+ err = std::move(err_from_child),
+ &eq = this->ht_error_queue]() mutable {
+ read_err_pipe(netloc, err, eq);
+ }),
+ ht_state(connected{
+ std::move(child), std::move(to_child), std::move(from_child), {}})
+{
+}
+
+void
+tailer::looper::host_tailer::open_remote_path(const std::string& path,
+ logfile_open_options_base loo)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ conn.c_desired_paths[path] = std::move(loo);
+ send_packet(conn.ht_to_child.get(),
+ TPT_OPEN_PATH,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot tail: %s",
+ path.c_str());
+ },
+ [&](const synced& s) {
+ log_warning("synced with host, not tailing: %s", path.c_str());
+ });
+}
+
+void
+tailer::looper::host_tailer::load_preview(int64_t id, const std::string& path)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ send_packet(conn.ht_to_child.get(),
+ TPT_LOAD_PREVIEW,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_INT64,
+ id,
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot preview: %s",
+ path.c_str());
+
+ auto msg = fmt::format(FMT_STRING("error: disconnected from {}"),
+ this->ht_netloc);
+ isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != id) {
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .set_value(msg);
+ });
+ },
+ [&](const synced& s) { require(false); });
+}
+
+void
+tailer::looper::host_tailer::complete_path(const std::string& path)
+{
+ this->ht_state.match(
+ [&](connected& conn) {
+ send_packet(conn.ht_to_child.get(),
+ TPT_COMPLETE_PATH,
+ TPPT_STRING,
+ path.c_str(),
+ TPPT_DONE);
+ },
+ [&](const disconnected& d) {
+ log_warning("disconnected from host, cannot preview: %s",
+ path.c_str());
+ },
+ [&](const synced& s) { require(false); });
+}
+
+void
+tailer::looper::host_tailer::loop_body()
+{
+ const static uint64_t TOUCH_FREQ = 10000;
+
+ if (!this->ht_state.is<connected>()) {
+ return;
+ }
+
+ this->ht_cycle_count += 1;
+ if (this->ht_cycle_count % TOUCH_FREQ == 0) {
+ auto now
+ = ghc::filesystem::file_time_type{std::chrono::system_clock::now()};
+ ghc::filesystem::last_write_time(this->ht_local_path, now);
+ }
+
+ auto& conn = this->ht_state.get<connected>();
+
+ pollfd pfds[1];
+
+ pfds[0].fd = conn.ht_from_child.get();
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+
+ auto ready_count = poll(pfds, 1, 100);
+ if (ready_count > 0) {
+ auto read_res = tailer::read_packet(conn.ht_from_child);
+
+ if (read_res.isErr()) {
+ log_error("read error: %s", read_res.unwrapErr().c_str());
+ return;
+ }
+
+ auto packet = read_res.unwrap();
+ this->ht_state = packet.match(
+ [&](const tailer::packet_eof& te) {
+ log_debug("all done!");
+
+ auto finished_child = std::move(conn).close();
+ if (finished_child.exit_status() != 0
+ && !this->ht_error_queue.empty())
+ {
+ report_error(this->ht_netloc, this->ht_error_queue.back());
+ }
+
+ return state_v{disconnected()};
+ },
+ [&](const tailer::packet_announce& pa) {
+ update_tailer_description(
+ this->ht_netloc, conn.c_desired_paths, pa.pa_uname);
+ this->ht_uname = pa.pa_uname;
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_log& pl) {
+ log_debug("%s\n", pl.pl_msg.c_str());
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_error& pe) {
+ log_debug("Got an error: %s -- %s",
+ pe.pe_path.c_str(),
+ pe.pe_msg.c_str());
+
+ lnav_data.ld_active_files.fc_progress->writeAccess()
+ ->sp_tailers.erase(this->ht_netloc);
+
+ auto desired_iter = conn.c_desired_paths.find(pe.pe_path);
+ if (desired_iter != conn.c_desired_paths.end()) {
+ report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
+ if (!desired_iter->second.loo_tail) {
+ conn.c_desired_paths.erase(desired_iter);
+ }
+ } else {
+ auto child_iter = conn.c_child_paths.find(pe.pe_path);
+
+ if (child_iter != conn.c_child_paths.end()
+ && !child_iter->second.loo_tail)
+ {
+ conn.c_child_paths.erase(child_iter);
+ }
+ }
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pe.pe_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+
+ log_debug("removing %s", local_path.c_str());
+ this->ht_active_files.erase(local_path);
+ ghc::filesystem::remove_all(local_path);
+
+ if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ return state_v{synced{}};
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_offer_block& pob) {
+ log_debug("Got an offer: %s %lld - %lld",
+ pob.pob_path.c_str(),
+ pob.pob_offset,
+ pob.pob_length);
+
+ logfile_open_options_base loo;
+ if (pob.pob_path == pob.pob_root_path) {
+ auto root_iter = conn.c_desired_paths.find(pob.pob_path);
+
+ if (root_iter == conn.c_desired_paths.end()) {
+ log_warning("ignoring unknown root: %s",
+ pob.pob_root_path.c_str());
+ return std::move(this->ht_state);
+ }
+
+ loo = root_iter->second;
+ } else {
+ auto child_iter = conn.c_child_paths.find(pob.pob_path);
+ if (child_iter == conn.c_child_paths.end()) {
+ auto root_iter
+ = conn.c_desired_paths.find(pob.pob_root_path);
+
+ if (root_iter == conn.c_desired_paths.end()) {
+ log_warning("ignoring child of unknown root: %s",
+ pob.pob_root_path.c_str());
+ return std::move(this->ht_state);
+ }
+
+ conn.c_child_paths[pob.pob_path]
+ = std::move(root_iter->second);
+ child_iter = conn.c_child_paths.find(pob.pob_path);
+ }
+
+ loo = std::move(child_iter->second);
+ }
+
+ update_tailer_description(
+ this->ht_netloc, conn.c_desired_paths, this->ht_uname);
+
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pob.pob_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+ auto open_res
+ = lnav::filesystem::open_file(local_path, O_RDONLY);
+
+ if (this->ht_active_files.count(local_path) == 0) {
+ this->ht_active_files.insert(local_path);
+
+ auto custom_name = this->get_display_path(pob.pob_path);
+ isc::to<main_looper&, services::main_t>().send(
+ [local_path,
+ custom_name,
+ loo,
+ netloc = this->ht_netloc](auto& mlooper) {
+ auto& active_fc = lnav_data.ld_active_files;
+ auto lpath_str = local_path.string();
+
+ {
+ safe::WriteAccess<safe_scan_progress> sp(
+ *active_fc.fc_progress);
+
+ sp->sp_tailers.erase(netloc);
+ }
+ if (active_fc.fc_file_names.count(lpath_str) > 0) {
+ log_debug("already in fc_file_names");
+ return;
+ }
+ if (active_fc.fc_closed_files.count(custom_name)
+ > 0) {
+ log_debug("in closed");
+ return;
+ }
+
+ file_collection fc;
+
+ fc.fc_file_names[lpath_str]
+ .with_filename(custom_name)
+ .with_source(logfile_name_source::REMOTE)
+ .with_tail(loo.loo_tail)
+ .with_non_utf_visibility(false)
+ .with_visible_size_limit(256 * 1024);
+ update_active_files(fc);
+ });
+ }
+
+ if (open_res.isErr()) {
+ log_debug("file not found (%s), sending need block",
+ open_res.unwrapErr().c_str());
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ auto fd = open_res.unwrap();
+ struct stat st;
+
+ if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
+ log_debug("path changed, sending need block");
+ ghc::filesystem::remove_all(local_path);
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ if (st.st_size == pob.pob_offset) {
+ log_debug("local file is synced, sending need block");
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+
+ constexpr int64_t BUFFER_SIZE = 4 * 1024 * 1024;
+ auto_mem<unsigned char> buffer;
+
+ buffer = (unsigned char*) malloc(BUFFER_SIZE);
+ auto remaining = pob.pob_length;
+ auto remaining_offset = pob.pob_offset;
+ tailer::hash_frag thf;
+ SHA256_CTX shactx;
+ sha256_init(&shactx);
+
+ log_debug("checking offer %s[%lld..+%lld]",
+ local_path.c_str(),
+ remaining_offset,
+ remaining);
+ while (remaining > 0) {
+ auto nbytes = std::min(remaining, BUFFER_SIZE);
+ auto bytes_read
+ = pread(fd, buffer, nbytes, remaining_offset);
+ if (bytes_read == -1) {
+ log_debug(
+ "unable to read file, sending need block -- %s",
+ strerror(errno));
+ ghc::filesystem::remove_all(local_path);
+ break;
+ }
+ if (bytes_read == 0) {
+ break;
+ }
+ sha256_update(&shactx, buffer.in(), bytes_read);
+ remaining -= bytes_read;
+ remaining_offset += bytes_read;
+ }
+
+ if (remaining == 0) {
+ sha256_final(&shactx, thf.thf_hash);
+
+ if (thf == pob.pob_hash) {
+ log_debug("local file block is same, sending ack");
+ send_packet(conn.ht_to_child.get(),
+ TPT_ACK_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_INT64,
+ pob.pob_offset,
+ TPPT_INT64,
+ pob.pob_length,
+ TPPT_INT64,
+ (int64_t) st.st_size,
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ }
+ log_debug("local file is different, sending need block");
+ }
+ send_packet(conn.ht_to_child.get(),
+ TPT_NEED_BLOCK,
+ TPPT_STRING,
+ pob.pob_path.c_str(),
+ TPPT_DONE);
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_tail_block& ptb) {
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(ptb.ptb_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+
+ log_debug("writing tail to: %lld/%ld %s",
+ ptb.ptb_offset,
+ ptb.ptb_bits.size(),
+ local_path.c_str());
+ ghc::filesystem::create_directories(local_path.parent_path());
+ auto create_res = lnav::filesystem::create_file(
+ local_path, O_WRONLY | O_APPEND | O_CREAT, 0600);
+
+ if (create_res.isErr()) {
+ log_error("open: %s", create_res.unwrapErr().c_str());
+ } else {
+ auto fd = create_res.unwrap();
+ ftruncate(fd, ptb.ptb_offset);
+ pwrite(fd,
+ ptb.ptb_bits.data(),
+ ptb.ptb_bits.size(),
+ ptb.ptb_offset);
+ auto mtime = ghc::filesystem::file_time_type{
+ std::chrono::seconds{ptb.ptb_mtime}};
+ // XXX This isn't atomic with the write...
+ ghc::filesystem::last_write_time(local_path, mtime);
+ }
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_synced& ps) {
+ if (ps.ps_root_path == ps.ps_path) {
+ auto iter = conn.c_desired_paths.find(ps.ps_path);
+
+ if (iter != conn.c_desired_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_desired_paths.insert(ps.ps_path);
+ } else {
+ log_info("synced desired path: %s",
+ iter->first.c_str());
+ conn.c_desired_paths.erase(iter);
+ }
+ }
+ } else {
+ auto iter = conn.c_child_paths.find(ps.ps_path);
+
+ if (iter != conn.c_child_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_child_paths.insert(ps.ps_path);
+ } else {
+ log_info("synced child path: %s",
+ iter->first.c_str());
+ conn.c_child_paths.erase(iter);
+ }
+ }
+ }
+
+ if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ return state_v{synced{}};
+ } else if (!conn.c_initial_sync_done
+ && conn.c_desired_paths.size()
+ == conn.c_synced_desired_paths.size()
+ && conn.c_child_paths.size()
+ == conn.c_synced_child_paths.size())
+ {
+ log_info("tailer(%s): all desired paths synced",
+ this->ht_netloc.c_str());
+ conn.c_initial_sync_done = true;
+
+ std::set<std::string> synced_files;
+ for (const auto& desired_pair : conn.c_desired_paths) {
+ synced_files.emplace(fmt::format(
+ FMT_STRING("{}{}"), ht_netloc, desired_pair.first));
+ }
+ isc::to<main_looper&, services::main_t>().send(
+ [file_set = std::move(synced_files)](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_synced_files = file_set;
+ update_active_files(fc);
+ });
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_link& pl) {
+ auto remote_path = ghc::filesystem::absolute(
+ ghc::filesystem::path(pl.pl_path))
+ .relative_path();
+ auto local_path = this->ht_local_path / remote_path;
+ auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
+ std::string link_path;
+
+ if (remote_link_path.is_absolute()) {
+ auto local_link_path = this->ht_local_path
+ / remote_link_path.relative_path();
+
+ link_path = local_link_path.string();
+ } else {
+ link_path = remote_link_path.string();
+ }
+
+ log_debug("symlinking %s -> %s",
+ local_path.c_str(),
+ link_path.c_str());
+ ghc::filesystem::create_directories(local_path.parent_path());
+ ghc::filesystem::remove_all(local_path);
+ if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
+ log_error("symlink failed: %s", strerror(errno));
+ }
+
+ if (pl.pl_root_path == pl.pl_path) {
+ auto iter = conn.c_desired_paths.find(pl.pl_path);
+
+ if (iter != conn.c_desired_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_desired_paths.insert(pl.pl_path);
+ } else {
+ log_info("synced desired path: %s",
+ iter->first.c_str());
+ conn.c_desired_paths.erase(iter);
+ }
+ }
+ } else {
+ auto iter = conn.c_child_paths.find(pl.pl_path);
+
+ if (iter != conn.c_child_paths.end()) {
+ if (iter->second.loo_tail) {
+ conn.c_synced_child_paths.insert(pl.pl_path);
+ } else {
+ log_info("synced child path: %s",
+ iter->first.c_str());
+ conn.c_child_paths.erase(iter);
+ }
+ }
+ }
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_preview_error& ppe) {
+ isc::to<main_looper&, services::main_t>().send(
+ [ppe](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != ppe.ppe_id) {
+ log_debug("preview ID mismatch: %lld != %lld",
+ lnav_data.ld_preview_generation,
+ ppe.ppe_id);
+ return;
+ }
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .clear();
+ lnav_data.ld_preview_source.clear();
+ lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg);
+ });
+
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_preview_data& ppd) {
+ isc::to<main_looper&, services::main_t>().send(
+ [netloc = this->ht_netloc, ppd](auto& mlooper) {
+ if (lnav_data.ld_preview_generation != ppd.ppd_id) {
+ log_debug("preview ID mismatch: %lld != %lld",
+ lnav_data.ld_preview_generation,
+ ppd.ppd_id);
+ return;
+ }
+ std::string str(ppd.ppd_bits.begin(),
+ ppd.ppd_bits.end());
+ lnav_data.ld_preview_status_source.get_description()
+ .set_cylon(false)
+ .set_value("For file: %s:%s",
+ netloc.c_str(),
+ ppd.ppd_path.c_str());
+ lnav_data.ld_preview_source.replace_with(str)
+ .set_text_format(detect_text_format(str));
+ });
+ return std::move(this->ht_state);
+ },
+ [&](const tailer::packet_possible_path& ppp) {
+ log_debug("possible path: %s", ppp.ppp_path.c_str());
+ auto full_path = fmt::format(
+ FMT_STRING("{}{}"), this->ht_netloc, ppp.ppp_path);
+
+ isc::to<main_looper&, services::main_t>().send(
+ [full_path](auto& mlooper) {
+ lnav_data.ld_rl_view->add_possibility(
+ ln_mode_t::COMMAND, "remote-path", full_path);
+ });
+ return std::move(this->ht_state);
+ });
+
+ if (!this->ht_state.is<connected>()) {
+ this->s_looping = false;
+ }
+ }
+}
+
+std::chrono::milliseconds
+tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
+{
+ return 0s;
+}
+
+void
+tailer::looper::host_tailer::stopped()
+{
+ if (this->ht_state.is<connected>()) {
+ this->ht_state = disconnected();
+ }
+ if (this->ht_error_reader.joinable()) {
+ this->ht_error_reader.join();
+ }
+}
+
+std::string
+tailer::looper::host_tailer::get_display_path(
+ const std::string& remote_path) const
+{
+ return fmt::format(FMT_STRING("{}{}"), this->ht_netloc, remote_path);
+}
+
+void*
+tailer::looper::host_tailer::run()
+{
+ log_set_thread_prefix(
+ fmt::format(FMT_STRING("tailer({})"), this->ht_netloc));
+
+ return service_base::run();
+}
+
+auto_pid<process_state::finished>
+tailer::looper::host_tailer::connected::close() &&
+{
+ this->ht_to_child.reset();
+ this->ht_from_child.reset();
+
+ return std::move(this->ht_child).wait_for_child();
+}
+
+void
+tailer::looper::child_finished(std::shared_ptr<service_base> child)
+{
+ auto child_tailer = std::static_pointer_cast<host_tailer>(child);
+
+ for (auto iter = this->l_remotes.begin(); iter != this->l_remotes.end();
+ ++iter)
+ {
+ if (iter->second != child_tailer) {
+ continue;
+ }
+
+ if (child_tailer->is_synced()) {
+ log_info("synced with netloc '%s', removing", iter->first.c_str());
+ auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
+
+ if (netloc_iter != this->l_netlocs_to_paths.end()) {
+ netloc_iter->second.send_synced_to_main(netloc_iter->first);
+ this->l_netlocs_to_paths.erase(netloc_iter);
+ }
+ }
+ lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
+ iter->first);
+ this->l_remotes.erase(iter);
+ return;
+ }
+}
+
+void
+tailer::looper::remote_path_queue::send_synced_to_main(
+ const std::string& netloc)
+{
+ std::set<std::string> synced_files;
+
+ for (const auto& pair : this->rpq_new_paths) {
+ if (!pair.second.loo_tail) {
+ synced_files.emplace(
+ fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
+ }
+ }
+ for (const auto& pair : this->rpq_existing_paths) {
+ if (!pair.second.loo_tail) {
+ synced_files.emplace(
+ fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
+ }
+ }
+
+ isc::to<main_looper&, services::main_t>().send(
+ [file_set = std::move(synced_files)](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_synced_files = file_set;
+ update_active_files(fc);
+ });
+}
+
+void
+tailer::looper::report_error(std::string path, std::string msg)
+{
+ log_error("reporting error: %s -- %s", path.c_str(), msg.c_str());
+ isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
+ file_collection fc;
+
+ fc.fc_name_to_errors.emplace(path,
+ file_error_info{
+ {},
+ msg,
+ });
+ update_active_files(fc);
+ lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
+ path);
+ });
+}
+
+void
+tailer::cleanup_cache()
+{
+ (void) std::async(std::launch::async, []() {
+ auto now = std::chrono::system_clock::now();
+ auto cache_path = remote_cache_path();
+ const auto& cfg = injector::get<const config&>();
+ std::vector<ghc::filesystem::path> to_remove;
+
+ log_debug("cache-ttl %d", cfg.c_cache_ttl.count());
+ for (const auto& entry :
+ ghc::filesystem::directory_iterator(cache_path))
+ {
+ auto mtime = ghc::filesystem::last_write_time(entry.path());
+ auto exp_time = mtime + cfg.c_cache_ttl;
+ if (now < exp_time) {
+ continue;
+ }
+
+ to_remove.emplace_back(entry.path());
+ }
+
+ for (auto& entry : to_remove) {
+ log_debug("removing cached remote: %s", entry.c_str());
+ ghc::filesystem::remove_all(entry);
+ }
+ });
+}
diff --git a/src/tailer/tailer.looper.cfg.hh b/src/tailer/tailer.looper.cfg.hh
new file mode 100644
index 0000000..65b5ff6
--- /dev/null
+++ b/src/tailer/tailer.looper.cfg.hh
@@ -0,0 +1,53 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef lnav_tailer_looper_cfg_hh
+#define lnav_tailer_looper_cfg_hh
+
+#include <chrono>
+
+namespace tailer {
+
+struct config {
+ int64_t c_min_free_space{32 * 1024 * 1024};
+ std::chrono::seconds c_cache_ttl{std::chrono::hours(48)};
+ std::string c_transfer_cmd{"cat > {0:} && chmod ugo+rx ./{0:}"};
+ std::string c_start_cmd{"bash -c ./{0:}"};
+ std::string c_ssh_cmd{"ssh"};
+ std::string c_ssh_flags{};
+ std::map<std::string, std::string> c_ssh_options{};
+ std::map<std::string, std::string> c_ssh_config{
+ {"BatchMode", "yes"},
+ {"ConnectTimeout", "10"},
+ };
+};
+
+} // namespace tailer
+
+#endif
diff --git a/src/tailer/tailer.looper.hh b/src/tailer/tailer.looper.hh
new file mode 100644
index 0000000..1bf84d9
--- /dev/null
+++ b/src/tailer/tailer.looper.hh
@@ -0,0 +1,158 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef lnav_tailer_looper_hh
+#define lnav_tailer_looper_hh
+
+#include <set>
+
+#include <logfile_fwd.hh>
+
+#include "base/auto_fd.hh"
+#include "base/auto_pid.hh"
+#include "base/isc.hh"
+#include "base/network.tcp.hh"
+#include "ghc/filesystem.hpp"
+#include "mapbox/variant.hpp"
+
+namespace tailer {
+
+class looper : public isc::service<looper> {
+public:
+ void add_remote(const network::path& path,
+ logfile_open_options_base options);
+
+ void load_preview(int64_t id, const network::path& path);
+
+ void complete_path(const network::path& path);
+
+ bool empty() const { return this->l_netlocs_to_paths.empty(); }
+
+ std::set<std::string> active_netlocs() const
+ {
+ std::set<std::string> retval;
+
+ for (const auto& pair : this->l_remotes) {
+ retval.insert(pair.first);
+ }
+ return retval;
+ }
+
+protected:
+ void loop_body() override;
+
+ void child_finished(std::shared_ptr<service_base> child) override;
+
+private:
+ class host_tailer : public isc::service<host_tailer> {
+ public:
+ static Result<std::shared_ptr<host_tailer>, std::string> for_host(
+ const std::string& netloc);
+
+ host_tailer(const std::string& netloc,
+ auto_pid<process_state::running> child,
+ auto_fd to_child,
+ auto_fd from_child,
+ auto_fd err_from_child);
+
+ void open_remote_path(const std::string& path,
+ logfile_open_options_base loo);
+
+ void load_preview(int64_t id, const std::string& path);
+
+ void complete_path(const std::string& path);
+
+ bool is_synced() const { return this->ht_state.is<synced>(); }
+
+ protected:
+ void* run() override;
+
+ void loop_body() override;
+
+ void stopped() override;
+
+ std::chrono::milliseconds compute_timeout(
+ mstime_t current_time) const override;
+
+ private:
+ static ghc::filesystem::path tmp_path();
+
+ std::string get_display_path(const std::string& remote_path) const;
+
+ struct connected {
+ auto_pid<process_state::running> ht_child;
+ auto_fd ht_to_child;
+ auto_fd ht_from_child;
+ std::map<std::string, logfile_open_options_base> c_desired_paths;
+ std::set<std::string> c_synced_desired_paths;
+ std::map<std::string, logfile_open_options_base> c_child_paths;
+ std::set<std::string> c_synced_child_paths;
+ bool c_initial_sync_done{false};
+
+ auto_pid<process_state::finished> close() &&;
+ };
+
+ struct disconnected {};
+ struct synced {};
+
+ using state_v = mapbox::util::variant<connected, disconnected, synced>;
+
+ const std::string ht_netloc;
+ std::string ht_uname;
+ const ghc::filesystem::path ht_local_path;
+ std::set<ghc::filesystem::path> ht_active_files;
+ std::vector<std::string> ht_error_queue;
+ std::thread ht_error_reader;
+ state_v ht_state{disconnected()};
+ uint64_t ht_cycle_count{0};
+ };
+
+ static void report_error(std::string path, std::string msg);
+
+ using attempt_time_point
+ = std::chrono::time_point<std::chrono::steady_clock>;
+
+ struct remote_path_queue {
+ attempt_time_point rpq_next_attempt_time{
+ std::chrono::steady_clock::now()};
+ std::map<std::string, logfile_open_options_base> rpq_new_paths;
+ std::map<std::string, logfile_open_options_base> rpq_existing_paths;
+
+ void send_synced_to_main(const std::string& netloc);
+ };
+
+ std::map<std::string, remote_path_queue> l_netlocs_to_paths;
+ std::map<std::string, std::shared_ptr<host_tailer>> l_remotes;
+};
+
+void cleanup_cache();
+
+} // namespace tailer
+
+#endif
diff --git a/src/tailer/tailer.main.c b/src/tailer/tailer.main.c
new file mode 100644
index 0000000..445d24c
--- /dev/null
+++ b/src/tailer/tailer.main.c
@@ -0,0 +1,1072 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __COSMOPOLITAN__
+#include <glob.h>
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <dirent.h>
+#include <stdarg.h>
+#include <limits.h>
+#include <poll.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/utsname.h>
+#include <ctype.h>
+#include <stdint.h>
+#endif
+
+#include "sha-256.h"
+#include "tailer.h"
+
+struct node {
+ struct node *n_succ;
+ struct node *n_pred;
+};
+
+struct list {
+ struct node *l_head;
+ struct node *l_tail;
+ struct node *l_tail_pred;
+};
+
+int is_glob(const char *fn)
+{
+ return (strchr(fn, '*') != NULL ||
+ strchr(fn, '?') != NULL ||
+ strchr(fn, '[') != NULL);
+};
+
+void list_init(struct list *l)
+{
+ l->l_head = (struct node *) &l->l_tail;
+ l->l_tail = NULL;
+ l->l_tail_pred = (struct node *) &l->l_head;
+}
+
+void list_move(struct list *dst, struct list *src)
+{
+ if (src->l_head->n_succ == NULL) {
+ list_init(dst);
+ return;
+ }
+
+ dst->l_head = src->l_head;
+ dst->l_head->n_pred = (struct node *) &dst->l_head;
+ dst->l_tail = NULL;
+ dst->l_tail_pred = src->l_tail_pred;
+ dst->l_tail_pred->n_succ = (struct node *) &dst->l_tail;
+
+ list_init(src);
+}
+
+void list_remove(struct node *n)
+{
+ n->n_pred->n_succ = n->n_succ;
+ n->n_succ->n_pred = n->n_pred;
+
+ n->n_succ = NULL;
+ n->n_pred = NULL;
+}
+
+struct node *list_remove_head(struct list *l)
+{
+ struct node *retval = NULL;
+
+ if (l->l_head->n_succ != NULL) {
+ retval = l->l_head;
+ list_remove(l->l_head);
+ }
+
+ return retval;
+}
+
+void list_append(struct list *l, struct node *n)
+{
+ n->n_pred = l->l_tail_pred;
+ n->n_succ = (struct node *) &l->l_tail;
+ l->l_tail_pred->n_succ = n;
+ l->l_tail_pred = n;
+}
+
+typedef enum {
+ CS_INIT,
+ CS_OFFERED,
+ CS_TAILING,
+ CS_SYNCED,
+} client_state_t;
+
+typedef enum {
+ PS_UNKNOWN,
+ PS_OK,
+ PS_ERROR,
+} path_state_t;
+
+struct client_path_state {
+ struct node cps_node;
+ char *cps_path;
+ path_state_t cps_last_path_state;
+ struct stat cps_last_stat;
+ int64_t cps_client_file_offset;
+ int64_t cps_client_file_size;
+ client_state_t cps_client_state;
+ struct list cps_children;
+};
+
+struct client_path_state *create_client_path_state(const char *path)
+{
+ struct client_path_state *retval = malloc(sizeof(struct client_path_state));
+
+ retval->cps_path = strdup(path);
+ retval->cps_last_path_state = PS_UNKNOWN;
+ memset(&retval->cps_last_stat, 0, sizeof(retval->cps_last_stat));
+ retval->cps_client_file_offset = -1;
+ retval->cps_client_file_size = 0;
+ retval->cps_client_state = CS_INIT;
+ list_init(&retval->cps_children);
+ return retval;
+}
+
+void delete_client_path_state(struct client_path_state *cps);
+
+void delete_client_path_list(struct list *l)
+{
+ struct client_path_state *child_cps;
+
+ while ((child_cps = (struct client_path_state *) list_remove_head(l)) != NULL) {
+ list_remove(&child_cps->cps_node);
+ delete_client_path_state(child_cps);
+ }
+}
+
+void delete_client_path_state(struct client_path_state *cps)
+{
+ free(cps->cps_path);
+ delete_client_path_list(&cps->cps_children);
+ free(cps);
+}
+
+void dump_client_path_states(struct list *path_list)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+
+ while (curr->cps_node.n_succ != NULL) {
+ fprintf(stderr, "debug: path %s\n", curr->cps_path);
+ dump_client_path_states(&curr->cps_children);
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ curr = (struct client_path_state *) path_list->l_tail_pred;
+ while (curr->cps_node.n_pred != NULL) {
+ fprintf(stderr, "debug: back path %s\n", curr->cps_path);
+ dump_client_path_states(&curr->cps_children);
+
+ curr = (struct client_path_state *) curr->cps_node.n_pred;
+ }
+}
+
+void send_error(struct client_path_state *cps, char *msg, ...)
+{
+ char buffer[1024];
+ va_list args;
+
+ va_start(args, msg);
+ vsnprintf(buffer, sizeof(buffer), msg, args);
+ va_end(args);
+
+ send_packet(STDOUT_FILENO,
+ TPT_ERROR,
+ TPPT_STRING, cps->cps_path,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+}
+
+void set_client_path_state_error(struct client_path_state *cps, const char *op)
+{
+ if (cps->cps_last_path_state != PS_ERROR) {
+ // tell client of the problem
+ send_error(cps, "unable to %s -- %s", op, strerror(errno));
+ }
+ cps->cps_last_path_state = PS_ERROR;
+ cps->cps_client_file_offset = -1;
+ cps->cps_client_state = CS_INIT;
+ delete_client_path_list(&cps->cps_children);
+}
+
+typedef enum {
+ RS_ERROR,
+ RS_PACKET_TYPE,
+ RS_PAYLOAD_TYPE,
+ RS_PAYLOAD,
+ RS_PAYLOAD_LENGTH,
+ RS_PAYLOAD_CONTENT,
+} recv_state_t;
+
+static recv_state_t readall(recv_state_t state, int sock, void *buf, size_t len)
+{
+ char *cbuf = (char *) buf;
+ off_t offset = 0;
+
+ if (state == RS_ERROR) {
+ return RS_ERROR;
+ }
+
+ while (len > 0) {
+ ssize_t rc = read(sock, &cbuf[offset], len);
+
+ if (rc == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+
+ } else {
+ return RS_ERROR;
+ }
+ }
+ else if (rc == 0) {
+ errno = EIO;
+ return RS_ERROR;
+ }
+ else {
+ len -= rc;
+ offset += rc;
+ }
+ }
+
+ switch (state) {
+ case RS_PACKET_TYPE:
+ return RS_PAYLOAD_TYPE;
+ case RS_PAYLOAD_TYPE:
+ return RS_PAYLOAD;
+ case RS_PAYLOAD_LENGTH:
+ return RS_PAYLOAD_CONTENT;
+ case RS_PAYLOAD_CONTENT:
+ return RS_PAYLOAD_TYPE;
+ default:
+ return RS_ERROR;
+ }
+}
+
+static tailer_packet_payload_type_t read_payload_type(recv_state_t *state, int sock)
+{
+ tailer_packet_payload_type_t retval = TPPT_DONE;
+
+ assert(*state == RS_PAYLOAD_TYPE);
+
+ *state = readall(*state, sock, &retval, sizeof(retval));
+ if (*state != RS_ERROR && retval == TPPT_DONE) {
+ *state = RS_PACKET_TYPE;
+ }
+ return retval;
+}
+
+static char *readstr(recv_state_t *state, int sock)
+{
+ assert(*state == RS_PAYLOAD_TYPE);
+
+ tailer_packet_payload_type_t payload_type = read_payload_type(state, sock);
+
+ if (payload_type != TPPT_STRING) {
+ fprintf(stderr, "error: expected string, got: %d\n", payload_type);
+ return NULL;
+ }
+
+ int32_t length;
+
+ *state = RS_PAYLOAD_LENGTH;
+ *state = readall(*state, sock, &length, sizeof(length));
+ if (*state == RS_ERROR) {
+ fprintf(stderr, "error: unable to read string length\n");
+ return NULL;
+ }
+
+ char *retval = malloc(length + 1);
+ if (retval == NULL) {
+ return NULL;
+ }
+
+ *state = readall(*state, sock, retval, length);
+ if (*state == RS_ERROR) {
+ fprintf(stderr, "error: unable to read string of length: %d\n", length);
+ free(retval);
+ return NULL;
+ }
+ retval[length] = '\0';
+
+ return retval;
+}
+
+static int readint64(recv_state_t *state, int sock, int64_t *i)
+{
+ tailer_packet_payload_type_t payload_type = read_payload_type(state, sock);
+
+ if (payload_type != TPPT_INT64) {
+ fprintf(stderr, "error: expected int64, got: %d\n", payload_type);
+ return -1;
+ }
+
+ *state = RS_PAYLOAD_CONTENT;
+ *state = readall(*state, sock, i, sizeof(*i));
+ if (*state == -1) {
+ fprintf(stderr, "error: unable to read int64\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct list client_path_list;
+
+struct client_path_state *find_client_path_state(struct list *path_list, const char *path)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+
+ while (curr->cps_node.n_succ != NULL) {
+ if (strcmp(curr->cps_path, path) == 0) {
+ return curr;
+ }
+
+ struct client_path_state *child =
+ find_client_path_state(&curr->cps_children, path);
+
+ if (child != NULL) {
+ return child;
+ }
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ return NULL;
+}
+
+void send_preview_error(int64_t id, const char *path, const char *msg)
+{
+ send_packet(STDOUT_FILENO,
+ TPT_PREVIEW_ERROR,
+ TPPT_INT64, id,
+ TPPT_STRING, path,
+ TPPT_STRING, msg,
+ TPPT_DONE);
+}
+
+void send_preview_data(int64_t id, const char *path, int32_t len, const char *bits)
+{
+ send_packet(STDOUT_FILENO,
+ TPT_PREVIEW_DATA,
+ TPPT_INT64, id,
+ TPPT_STRING, path,
+ TPPT_BITS, len, bits,
+ TPPT_DONE);
+}
+
+int poll_paths(struct list *path_list, struct client_path_state *root_cps)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+ int is_top = root_cps == NULL;
+ int retval = 0;
+
+ while (curr->cps_node.n_succ != NULL) {
+ if (is_top) {
+ root_cps = curr;
+ }
+
+ if (is_glob(curr->cps_path)) {
+ int changes = 0;
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(curr->cps_path, 0, NULL, &gl) != 0) {
+ set_client_path_state_error(curr, "glob");
+ } else {
+ struct list prev_children;
+
+ list_move(&prev_children, &curr->cps_children);
+ for (size_t lpc = 0; lpc < gl.gl_pathc; lpc++) {
+ struct client_path_state *child;
+
+ if ((child = find_client_path_state(
+ &prev_children, gl.gl_pathv[lpc])) == NULL) {
+ child = create_client_path_state(gl.gl_pathv[lpc]);
+ changes += 1;
+ } else {
+ list_remove(&child->cps_node);
+ }
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+ globfree(&gl);
+
+ struct client_path_state *child;
+
+ while ((child = (struct client_path_state *) list_remove_head(
+ &prev_children)) != NULL) {
+ send_error(child, "deleted");
+ delete_client_path_state(child);
+ changes += 1;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+ }
+
+ if (changes) {
+ curr->cps_client_state = CS_INIT;
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ continue;
+ }
+
+ struct stat st;
+ int rc = lstat(curr->cps_path, &st);
+
+ if (rc == -1) {
+ memset(&st, 0, sizeof(st));
+ set_client_path_state_error(curr, "lstat");
+ } else if (curr->cps_client_file_offset >= 0 &&
+ ((curr->cps_last_stat.st_dev != st.st_dev &&
+ curr->cps_last_stat.st_ino != st.st_ino) ||
+ (st.st_size < curr->cps_last_stat.st_size))) {
+ send_error(curr, "replaced");
+ set_client_path_state_error(curr, "replace");
+ } else if (S_ISLNK(st.st_mode)) {
+ switch (curr->cps_client_state) {
+ case CS_INIT: {
+ char buffer[PATH_MAX];
+ ssize_t link_len;
+
+ link_len = readlink(curr->cps_path, buffer, sizeof(buffer));
+ if (link_len < 0) {
+ set_client_path_state_error(curr, "readlink");
+ } else {
+ buffer[link_len] = '\0';
+ send_packet(STDOUT_FILENO,
+ TPT_LINK_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+
+ if (buffer[0] == '/') {
+ struct client_path_state *child =
+ create_client_path_state(buffer);
+
+ fprintf(stderr, "info: monitoring link path %s\n",
+ buffer);
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+
+ retval += 1;
+ }
+ break;
+ }
+ case CS_SYNCED:
+ break;
+ case CS_OFFERED:
+ case CS_TAILING:
+ fprintf(stderr,
+ "internal-error: unexpected state for path -- %s\n",
+ curr->cps_path);
+ break;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+
+ curr->cps_last_path_state = PS_OK;
+ } else if (S_ISREG(st.st_mode)) {
+ switch (curr->cps_client_state) {
+ case CS_INIT:
+ case CS_TAILING:
+ case CS_SYNCED: {
+ if (curr->cps_client_file_offset < st.st_size) {
+ int fd = open(curr->cps_path, O_RDONLY);
+
+ if (fd == -1) {
+ set_client_path_state_error(curr, "open");
+ } else {
+ static unsigned char buffer[4 * 1024 * 1024];
+
+ int64_t file_offset =
+ curr->cps_client_file_offset < 0 ?
+ 0 :
+ curr->cps_client_file_offset;
+ int64_t nbytes = sizeof(buffer);
+ if (curr->cps_client_state == CS_INIT) {
+ if (curr->cps_client_file_size == 0) {
+ // initial state, haven't heard from client yet.
+ nbytes = 32 * 1024;
+ } else if (file_offset < curr->cps_client_file_size) {
+ // heard from client, try to catch up
+ nbytes = curr->cps_client_file_size - file_offset;
+ if (nbytes > sizeof(buffer)) {
+ nbytes = sizeof(buffer);
+ }
+ }
+ }
+ int32_t bytes_read = pread(fd, buffer, nbytes, file_offset);
+
+ if (bytes_read == -1) {
+ set_client_path_state_error(curr, "pread");
+ } else if (curr->cps_client_state == CS_INIT &&
+ (curr->cps_client_file_offset < 0 ||
+ bytes_read > 0)) {
+ static unsigned char
+ HASH_BUFFER[4 * 1024 * 1024];
+ BYTE hash[SHA256_BLOCK_SIZE];
+ size_t remaining = 0;
+ int64_t remaining_offset
+ = file_offset + bytes_read;
+ SHA256_CTX shactx;
+
+ if (curr->cps_client_file_size > 0
+ && file_offset < curr->cps_client_file_size)
+ {
+ remaining = curr->cps_client_file_size
+ - file_offset - bytes_read;
+ }
+
+ fprintf(stderr,
+ "info: prepping offer: init=%d; "
+ "remaining=%zu; %s\n",
+ bytes_read,
+ remaining,
+ curr->cps_path);
+ sha256_init(&shactx);
+ sha256_update(&shactx, buffer, bytes_read);
+ while (remaining > 0) {
+ nbytes = sizeof(HASH_BUFFER);
+ if (remaining < nbytes) {
+ nbytes = remaining;
+ }
+ ssize_t remaining_bytes_read
+ = pread(fd,
+ HASH_BUFFER,
+ nbytes,
+ remaining_offset);
+ if (remaining_bytes_read < 0) {
+ set_client_path_state_error(curr, "pread");
+ break;
+ }
+ if (remaining_bytes_read == 0) {
+ remaining = 0;
+ break;
+ }
+ sha256_update(&shactx, HASH_BUFFER, remaining_bytes_read);
+ remaining -= remaining_bytes_read;
+ remaining_offset += remaining_bytes_read;
+ bytes_read += remaining_bytes_read;
+ }
+
+ if (remaining == 0) {
+ sha256_final(&shactx, hash);
+
+ send_packet(STDOUT_FILENO,
+ TPT_OFFER_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_INT64,
+ (int64_t) st.st_mtime,
+ TPPT_INT64, file_offset,
+ TPPT_INT64, (int64_t) bytes_read,
+ TPPT_HASH, hash,
+ TPPT_DONE);
+ curr->cps_client_state = CS_OFFERED;
+ }
+ } else {
+ if (curr->cps_client_file_offset < 0) {
+ curr->cps_client_file_offset = 0;
+ }
+
+ send_packet(STDOUT_FILENO,
+ TPT_TAIL_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_INT64, (int64_t) st.st_mtime,
+ TPPT_INT64, curr->cps_client_file_offset,
+ TPPT_BITS, bytes_read, buffer,
+ TPPT_DONE);
+ curr->cps_client_file_offset += bytes_read;
+ curr->cps_client_state = CS_TAILING;
+ }
+ close(fd);
+
+ retval = 1;
+ }
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+ break;
+ }
+ case CS_OFFERED: {
+ // Still waiting for the client ack
+ break;
+ }
+ }
+
+ curr->cps_last_path_state = PS_OK;
+ } else if (S_ISDIR(st.st_mode)) {
+ DIR *dir = opendir(curr->cps_path);
+
+ if (dir == NULL) {
+ set_client_path_state_error(curr, "opendir");
+ } else {
+ struct list prev_children;
+ struct dirent *entry;
+ int changes = 0;
+
+ list_move(&prev_children, &curr->cps_children);
+ while ((entry = readdir(dir)) != NULL) {
+ if (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0) {
+ continue;
+ }
+
+ if (entry->d_type != DT_REG &&
+ entry->d_type != DT_LNK) {
+ continue;
+ }
+
+ char full_path[PATH_MAX];
+
+ snprintf(full_path, sizeof(full_path),
+ "%s/%s",
+ curr->cps_path, entry->d_name);
+
+ struct client_path_state *child = find_client_path_state(&prev_children, full_path);
+
+ if (child == NULL) {
+ // new file
+ fprintf(stderr, "info: monitoring child path: %s\n", full_path);
+ child = create_client_path_state(full_path);
+ changes += 1;
+ } else {
+ list_remove(&child->cps_node);
+ }
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+ closedir(dir);
+
+ struct client_path_state *child;
+
+ while ((child = (struct client_path_state *) list_remove_head(
+ &prev_children)) != NULL) {
+ send_error(child, "deleted");
+ delete_client_path_state(child);
+ changes += 1;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+
+ if (changes) {
+ curr->cps_client_state = CS_INIT;
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+ }
+
+ curr->cps_last_path_state = PS_OK;
+ }
+
+ curr->cps_last_stat = st;
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ fflush(stderr);
+
+ return retval;
+}
+
+static
+void send_possible_paths(const char *glob_path, int depth)
+{
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(glob_path, GLOB_MARK, NULL, &gl) == 0) {
+ for (size_t lpc = 0;
+ lpc < gl.gl_pathc;
+ lpc++) {
+ const char *child_path = gl.gl_pathv[lpc];
+ size_t child_len = strlen(gl.gl_pathv[lpc]);
+
+ send_packet(STDOUT_FILENO,
+ TPT_POSSIBLE_PATH,
+ TPPT_STRING, child_path,
+ TPPT_DONE);
+
+ if (depth == 0 && child_path[child_len - 1] == '/') {
+ char *child_copy = malloc(child_len + 2);
+
+ strcpy(child_copy, child_path);
+ strcat(child_copy, "*");
+ send_possible_paths(child_copy, depth + 1);
+ free(child_copy);
+ }
+ }
+ }
+
+ globfree(&gl);
+}
+
+static
+void handle_load_preview_request(const char *path, int64_t preview_id)
+{
+ struct stat st;
+
+ fprintf(stderr, "info: load preview request -- %lld\n", preview_id);
+ if (is_glob(path)) {
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(path, 0, NULL, &gl) != 0) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot glob %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else {
+ char *bits = malloc(1024 * 1024);
+ int lpc, line_count = 10;
+
+ bits[0] = '\0';
+ for (lpc = 0;
+ line_count > 0 && lpc < gl.gl_pathc;
+ lpc++, line_count--) {
+ strcat(bits, gl.gl_pathv[lpc]);
+ strcat(bits, "\n");
+ }
+
+ if (lpc < gl.gl_pathc) {
+ strcat(bits, " ... and more! ...\n");
+ }
+
+ send_preview_data(preview_id, path, strlen(bits), bits);
+
+ globfree(&gl);
+ free(bits);
+ }
+ }
+ else if (stat(path, &st) == -1) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot open %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else if (S_ISREG(st.st_mode)) {
+ size_t capacity = 1024 * 1024;
+ char *bits = malloc(capacity);
+ FILE *file;
+
+ if ((file = fopen(path, "r")) == NULL) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot open %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else {
+ int line_count = 10;
+ size_t offset = 0;
+ char *line;
+
+ while (line_count &&
+ (capacity - offset) > 1024 &&
+ (line = fgets(&bits[offset], capacity - offset, file)) != NULL) {
+ offset += strlen(line);
+ line_count -= 1;
+ }
+
+ fclose(file);
+
+ send_preview_data(preview_id, path, offset, bits);
+ }
+ free(bits);
+ } else if (S_ISDIR(st.st_mode)) {
+ DIR *dir = opendir(path);
+
+ if (dir == NULL) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: unable to open directory -- %s",
+ path);
+ send_preview_error(preview_id, path, msg);
+ } else {
+ char *bits = malloc(1024 * 1024);
+ struct dirent *entry;
+ int line_count = 10;
+
+ bits[0] = '\0';
+ while ((entry = readdir(dir)) != NULL) {
+ if (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0) {
+ continue;
+ }
+ if (entry->d_type != DT_REG &&
+ entry->d_type != DT_DIR) {
+ continue;
+ }
+ if (line_count == 1) {
+ strcat(bits, " ... and more! ...\n");
+ break;
+ }
+
+ strcat(bits, entry->d_name);
+ strcat(bits, "\n");
+
+ line_count -= 1;
+ }
+
+ closedir(dir);
+
+ send_preview_data(preview_id, path, strlen(bits), bits);
+
+ free(bits);
+ }
+ } else {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: path is not a file or directory -- %s",
+ path);
+ send_preview_error(preview_id, path, msg);
+ }
+}
+
+static
+void handle_complete_path_request(const char *path)
+{
+ size_t path_len = strlen(path);
+ char *glob_path = malloc(path_len + 3);
+ struct stat st;
+
+ strcpy(glob_path, path);
+ fprintf(stderr, "complete path: %s\n", path);
+ if (path[path_len - 1] != '/' &&
+ stat(path, &st) == 0 &&
+ S_ISDIR(st.st_mode)) {
+ strcat(glob_path, "/");
+ }
+ if (path[path_len - 1] != '*') {
+ strcat(glob_path, "*");
+ }
+ fprintf(stderr, "complete glob path: %s\n", glob_path);
+ send_possible_paths(glob_path, 0);
+
+ free(glob_path);
+}
+
+int main(int argc, char *argv[])
+{
+ int done = 0, timeout = 0;
+ recv_state_t rstate = RS_PACKET_TYPE;
+
+ // No need to leave ourselves around
+ if (argc == 1) {
+ unlink(argv[0]);
+ }
+
+ list_init(&client_path_list);
+
+ {
+ FILE *unameFile = popen("uname -mrsv", "r");
+
+ if (unameFile != NULL) {
+ char buffer[1024];
+
+ fgets(buffer, sizeof(buffer), unameFile);
+ char *bufend = buffer + strlen(buffer) - 1;
+ while (isspace(*bufend)) {
+ bufend -= 1;
+ }
+ *bufend = '\0';
+ send_packet(STDOUT_FILENO,
+ TPT_ANNOUNCE,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+ pclose(unameFile);
+ }
+ }
+
+ while (!done) {
+ struct pollfd pfds[1];
+
+ pfds[0].fd = STDIN_FILENO;
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+
+ int ready_count = poll(pfds, 1, timeout);
+
+ if (ready_count) {
+ tailer_packet_type_t type;
+
+ assert(rstate == RS_PACKET_TYPE);
+ rstate = readall(rstate, STDIN_FILENO, &type, sizeof(type));
+ if (rstate == RS_ERROR) {
+ fprintf(stderr, "info: exiting...\n");
+ done = 1;
+ } else {
+ switch (type) {
+ case TPT_OPEN_PATH:
+ case TPT_CLOSE_PATH:
+ case TPT_LOAD_PREVIEW:
+ case TPT_COMPLETE_PATH: {
+ char *path = readstr(&rstate, STDIN_FILENO);
+ int64_t preview_id = 0;
+
+ if (type == TPT_LOAD_PREVIEW) {
+ if (readint64(&rstate, STDIN_FILENO, &preview_id) == -1) {
+ done = 1;
+ break;
+ }
+ }
+ if (path == NULL) {
+ fprintf(stderr, "error: unable to get path to open\n");
+ done = 1;
+ } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) {
+ fprintf(stderr, "error: invalid open packet\n");
+ done = 1;
+ } else if (type == TPT_OPEN_PATH) {
+ struct client_path_state *cps;
+
+ cps = find_client_path_state(&client_path_list, path);
+ if (cps != NULL) {
+ fprintf(stderr, "warning: already monitoring -- %s\n", path);
+ } else {
+ cps = create_client_path_state(path);
+
+ fprintf(stderr, "info: monitoring path: %s\n", path);
+ list_append(&client_path_list, &cps->cps_node);
+ }
+ } else if (type == TPT_CLOSE_PATH) {
+ struct client_path_state *cps = find_client_path_state(&client_path_list, path);
+
+ if (cps == NULL) {
+ fprintf(stderr, "warning: path is not open: %s\n", path);
+ } else {
+ list_remove(&cps->cps_node);
+ delete_client_path_state(cps);
+ }
+ } else if (type == TPT_LOAD_PREVIEW) {
+ handle_load_preview_request(path, preview_id);
+ } else if (type == TPT_COMPLETE_PATH) {
+ handle_complete_path_request(path);
+ }
+
+ free(path);
+ break;
+ }
+ case TPT_ACK_BLOCK:
+ case TPT_NEED_BLOCK: {
+ char *path = readstr(&rstate, STDIN_FILENO);
+ int64_t ack_offset = 0, ack_len = 0, client_size = 0;
+
+ if (type == TPT_ACK_BLOCK &&
+ (readint64(&rstate, STDIN_FILENO, &ack_offset) == -1 ||
+ readint64(&rstate, STDIN_FILENO, &ack_len) == -1 ||
+ readint64(&rstate, STDIN_FILENO, &client_size) == -1)) {
+ done = 1;
+ break;
+ }
+
+ // fprintf(stderr, "info: block packet path: %s\n", path);
+ if (path == NULL) {
+ fprintf(stderr, "error: unable to get block path\n");
+ done = 1;
+ } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) {
+ fprintf(stderr, "error: invalid block packet\n");
+ done = 1;
+ } else {
+ struct client_path_state *cps = find_client_path_state(&client_path_list, path);
+
+ if (cps == NULL) {
+ fprintf(stderr, "warning: unknown path in block packet: %s\n", path);
+ } else if (type == TPT_NEED_BLOCK) {
+ fprintf(stderr, "info: client is tailing: %s\n", path);
+ cps->cps_client_state = CS_TAILING;
+ } else if (type == TPT_ACK_BLOCK) {
+ fprintf(stderr,
+ "info: client acked: %s %lld\n",
+ path,
+ client_size);
+ if (ack_len == 0) {
+ cps->cps_client_state = CS_TAILING;
+ } else {
+ cps->cps_client_file_offset = ack_offset + ack_len;
+ cps->cps_client_state = CS_INIT;
+ cps->cps_client_file_size = client_size;
+ }
+ }
+ free(path);
+ }
+ break;
+ }
+ default: {
+ assert(0);
+ }
+ }
+ }
+ }
+
+ if (!done) {
+ if (poll_paths(&client_path_list, NULL)) {
+ timeout = 0;
+ } else {
+ timeout = 1000;
+ }
+ }
+ }
+
+ return EXIT_SUCCESS;
+}
diff --git a/src/tailer/tailerpp.cc b/src/tailer/tailerpp.cc
new file mode 100644
index 0000000..1ea0a9e
--- /dev/null
+++ b/src/tailer/tailerpp.cc
@@ -0,0 +1,146 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "tailerpp.hh"
+
+#include <unistd.h>
+
+namespace tailer {
+
+int
+readall(int sock, void* buf, size_t len)
+{
+ char* cbuf = (char*) buf;
+ off_t offset = 0;
+
+ while (len > 0) {
+ ssize_t rc = read(sock, &cbuf[offset], len);
+
+ if (rc == -1) {
+ switch (errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ return -1;
+ }
+ } else if (rc == 0) {
+ errno = EIO;
+ return -1;
+ } else {
+ len -= rc;
+ offset += rc;
+ }
+ }
+
+ return 0;
+}
+
+Result<packet, std::string>
+read_packet(int fd)
+{
+ tailer_packet_type_t type;
+
+ if (readall(fd, &type, sizeof(type)) == -1) {
+ return Ok(packet{packet_eof{}});
+ }
+ switch (type) {
+ case TPT_ERROR: {
+ packet_error pe;
+
+ TRY(read_payloads_into(fd, pe.pe_path, pe.pe_msg));
+ return Ok(packet{pe});
+ }
+ case TPT_ANNOUNCE: {
+ packet_announce pa;
+
+ TRY(read_payloads_into(fd, pa.pa_uname));
+ return Ok(packet{pa});
+ }
+ case TPT_OFFER_BLOCK: {
+ packet_offer_block pob;
+
+ TRY(read_payloads_into(fd,
+ pob.pob_root_path,
+ pob.pob_path,
+ pob.pob_mtime,
+ pob.pob_offset,
+ pob.pob_length,
+ pob.pob_hash));
+ return Ok(packet{pob});
+ }
+ case TPT_TAIL_BLOCK: {
+ packet_tail_block ptb;
+
+ TRY(read_payloads_into(fd,
+ ptb.ptb_root_path,
+ ptb.ptb_path,
+ ptb.ptb_mtime,
+ ptb.ptb_offset,
+ ptb.ptb_bits));
+ return Ok(packet{ptb});
+ }
+ case TPT_SYNCED: {
+ packet_synced ps;
+
+ TRY(read_payloads_into(fd, ps.ps_root_path, ps.ps_path));
+ return Ok(packet{ps});
+ }
+ case TPT_LINK_BLOCK: {
+ packet_link pl;
+
+ TRY(read_payloads_into(
+ fd, pl.pl_root_path, pl.pl_path, pl.pl_link_value));
+ return Ok(packet{pl});
+ }
+ case TPT_PREVIEW_ERROR: {
+ packet_preview_error ppe;
+
+ TRY(read_payloads_into(fd, ppe.ppe_id, ppe.ppe_path, ppe.ppe_msg));
+ return Ok(packet{ppe});
+ }
+ case TPT_PREVIEW_DATA: {
+ packet_preview_data ppd;
+
+ TRY(read_payloads_into(fd, ppd.ppd_id, ppd.ppd_path, ppd.ppd_bits));
+ return Ok(packet{ppd});
+ }
+ case TPT_POSSIBLE_PATH: {
+ packet_possible_path ppp;
+
+ TRY(read_payloads_into(fd, ppp.ppp_path));
+ return Ok(packet{ppp});
+ }
+ default:
+ assert(0);
+ break;
+ }
+}
+
+} // namespace tailer
diff --git a/src/tailer/tailerpp.hh b/src/tailer/tailerpp.hh
new file mode 100644
index 0000000..0447379
--- /dev/null
+++ b/src/tailer/tailerpp.hh
@@ -0,0 +1,315 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef lnav_tailerpp_hh
+#define lnav_tailerpp_hh
+
+#include <string>
+#include <vector>
+
+#include "base/result.h"
+#include "fmt/format.h"
+#include "mapbox/variant.hpp"
+#include "sha-256.h"
+#include "tailer.h"
+
+namespace tailer {
+
+struct packet_eof {};
+
+struct packet_error {
+ std::string pe_path;
+ std::string pe_msg;
+};
+
+struct packet_announce {
+ std::string pa_uname;
+};
+
+struct hash_frag {
+ uint8_t thf_hash[SHA256_BLOCK_SIZE];
+
+ bool operator==(const hash_frag& other) const
+ {
+ return memcmp(this->thf_hash, other.thf_hash, sizeof(this->thf_hash))
+ == 0;
+ }
+};
+
+struct packet_log {
+ std::string pl_msg;
+};
+
+struct packet_offer_block {
+ std::string pob_root_path;
+ std::string pob_path;
+ int64_t pob_mtime;
+ int64_t pob_offset;
+ int64_t pob_length;
+ hash_frag pob_hash;
+};
+
+struct packet_tail_block {
+ std::string ptb_root_path;
+ std::string ptb_path;
+ int64_t ptb_mtime;
+ int64_t ptb_offset;
+ std::vector<uint8_t> ptb_bits;
+};
+
+struct packet_synced {
+ std::string ps_root_path;
+ std::string ps_path;
+};
+
+struct packet_link {
+ std::string pl_root_path;
+ std::string pl_path;
+ std::string pl_link_value;
+};
+
+struct packet_preview_error {
+ int64_t ppe_id;
+ std::string ppe_path;
+ std::string ppe_msg;
+};
+
+struct packet_preview_data {
+ int64_t ppd_id;
+ std::string ppd_path;
+ std::vector<uint8_t> ppd_bits;
+};
+
+struct packet_possible_path {
+ std::string ppp_path;
+};
+
+using packet = mapbox::util::variant<packet_eof,
+ packet_announce,
+ packet_error,
+ packet_offer_block,
+ packet_tail_block,
+ packet_link,
+ packet_preview_error,
+ packet_preview_data,
+ packet_possible_path,
+ packet_synced>;
+
+struct recv_payload_type {};
+struct recv_payload_length {};
+struct recv_payload_content {};
+
+int readall(int sock, void* buf, size_t len);
+
+namespace details {
+
+template<class...>
+using void_t = void;
+
+template<class, class = void>
+struct has_data : std::false_type {};
+
+template<class T>
+struct has_data<T, decltype(void(std::declval<T&>().data()))>
+ : std::true_type {};
+
+template<typename T, std::enable_if_t<has_data<T>::value, bool> = true>
+uint8_t*
+get_data(T& t)
+{
+ return (uint8_t*) t.data();
+}
+
+template<typename T, std::enable_if_t<!has_data<T>::value, bool> = true>
+uint8_t*
+get_data(T& t)
+{
+ return (uint8_t*) &t;
+}
+
+} // namespace details
+
+template<int PAYLOAD_TYPE, typename EXPECT = recv_payload_type>
+struct protocol_recv {
+ static constexpr bool HAS_LENGTH = (PAYLOAD_TYPE == TPPT_STRING)
+ || (PAYLOAD_TYPE == TPPT_BITS);
+
+ using after_type = typename std::conditional<HAS_LENGTH,
+ recv_payload_length,
+ recv_payload_content>::type;
+
+ static Result<protocol_recv<PAYLOAD_TYPE, after_type>, std::string> create(
+ int fd)
+ {
+ return protocol_recv<PAYLOAD_TYPE>(fd).read_type();
+ }
+
+ Result<protocol_recv<PAYLOAD_TYPE, after_type>, std::string> read_type() &&
+ {
+ static_assert(std::is_same<EXPECT, recv_payload_type>::value,
+ "read_type() cannot be called in this state");
+
+ tailer_packet_payload_type_t payload_type;
+
+ if (readall(this->pr_fd, &payload_type, sizeof(payload_type)) == -1) {
+ return Err(
+ fmt::format(FMT_STRING("unable to read payload type: {}"),
+ strerror(errno)));
+ }
+
+ if (payload_type != PAYLOAD_TYPE) {
+ return Err(fmt::format(
+ FMT_STRING("payload-type mismatch, got: {}; expected: {}"),
+ (int) payload_type,
+ PAYLOAD_TYPE));
+ }
+
+ return Ok(protocol_recv<PAYLOAD_TYPE, after_type>(this->pr_fd));
+ }
+
+ template<typename T>
+ Result<protocol_recv<PAYLOAD_TYPE, recv_payload_content>, std::string>
+ read_length(T& data) &&
+ {
+ static_assert(std::is_same<EXPECT, recv_payload_length>::value,
+ "read_length() cannot be called in this state");
+
+ if (readall(this->pr_fd, &this->pr_length, sizeof(this->pr_length))
+ == -1)
+ {
+ return Err(
+ fmt::format(FMT_STRING("unable to read content length: {}"),
+ strerror(errno)));
+ }
+
+ try {
+ data.resize(this->pr_length);
+ } catch (...) {
+ return Err(fmt::format(FMT_STRING("unable to resize data to {}"),
+ this->pr_length));
+ }
+
+ return Ok(protocol_recv<PAYLOAD_TYPE, recv_payload_content>(
+ this->pr_fd, this->pr_length));
+ }
+
+ template<typename T>
+ Result<void, std::string> read_content(T& data) &&
+ {
+ static_assert(std::is_same<EXPECT, recv_payload_content>::value,
+ "read_content() cannot be called in this state");
+ static_assert(!HAS_LENGTH || details::has_data<T>::value, "boo");
+
+ if (!HAS_LENGTH) {
+ this->pr_length = sizeof(T);
+ }
+ if (readall(this->pr_fd, details::get_data(data), this->pr_length)
+ == -1)
+ {
+ return Err(fmt::format(FMT_STRING("unable to read content -- {}"),
+ strerror(errno)));
+ }
+
+ return Ok();
+ }
+
+private:
+ template<int P, typename E>
+ friend struct protocol_recv;
+
+ explicit protocol_recv(int fd, int32_t length = 0)
+ : pr_fd(fd), pr_length(length)
+ {
+ }
+
+ int pr_fd;
+ int32_t pr_length;
+};
+
+inline Result<void, std::string>
+read_payloads_into(int fd)
+{
+ tailer_packet_payload_type_t payload_type;
+
+ readall(fd, &payload_type, sizeof(payload_type));
+ if (payload_type != TPPT_DONE) {
+ return Err(std::string("not done"));
+ }
+
+ return Ok();
+}
+
+template<typename... Ts>
+Result<void, std::string> read_payloads_into(int fd,
+ std::string& str,
+ Ts&... args);
+
+template<typename... Ts>
+Result<void, std::string>
+read_payloads_into(int fd, std::vector<uint8_t>& bits, Ts&... args)
+{
+ TRY(TRY(TRY(protocol_recv<TPPT_BITS>::create(fd)).read_length(bits))
+ .read_content(bits));
+
+ return read_payloads_into(fd, args...);
+}
+
+template<typename... Ts>
+Result<void, std::string>
+read_payloads_into(int fd, hash_frag& thf, Ts&... args)
+{
+ TRY(TRY(protocol_recv<TPPT_HASH>::create(fd)).read_content(thf.thf_hash));
+
+ return read_payloads_into(fd, args...);
+}
+
+template<typename... Ts>
+Result<void, std::string>
+read_payloads_into(int fd, int64_t& i, Ts&... args)
+{
+ TRY(TRY(protocol_recv<TPPT_INT64>::create(fd)).read_content(i));
+
+ return read_payloads_into(fd, args...);
+}
+
+template<typename... Ts>
+Result<void, std::string>
+read_payloads_into(int fd, std::string& str, Ts&... args)
+{
+ TRY(TRY(TRY(protocol_recv<TPPT_STRING>::create(fd)).read_length(str))
+ .read_content(str));
+
+ return read_payloads_into(fd, args...);
+}
+
+Result<packet, std::string> read_packet(int fd);
+
+} // namespace tailer
+
+#endif
diff --git a/src/tailer/test_tailer.sh b/src/tailer/test_tailer.sh
new file mode 100755
index 0000000..932fe24
--- /dev/null
+++ b/src/tailer/test_tailer.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+run_test ./drive_tailer preview nonexistent-file
+
+check_error_output "preview of nonexistent-file failed?" <<EOF
+preview error: nonexistent-file -- error: cannot open nonexistent-file -- No such file or directory
+tailer stderr:
+info: load preview request -- 1234
+info: exiting...
+EOF
+
+run_test ./drive_tailer preview ${test_dir}/logfile_access_log.0
+
+check_output "preview of file failed?" <<EOF
+preview of file: {test_dir}/logfile_access_log.0
+192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7"
+192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7"
+192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7"
+
+all done!
+tailer stderr:
+info: load preview request -- 1234
+info: exiting...
+EOF
+
+run_cap_test ./drive_tailer preview "${test_dir}/remote-log-dir/*"
+
+run_test ./drive_tailer possible "${test_dir}/logfile_access_log.*"
+
+check_output "possible path list failed?" <<EOF
+possible path: {test_dir}/logfile_access_log.0
+possible path: {test_dir}/logfile_access_log.1
+all done!
+tailer stderr:
+complete path: {test_dir}/logfile_access_log.*
+complete glob path: {test_dir}/logfile_access_log.*
+info: exiting...
+EOF
+
+ln -sf bar foo
+
+run_test ./drive_tailer open foo
+
+check_output "open link not working?" <<EOF
+link value: foo -> bar
+all done!
+tailer stderr:
+info: monitoring path: foo
+info: exiting...
+EOF