diff options
Diffstat (limited to 'src/tailer')
-rw-r--r-- | src/tailer/CMakeLists.txt | 24 | ||||
-rw-r--r-- | src/tailer/Makefile.am | 111 | ||||
-rw-r--r-- | src/tailer/README.md | 35 | ||||
-rw-r--r-- | src/tailer/drive_tailer.cc | 288 | ||||
-rw-r--r-- | src/tailer/sha-256.c | 161 | ||||
-rw-r--r-- | src/tailer/sha-256.h | 44 | ||||
-rwxr-xr-x | src/tailer/tailer.ape | bin | 0 -> 147456 bytes | |||
-rw-r--r-- | src/tailer/tailer.c | 100 | ||||
-rw-r--r-- | src/tailer/tailer.h | 77 | ||||
-rw-r--r-- | src/tailer/tailer.looper.cc | 1192 | ||||
-rw-r--r-- | src/tailer/tailer.looper.cfg.hh | 53 | ||||
-rw-r--r-- | src/tailer/tailer.looper.hh | 158 | ||||
-rw-r--r-- | src/tailer/tailer.main.c | 1072 | ||||
-rw-r--r-- | src/tailer/tailerpp.cc | 146 | ||||
-rw-r--r-- | src/tailer/tailerpp.hh | 315 | ||||
-rwxr-xr-x | src/tailer/test_tailer.sh | 50 |
16 files changed, 3826 insertions, 0 deletions
diff --git a/src/tailer/CMakeLists.txt b/src/tailer/CMakeLists.txt new file mode 100644 index 0000000..1d1fb15 --- /dev/null +++ b/src/tailer/CMakeLists.txt @@ -0,0 +1,24 @@ +add_library(tailercommon sha-256.c sha-256.h tailer.c tailer.h) + +add_executable(tailer tailer.main.c) + +target_link_libraries(tailer tailercommon) + +add_library(tailerpp tailerpp.hh tailerpp.cc) +target_link_libraries(tailerpp base) + +add_custom_command( + OUTPUT tailerbin.h tailerbin.cc + COMMAND bin2c -n tailer_bin tailerbin tailer + DEPENDS bin2c tailer) + +add_library(tailerservice tailer.looper.hh tailer.looper.cc + tailer.looper.cfg.hh tailerbin.h tailerbin.cc) +target_include_directories(tailerservice PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) +target_link_libraries(tailerservice base) + +add_executable(drive_tailer drive_tailer.cc) + +target_include_directories(drive_tailer PUBLIC . .. ../fmtlib + ${CMAKE_CURRENT_BINARY_DIR}/..) +target_link_libraries(drive_tailer base tailercommon tailerpp ZLIB::ZLIB) diff --git a/src/tailer/Makefile.am b/src/tailer/Makefile.am new file mode 100644 index 0000000..bb8a39a --- /dev/null +++ b/src/tailer/Makefile.am @@ -0,0 +1,111 @@ + +include $(top_srcdir)/aminclude_static.am + +TESTS_ENVIRONMENT = $(SHELL) $(top_builddir)/TESTS_ENVIRONMENT +LOG_COMPILER = $(SHELL) $(top_builddir)/TESTS_ENVIRONMENT + +BUILT_SOURCES = tailerbin.cc + +AM_CPPFLAGS = \ + -Wall \ + $(CODE_COVERAGE_CPPFLAGS) \ + $(LIBARCHIVE_CFLAGS) \ + $(READLINE_CFLAGS) \ + $(SQLITE3_CFLAGS) \ + $(PCRE_CFLAGS) \ + $(LIBCURL_CPPFLAGS) + +AM_LIBS = $(CODE_COVERAGE_LIBS) +AM_CFLAGS = $(CODE_COVERAGE_CFLAGS) +AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) + +dist_noinst_DATA = \ + tailer.ape + +noinst_LIBRARIES = \ + libtailercommon.a \ + libtailerpp.a \ + libtailerservice.a + +noinst_HEADERS = \ + sha-256.h \ + tailer.h \ + tailer.looper.hh \ + tailer.looper.cfg.hh \ + tailerpp.hh + +libtailercommon_a_SOURCES = \ + sha-256.c \ + tailer.c + +libtailerpp_a_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(srcdir)/.. \ + -I$(srcdir)/../fmtlib \ + -I$(srcdir)/../third-party \ + -I$(top_srcdir)/src/third-party/scnlib/include + +libtailerpp_a_SOURCES = \ + tailerpp.cc + +tailerbin.cc: tailer.ape ../../tools/bin2c$(BUILD_EXEEXT) + ../../tools/bin2c$(BUILD_EXEEXT) -n tailer_bin tailerbin $(srcdir)/tailer.ape + +libtailerservice_a_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + -I$(srcdir)/.. \ + -I$(srcdir)/../fmtlib \ + -I$(srcdir)/../third-party \ + -I$(top_srcdir)/src/third-party/scnlib/include + +libtailerservice_a_SOURCES = \ + tailerbin.cc \ + tailer.looper.cc + +check_PROGRAMS = \ + drive_tailer \ + tailer + +tailer_SOURCES = \ + tailer.main.c + +tailer_LDADD = libtailercommon.a + +drive_tailer_CPPFLAGS = \ + -I$(srcdir)/.. \ + -I$(srcdir)/../fmtlib \ + -I$(top_srcdir)/src/third-party/scnlib/include + +drive_tailer_SOURCES = \ + drive_tailer.cc + +drive_tailer_LDADD = \ + libtailercommon.a \ + libtailerpp.a \ + ../base/libbase.a \ + ../fmtlib/libcppfmt.a + +dist_noinst_SCRIPTS = \ + test_tailer.sh + +TESTS = \ + test_tailer.sh + +DISTCLEANFILES = \ + *.cmd \ + *.dat \ + *.out \ + *.err \ + *.db \ + *.dpt \ + *.diff \ + *.index \ + *.tmp \ + *.outbak \ + *.errbak \ + *.tmpbak \ + tailerbin.h \ + tailerbin.cc + +distclean-local: + $(RM_V)rm -f foo diff --git a/src/tailer/README.md b/src/tailer/README.md new file mode 100644 index 0000000..74660f9 --- /dev/null +++ b/src/tailer/README.md @@ -0,0 +1,35 @@ +# Tailer + +This directory contains the functionality for monitoring +[remote files](https://docs.lnav.org/en/latest/usage.html#remote-files). The +name "tailer" refers to the binary that is transferred to the remote host that +takes care of tailing files and sending the contents back to the host that is +running the main lnav binary. To ease integration with lnav's existing +functionality, the remote files are mirrored locally. The tailer also +supports interactive use by providing previews of file contents and +TAB-completion possibilities. + +## Files + +The important files in this directory are: + +- [tailer.main.c](tailer.main.c) - The main() implementation for the tailer. +- [tailer.looper.hh](tailer.looper.hh) - The service in the main lnav binary + that transfers tailers to hosts and communicates with them. +- [tailer.h](tailer.h) and [tailerpp.hh](tailerpp.hh) - Utility libraries for + the tailer protocol. +- tailer.ape - The [αcτµαlly pδrταblε εxεcµταblε](https://justine.lol/ape.html) + build of the tailer. This binary is produced by a GitHub Action and checked + in so the build process doesn't need to be supported on lots of platforms. + +## Flow + +When a remote-path is passed to lnav, the +[file_collection.hh](../file_collection.hh) logic forwards the request to the +`tailer::looper` service. This service makes two connections to the remote +host using the `ssh` command so that the user's custom configurations will be +used. The first connection is used to transfer the "tailer.ape" binary and +make it executable. The second connection starts the tailer and uses +stdin/stdout for a binary protocol and stderr for logging. The tailer then +waits for requests to open files, preview files, and get possible paths for +TAB-completions. diff --git a/src/tailer/drive_tailer.cc b/src/tailer/drive_tailer.cc new file mode 100644 index 0000000..18ead43 --- /dev/null +++ b/src/tailer/drive_tailer.cc @@ -0,0 +1,288 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <thread> + +#include <unistd.h> + +#include "base/auto_fd.hh" +#include "base/auto_pid.hh" +#include "config.h" +#include "ghc/filesystem.hpp" +#include "line_buffer.hh" +#include "tailerpp.hh" + +static void +read_err_pipe(auto_fd& err, std::string& eq) +{ + while (true) { + char buffer[1024]; + auto rc = read(err.get(), buffer, sizeof(buffer)); + + if (rc <= 0) { + break; + } + + eq.append(buffer, rc); + } +} + +int +main(int argc, char* const* argv) +{ + if (argc != 3) { + fprintf(stderr, "usage: %s <cmd> <path>\n", argv[0]); + exit(EXIT_FAILURE); + } + + auto in_pipe_res = auto_pipe::for_child_fd(STDIN_FILENO); + if (in_pipe_res.isErr()) { + fprintf(stderr, + "cannot open stdin pipe for child: %s\n", + in_pipe_res.unwrapErr().c_str()); + exit(EXIT_FAILURE); + } + + auto out_pipe_res = auto_pipe::for_child_fd(STDOUT_FILENO); + if (out_pipe_res.isErr()) { + fprintf(stderr, + "cannot open stdout pipe for child: %s\n", + out_pipe_res.unwrapErr().c_str()); + exit(EXIT_FAILURE); + } + + auto err_pipe_res = auto_pipe::for_child_fd(STDERR_FILENO); + if (err_pipe_res.isErr()) { + fprintf(stderr, + "cannot open stderr pipe for child: %s\n", + err_pipe_res.unwrapErr().c_str()); + exit(EXIT_FAILURE); + } + + auto fork_res = lnav::pid::from_fork(); + if (fork_res.isErr()) { + fprintf( + stderr, "cannot start tailer: %s\n", fork_res.unwrapErr().c_str()); + exit(EXIT_FAILURE); + } + + auto in_pipe = in_pipe_res.unwrap(); + auto out_pipe = out_pipe_res.unwrap(); + auto err_pipe = err_pipe_res.unwrap(); + auto child = fork_res.unwrap(); + + in_pipe.after_fork(child.in()); + out_pipe.after_fork(child.in()); + err_pipe.after_fork(child.in()); + + if (child.in_child()) { + auto this_exe = ghc::filesystem::path(argv[0]); + auto exe_dir = this_exe.parent_path(); + auto tailer_exe = exe_dir / "tailer"; + + execlp(tailer_exe.c_str(), tailer_exe.c_str(), "-k", nullptr); + exit(EXIT_FAILURE); + } + + std::string error_queue; + std::thread err_reader( + [err = std::move(err_pipe.read_end()), &error_queue]() mutable { + read_err_pipe(err, error_queue); + }); + + auto& to_child = in_pipe.write_end(); + auto& from_child = out_pipe.read_end(); + auto cmd = std::string(argv[1]); + + if (cmd == "open") { + send_packet( + to_child.get(), TPT_OPEN_PATH, TPPT_STRING, argv[2], TPPT_DONE); + } else if (cmd == "preview") { + send_packet(to_child.get(), + TPT_LOAD_PREVIEW, + TPPT_STRING, + argv[2], + TPPT_INT64, + int64_t{1234}, + TPPT_DONE); + } else if (cmd == "possible") { + send_packet( + to_child.get(), TPT_COMPLETE_PATH, TPPT_STRING, argv[2], TPPT_DONE); + } else { + fprintf(stderr, "error: unknown command -- %s\n", cmd.c_str()); + exit(EXIT_FAILURE); + } + + close(to_child.get()); + + bool done = false; + while (!done) { + auto read_res = tailer::read_packet(from_child); + + if (read_res.isErr()) { + fprintf(stderr, "read error: %s\n", read_res.unwrapErr().c_str()); + exit(EXIT_FAILURE); + } + + auto packet = read_res.unwrap(); + packet.match( + [&](const tailer::packet_eof& te) { + printf("all done!\n"); + done = true; + }, + [&](const tailer::packet_announce& pa) {}, + [&](const tailer::packet_log& te) { + printf("log: %s\n", te.pl_msg.c_str()); + }, + [&](const tailer::packet_error& pe) { + printf("Got an error: %s -- %s\n", + pe.pe_path.c_str(), + pe.pe_msg.c_str()); + + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(pe.pe_path)) + .relative_path(); + + printf("removing %s\n", remote_path.c_str()); + }, + [&](const tailer::packet_offer_block& pob) { + printf("Got an offer: %s %lld - %lld\n", + pob.pob_path.c_str(), + pob.pob_offset, + pob.pob_length); + + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(pob.pob_path)) + .relative_path(); +#if 0 + auto local_path = tmppath / remote_path; + auto fd = auto_fd(open(local_path.c_str(), O_RDONLY)); + + if (fd == -1) { + printf("sending need block\n"); + send_packet(to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, pob.pob_path.c_str(), + TPPT_DONE); + return; + } + + struct stat st; + + if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) { + ghc::filesystem::remove_all(local_path); + send_packet(to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, pob.pob_path.c_str(), + TPPT_DONE); + return; + } + auto_mem<char> buffer; + + buffer = (char *) malloc(pob.pob_length); + auto bytes_read = pread(fd, buffer, pob.pob_length, + pob.pob_offset); + + // fprintf(stderr, "debug: bytes_read %ld\n", bytes_read); + if (bytes_read == pob.pob_length) { + tailer::hash_frag thf; + calc_sha_256(thf.thf_hash, buffer, bytes_read); + + if (thf == pob.pob_hash) { + send_packet(to_child.get(), + TPT_ACK_BLOCK, + TPPT_STRING, pob.pob_path.c_str(), + TPPT_DONE); + return; + } + } else if (bytes_read == -1) { + ghc::filesystem::remove_all(local_path); + } + send_packet(to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, pob.pob_path.c_str(), + TPPT_DONE); +#endif + }, + [&](const tailer::packet_tail_block& ptb) { +#if 0 + //printf("got a tail: %s %lld %ld\n", ptb.ptb_path.c_str(), + // ptb.ptb_offset, ptb.ptb_bits.size()); + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(ptb.ptb_path)).relative_path(); + auto local_path = tmppath / remote_path; + + ghc::filesystem::create_directories(local_path.parent_path()); + auto fd = auto_fd( + open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT, + 0600)); + + if (fd == -1) { + perror("open"); + } else { + ftruncate(fd, ptb.ptb_offset); + pwrite(fd, ptb.ptb_bits.data(), ptb.ptb_bits.size(), ptb.ptb_offset); + } +#endif + }, + [&](const tailer::packet_synced& ps) { + + }, + [&](const tailer::packet_link& pl) { + printf("link value: %s -> %s\n", + pl.pl_path.c_str(), + pl.pl_link_value.c_str()); + }, + [&](const tailer::packet_preview_error& ppe) { + fprintf(stderr, + "preview error: %s -- %s\n", + ppe.ppe_path.c_str(), + ppe.ppe_msg.c_str()); + }, + [&](const tailer::packet_preview_data& ppd) { + printf("preview of file: %s\n%.*s\n", + ppd.ppd_path.c_str(), + (int) ppd.ppd_bits.size(), + ppd.ppd_bits.data()); + }, + [&](const tailer::packet_possible_path& ppp) { + printf("possible path: %s\n", ppp.ppp_path.c_str()); + }); + } + + auto finished_child = std::move(child).wait_for_child(); + if (!finished_child.was_normal_exit()) { + fprintf(stderr, "error: child exited abnormally\n"); + } + + err_reader.join(); + + printf("tailer stderr:\n%s", error_queue.c_str()); + fprintf(stderr, "tailer stderr:\n%s", error_queue.c_str()); +} diff --git a/src/tailer/sha-256.c b/src/tailer/sha-256.c new file mode 100644 index 0000000..8e390f8 --- /dev/null +++ b/src/tailer/sha-256.c @@ -0,0 +1,161 @@ +/********************************************************************* +* Filename: sha256.c +* Author: Brad Conte (brad AT bradconte.com) +* Copyright: +* Disclaimer: This code is presented "as is" without any guarantees. +* Details: Implementation of the SHA-256 hashing algorithm. + SHA-256 is one of the three algorithms in the SHA2 + specification. The others, SHA-384 and SHA-512, are not + offered in this implementation. + Algorithm specification can be found here: + * http://csrc.nist.gov/publications/fips/fips180-2/fips180-2withchangenotice.pdf + This implementation uses little endian byte order. +*********************************************************************/ + +/*************************** HEADER FILES ***************************/ +#ifndef __COSMOPOLITAN__ +#include <stdlib.h> +#include <memory.h> +#endif + +#include "sha-256.h" + +/****************************** MACROS ******************************/ +#define ROTLEFT(a,b) (((a) << (b)) | ((a) >> (32-(b)))) +#define ROTRIGHT(a,b) (((a) >> (b)) | ((a) << (32-(b)))) + +#define CH(x,y,z) (((x) & (y)) ^ (~(x) & (z))) +#define MAJ(x,y,z) (((x) & (y)) ^ ((x) & (z)) ^ ((y) & (z))) +#define EP0(x) (ROTRIGHT(x,2) ^ ROTRIGHT(x,13) ^ ROTRIGHT(x,22)) +#define EP1(x) (ROTRIGHT(x,6) ^ ROTRIGHT(x,11) ^ ROTRIGHT(x,25)) +#define SIG0(x) (ROTRIGHT(x,7) ^ ROTRIGHT(x,18) ^ ((x) >> 3)) +#define SIG1(x) (ROTRIGHT(x,17) ^ ROTRIGHT(x,19) ^ ((x) >> 10)) + +/**************************** VARIABLES *****************************/ +static const WORD k[64] = { + 0x428a2f98,0x71374491,0xb5c0fbcf,0xe9b5dba5,0x3956c25b,0x59f111f1,0x923f82a4,0xab1c5ed5, + 0xd807aa98,0x12835b01,0x243185be,0x550c7dc3,0x72be5d74,0x80deb1fe,0x9bdc06a7,0xc19bf174, + 0xe49b69c1,0xefbe4786,0x0fc19dc6,0x240ca1cc,0x2de92c6f,0x4a7484aa,0x5cb0a9dc,0x76f988da, + 0x983e5152,0xa831c66d,0xb00327c8,0xbf597fc7,0xc6e00bf3,0xd5a79147,0x06ca6351,0x14292967, + 0x27b70a85,0x2e1b2138,0x4d2c6dfc,0x53380d13,0x650a7354,0x766a0abb,0x81c2c92e,0x92722c85, + 0xa2bfe8a1,0xa81a664b,0xc24b8b70,0xc76c51a3,0xd192e819,0xd6990624,0xf40e3585,0x106aa070, + 0x19a4c116,0x1e376c08,0x2748774c,0x34b0bcb5,0x391c0cb3,0x4ed8aa4a,0x5b9cca4f,0x682e6ff3, + 0x748f82ee,0x78a5636f,0x84c87814,0x8cc70208,0x90befffa,0xa4506ceb,0xbef9a3f7,0xc67178f2 +}; + +/*********************** FUNCTION DEFINITIONS ***********************/ +void sha256_transform(SHA256_CTX *ctx, const BYTE data[]) +{ + WORD a, b, c, d, e, f, g, h, i, j, t1, t2, m[64]; + + for (i = 0, j = 0; i < 16; ++i, j += 4) + m[i] = (data[j] << 24) | (data[j + 1] << 16) | (data[j + 2] << 8) | (data[j + 3]); + for ( ; i < 64; ++i) + m[i] = SIG1(m[i - 2]) + m[i - 7] + SIG0(m[i - 15]) + m[i - 16]; + + a = ctx->state[0]; + b = ctx->state[1]; + c = ctx->state[2]; + d = ctx->state[3]; + e = ctx->state[4]; + f = ctx->state[5]; + g = ctx->state[6]; + h = ctx->state[7]; + + for (i = 0; i < 64; ++i) { + t1 = h + EP1(e) + CH(e,f,g) + k[i] + m[i]; + t2 = EP0(a) + MAJ(a,b,c); + h = g; + g = f; + f = e; + e = d + t1; + d = c; + c = b; + b = a; + a = t1 + t2; + } + + ctx->state[0] += a; + ctx->state[1] += b; + ctx->state[2] += c; + ctx->state[3] += d; + ctx->state[4] += e; + ctx->state[5] += f; + ctx->state[6] += g; + ctx->state[7] += h; +} + +void sha256_init(SHA256_CTX *ctx) +{ + ctx->datalen = 0; + ctx->bitlen = 0; + ctx->state[0] = 0x6a09e667; + ctx->state[1] = 0xbb67ae85; + ctx->state[2] = 0x3c6ef372; + ctx->state[3] = 0xa54ff53a; + ctx->state[4] = 0x510e527f; + ctx->state[5] = 0x9b05688c; + ctx->state[6] = 0x1f83d9ab; + ctx->state[7] = 0x5be0cd19; +} + +void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len) +{ + WORD i; + + for (i = 0; i < len; ++i) { + ctx->data[ctx->datalen] = data[i]; + ctx->datalen++; + if (ctx->datalen == 64) { + sha256_transform(ctx, ctx->data); + ctx->bitlen += 512; + ctx->datalen = 0; + } + } +} + +void sha256_final(SHA256_CTX *ctx, BYTE hash[]) +{ + WORD i; + + i = ctx->datalen; + + // Pad whatever data is left in the buffer. + if (ctx->datalen < 56) { + ctx->data[i++] = 0x80; + while (i < 56) + ctx->data[i++] = 0x00; + } + else { + ctx->data[i++] = 0x80; + while (i < 64) + ctx->data[i++] = 0x00; + sha256_transform(ctx, ctx->data); + memset(ctx->data, 0, 56); + } + + // Append to the padding the total message's length in bits and transform. + ctx->bitlen += ctx->datalen * 8; + ctx->data[63] = ctx->bitlen; + ctx->data[62] = ctx->bitlen >> 8; + ctx->data[61] = ctx->bitlen >> 16; + ctx->data[60] = ctx->bitlen >> 24; + ctx->data[59] = ctx->bitlen >> 32; + ctx->data[58] = ctx->bitlen >> 40; + ctx->data[57] = ctx->bitlen >> 48; + ctx->data[56] = ctx->bitlen >> 56; + sha256_transform(ctx, ctx->data); + + // Since this implementation uses little endian byte ordering and SHA uses big endian, + // reverse all the bytes when copying the final state to the output hash. + for (i = 0; i < 4; ++i) { + hash[i] = (ctx->state[0] >> (24 - i * 8)) & 0x000000ff; + hash[i + 4] = (ctx->state[1] >> (24 - i * 8)) & 0x000000ff; + hash[i + 8] = (ctx->state[2] >> (24 - i * 8)) & 0x000000ff; + hash[i + 12] = (ctx->state[3] >> (24 - i * 8)) & 0x000000ff; + hash[i + 16] = (ctx->state[4] >> (24 - i * 8)) & 0x000000ff; + hash[i + 20] = (ctx->state[5] >> (24 - i * 8)) & 0x000000ff; + hash[i + 24] = (ctx->state[6] >> (24 - i * 8)) & 0x000000ff; + hash[i + 28] = (ctx->state[7] >> (24 - i * 8)) & 0x000000ff; + } +} diff --git a/src/tailer/sha-256.h b/src/tailer/sha-256.h new file mode 100644 index 0000000..028fcfb --- /dev/null +++ b/src/tailer/sha-256.h @@ -0,0 +1,44 @@ +/********************************************************************* +* Filename: sha256.h +* Author: Brad Conte (brad AT bradconte.com) +* Copyright: +* Disclaimer: This code is presented "as is" without any guarantees. +* Details: Defines the API for the corresponding SHA1 implementation. +*********************************************************************/ + +#ifndef SHA256_H +#define SHA256_H + +/*************************** HEADER FILES ***************************/ +#ifndef __COSMOPOLITAN__ +#include <stddef.h> +#endif + +/****************************** MACROS ******************************/ +#define SHA256_BLOCK_SIZE 32 // SHA256 outputs a 32 byte digest + +/**************************** DATA TYPES ****************************/ +typedef unsigned char BYTE; // 8-bit byte +typedef unsigned int WORD; // 32-bit word, change to "long" for 16-bit machines + +typedef struct { + BYTE data[64]; + WORD datalen; + unsigned long long bitlen; + WORD state[8]; +} SHA256_CTX; + +#ifdef __cplusplus +extern "C" { +#endif + +/*********************** FUNCTION DECLARATIONS **********************/ +void sha256_init(SHA256_CTX *ctx); +void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len); +void sha256_final(SHA256_CTX *ctx, BYTE hash[]); + +#ifdef __cplusplus +}; +#endif + +#endif // SHA256_H
\ No newline at end of file diff --git a/src/tailer/tailer.ape b/src/tailer/tailer.ape Binary files differnew file mode 100755 index 0000000..dd8e894 --- /dev/null +++ b/src/tailer/tailer.ape diff --git a/src/tailer/tailer.c b/src/tailer/tailer.c new file mode 100644 index 0000000..c8fbfbf --- /dev/null +++ b/src/tailer/tailer.c @@ -0,0 +1,100 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __COSMOPOLITAN__ +#include <assert.h> +#include <string.h> +#include <unistd.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdint.h> +#endif + +#include "sha-256.h" +#include "tailer.h" + +ssize_t send_packet(int fd, + tailer_packet_type_t tpt, + tailer_packet_payload_type_t payload_type, + ...) +{ + va_list args; + int done = 0; + + va_start(args, payload_type); + write(fd, &tpt, sizeof(tpt)); + do { + write(fd, &payload_type, sizeof(payload_type)); + switch (payload_type) { + case TPPT_STRING: { + char *str = va_arg(args, char *); + uint32_t length = strlen(str); + + write(fd, &length, sizeof(length)); + write(fd, str, length); + break; + } + case TPPT_HASH: { + const char *hash = va_arg(args, const char *); + + write(fd, hash, SHA256_BLOCK_SIZE); + break; + } + case TPPT_INT64: { + int64_t i = va_arg(args, int64_t); + + write(fd, &i, sizeof(i)); + break; + } + case TPPT_BITS: { + int32_t length = va_arg(args, int32_t); + const char *bits = va_arg(args, const char *); + + write(fd, &length, sizeof(length)); + write(fd, bits, length); + break; + } + case TPPT_DONE: { + done = 1; + break; + } + default: { + assert(0); + break; + } + } + + if (!done) { + payload_type = va_arg(args, tailer_packet_payload_type_t); + } + } while (!done); + va_end(args); + + return 0; +} diff --git a/src/tailer/tailer.h b/src/tailer/tailer.h new file mode 100644 index 0000000..b864137 --- /dev/null +++ b/src/tailer/tailer.h @@ -0,0 +1,77 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef lnav_tailer_h +#define lnav_tailer_h + +#ifndef __COSMOPOLITAN__ +#include <sys/types.h> +#endif + +typedef enum { + TPPT_DONE, + TPPT_STRING, + TPPT_HASH, + TPPT_INT64, + TPPT_BITS, +} tailer_packet_payload_type_t; + +typedef enum { + TPT_ERROR, + TPT_OPEN_PATH, + TPT_CLOSE_PATH, + TPT_OFFER_BLOCK, + TPT_NEED_BLOCK, + TPT_ACK_BLOCK, + TPT_TAIL_BLOCK, + TPT_LINK_BLOCK, + TPT_SYNCED, + TPT_LOG, + TPT_LOAD_PREVIEW, + TPT_PREVIEW_ERROR, + TPT_PREVIEW_DATA, + TPT_COMPLETE_PATH, + TPT_POSSIBLE_PATH, + TPT_ANNOUNCE, +} tailer_packet_type_t; + +#ifdef __cplusplus +extern "C" { +#endif + +ssize_t send_packet(int fd, + tailer_packet_type_t tpt, + tailer_packet_payload_type_t payload_type, + ...); + +#ifdef __cplusplus +}; +#endif + +#endif diff --git a/src/tailer/tailer.looper.cc b/src/tailer/tailer.looper.cc new file mode 100644 index 0000000..82a9fdc --- /dev/null +++ b/src/tailer/tailer.looper.cc @@ -0,0 +1,1192 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <regex> + +#include "tailer.looper.hh" + +#include "base/fs_util.hh" +#include "base/humanize.network.hh" +#include "base/lnav_log.hh" +#include "base/paths.hh" +#include "config.h" +#include "line_buffer.hh" +#include "lnav.hh" +#include "lnav.indexing.hh" +#include "service_tags.hh" +#include "tailer.h" +#include "tailer.looper.cfg.hh" +#include "tailerbin.h" +#include "tailerpp.hh" + +using namespace std::chrono_literals; + +static const auto HOST_RETRY_DELAY = 1min; + +static void +read_err_pipe(const std::string& netloc, + auto_fd& err, + std::vector<std::string>& eq) +{ + line_buffer lb; + file_range pipe_range; + bool done = false; + + log_info("stderr reader started..."); + lb.set_fd(err); + while (!done) { + auto load_res = lb.load_next_line(pipe_range); + + if (load_res.isErr()) { + done = true; + } else { + auto li = load_res.unwrap(); + + pipe_range = li.li_file_range; + if (li.li_file_range.empty()) { + done = true; + } else { + lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) { + auto line_str + = string_fragment(sbr.get_data(), 0, sbr.length()) + .trim("\n"); + if (eq.size() < 10) { + eq.template emplace_back(line_str.to_string()); + } + + auto level = line_str.startswith("error:") + ? lnav_log_level_t::ERROR + : line_str.startswith("warning:") + ? lnav_log_level_t::WARNING + : line_str.startswith("info:") + ? lnav_log_level_t::INFO + : lnav_log_level_t::DEBUG; + log_msg_wrapper(level, + "tailer[%s] %.*s", + netloc.c_str(), + line_str.length(), + line_str.data()); + }); + } + } + } +} + +static void +update_tailer_progress(const std::string& netloc, const std::string& msg) +{ + lnav_data.ld_active_files.fc_progress->writeAccess() + ->sp_tailers[netloc] + .tp_message + = msg; +} + +static void +update_tailer_description( + const std::string& netloc, + const std::map<std::string, logfile_open_options_base>& desired_paths, + const std::string& remote_uname) +{ + std::vector<std::string> paths; + + for (const auto& des_pair : desired_paths) { + paths.emplace_back( + fmt::format(FMT_STRING("{}{}"), netloc, des_pair.first)); + } + isc::to<main_looper&, services::main_t>().send( + [netloc, paths, remote_uname](auto& mlooper) { + auto& fc = lnav_data.ld_active_files; + + for (const auto& path : paths) { + auto iter = fc.fc_other_files.find(path); + + if (iter == fc.fc_other_files.end()) { + continue; + } + + iter->second.ofd_description = remote_uname; + } + fc.fc_name_to_errors.erase(netloc); + }); +} + +void +tailer::looper::loop_body() +{ + auto now = std::chrono::steady_clock::now(); + std::vector<std::string> to_erase; + + for (auto& qpair : this->l_netlocs_to_paths) { + auto& netloc = qpair.first; + auto& rpq = qpair.second; + + if (now < rpq.rpq_next_attempt_time) { + continue; + } + if (this->l_remotes.count(netloc) == 0) { + auto create_res = host_tailer::for_host(netloc); + + if (create_res.isErr()) { + report_error(netloc, create_res.unwrapErr()); + if (std::any_of( + rpq.rpq_new_paths.begin(), + rpq.rpq_new_paths.end(), + [](const auto& pair) { return !pair.second.loo_tail; })) + { + rpq.send_synced_to_main(netloc); + to_erase.push_back(netloc); + } else { + rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY; + } + continue; + } + + auto ht = create_res.unwrap(); + this->l_remotes[netloc] = ht; + this->s_children.add_child_service(ht); + + rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(), + rpq.rpq_existing_paths.end()); + rpq.rpq_existing_paths.clear(); + } + + if (!rpq.rpq_new_paths.empty()) { + log_debug("%s: new paths to monitor -- %s", + netloc.c_str(), + rpq.rpq_new_paths.begin()->first.c_str()); + this->l_remotes[netloc]->send( + [paths = rpq.rpq_new_paths](auto& ht) { + for (const auto& pair : paths) { + log_debug("adding path to tailer -- %s", + pair.first.c_str()); + ht.open_remote_path(pair.first, std::move(pair.second)); + } + }); + + rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(), + rpq.rpq_new_paths.end()); + rpq.rpq_new_paths.clear(); + } + } + + for (const auto& netloc : to_erase) { + this->l_netlocs_to_paths.erase(netloc); + } +} + +void +tailer::looper::add_remote(const network::path& path, + logfile_open_options_base options) +{ + auto netloc_str = fmt::to_string(path.home()); + this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path] + = std::move(options); +} + +void +tailer::looper::load_preview(int64_t id, const network::path& path) +{ + auto netloc_str = fmt::to_string(path.home()); + auto iter = this->l_remotes.find(netloc_str); + + if (iter == this->l_remotes.end()) { + auto create_res = host_tailer::for_host(netloc_str); + + if (create_res.isErr()) { + auto msg = create_res.unwrapErr(); + isc::to<main_looper&, services::main_t>().send( + [id, msg](auto& mlooper) { + if (lnav_data.ld_preview_generation != id) { + return; + } + lnav_data.ld_preview_status_source.get_description() + .set_cylon(false) + .clear(); + lnav_data.ld_preview_source.clear(); + lnav_data.ld_bottom_source.grep_error(msg); + }); + return; + } + + auto ht = create_res.unwrap(); + this->l_remotes[netloc_str] = ht; + this->s_children.add_child_service(ht); + } + + this->l_remotes[netloc_str]->send([id, file_path = path.p_path](auto& ht) { + ht.load_preview(id, file_path); + }); +} + +void +tailer::looper::complete_path(const network::path& path) +{ + auto netloc_str = fmt::to_string(path.home()); + auto iter = this->l_remotes.find(netloc_str); + + if (iter == this->l_remotes.end()) { + auto create_res = host_tailer::for_host(netloc_str); + + if (create_res.isErr()) { + return; + } + + auto ht = create_res.unwrap(); + this->l_remotes[netloc_str] = ht; + this->s_children.add_child_service(ht); + } + + this->l_remotes[netloc_str]->send( + [file_path = path.p_path](auto& ht) { ht.complete_path(file_path); }); +} + +static std::vector<std::string> +create_ssh_args_from_config(const std::string& dest) +{ + const auto& cfg = injector::get<const tailer::config&>(); + std::vector<std::string> retval; + + retval.emplace_back(cfg.c_ssh_cmd); + if (!cfg.c_ssh_flags.empty()) { + if (startswith(cfg.c_ssh_flags, "-")) { + retval.emplace_back(cfg.c_ssh_flags); + } else { + retval.emplace_back( + fmt::format(FMT_STRING("-{}"), cfg.c_ssh_flags)); + } + } + for (const auto& pair : cfg.c_ssh_options) { + if (pair.second.empty()) { + continue; + } + retval.emplace_back(fmt::format(FMT_STRING("-{}"), pair.first)); + retval.emplace_back(pair.second); + } + for (const auto& pair : cfg.c_ssh_config) { + if (pair.second.empty()) { + continue; + } + retval.emplace_back( + fmt::format(FMT_STRING("-o{}={}"), pair.first, pair.second)); + } + retval.emplace_back(dest); + + return retval; +} + +Result<std::shared_ptr<tailer::looper::host_tailer>, std::string> +tailer::looper::host_tailer::for_host(const std::string& netloc) +{ + log_debug("tailer(%s): transferring tailer to remote", netloc.c_str()); + + update_tailer_progress(netloc, "Transferring tailer..."); + + auto& cfg = injector::get<const tailer::config&>(); + auto tailer_bin_name = fmt::format(FMT_STRING("tailer.bin.{}"), getpid()); + + auto rp = humanize::network::path::from_str(netloc).value(); + auto ssh_dest = rp.p_locality.l_hostname; + if (rp.p_locality.l_username.has_value()) { + ssh_dest = fmt::format(FMT_STRING("{}@{}"), + rp.p_locality.l_username.value(), + rp.p_locality.l_hostname); + } + + { + auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO)); + auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO)); + auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO)); + auto child = TRY(lnav::pid::from_fork()); + + in_pipe.after_fork(child.in()); + out_pipe.after_fork(child.in()); + err_pipe.after_fork(child.in()); + + if (child.in_child()) { + auto arg_strs = create_ssh_args_from_config(ssh_dest); + std::vector<char*> args; + + arg_strs.emplace_back( + fmt::format(cfg.c_transfer_cmd, tailer_bin_name)); + + fmt::print(stderr, + "tailer({}): executing -- {}\n", + netloc, + fmt::join(arg_strs, " ")); + for (const auto& arg : arg_strs) { + args.push_back((char*) arg.data()); + } + args.push_back(nullptr); + + execvp(cfg.c_ssh_cmd.c_str(), args.data()); + _exit(EXIT_FAILURE); + } + + std::vector<std::string> error_queue; + log_debug("tailer(%s): starting err reader", netloc.c_str()); + std::thread err_reader([netloc, + err = std::move(err_pipe.read_end()), + &error_queue]() mutable { + log_set_thread_prefix( + fmt::format(FMT_STRING("tailer({})"), netloc)); + read_err_pipe(netloc, err, error_queue); + }); + + log_debug("tailer(%s): writing to child", netloc.c_str()); + auto sf = tailer_bin[0].to_string_fragment(); + ssize_t total_bytes = 0; + bool write_failed = false; + + while (total_bytes < sf.length()) { + log_debug("attempting to write %d", sf.length() - total_bytes); + auto rc = write( + in_pipe.write_end(), sf.data(), sf.length() - total_bytes); + + if (rc < 0) { + log_error(" tailer(%s): write failed -- %s", + netloc.c_str(), + strerror(errno)); + write_failed = true; + break; + } + log_debug(" wrote %d", rc); + total_bytes += rc; + } + + in_pipe.write_end().reset(); + + while (!write_failed) { + char buffer[1024]; + + auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer)); + if (rc < 0) { + break; + } + if (rc == 0) { + break; + } + log_debug("tailer(%s): transfer output -- %.*s", + netloc.c_str(), + rc, + buffer); + } + + auto finished_child = std::move(child).wait_for_child(); + + err_reader.join(); + if (!finished_child.was_normal_exit() + || finished_child.exit_status() != EXIT_SUCCESS) + { + auto error_msg = error_queue.empty() ? "unknown" + : error_queue.back(); + return Err(fmt::format(FMT_STRING("failed to ssh to host: {}"), + error_msg)); + } + } + + update_tailer_progress(netloc, "Starting tailer..."); + + auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO)); + auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO)); + auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO)); + auto child = TRY(lnav::pid::from_fork()); + + in_pipe.after_fork(child.in()); + out_pipe.after_fork(child.in()); + err_pipe.after_fork(child.in()); + + if (child.in_child()) { + auto arg_strs = create_ssh_args_from_config(ssh_dest); + std::vector<char*> args; + + arg_strs.emplace_back(fmt::format(cfg.c_start_cmd, tailer_bin_name)); + + fmt::print(stderr, + FMT_STRING("tailer({}): executing -- {}\n"), + netloc, + fmt::join(arg_strs, " ")); + for (const auto& arg : arg_strs) { + args.push_back((char*) arg.data()); + } + args.push_back(nullptr); + + execvp(cfg.c_ssh_cmd.c_str(), args.data()); + _exit(EXIT_FAILURE); + } + + return Ok(std::make_shared<host_tailer>(netloc, + std::move(child), + std::move(in_pipe.write_end()), + std::move(out_pipe.read_end()), + std::move(err_pipe.read_end()))); +} + +static ghc::filesystem::path +remote_cache_path() +{ + return lnav::paths::workdir() / "remotes"; +} + +ghc::filesystem::path +tailer::looper::host_tailer::tmp_path() +{ + auto local_path = remote_cache_path(); + + ghc::filesystem::create_directories(local_path); + auto_mem<char> resolved_path; + + resolved_path = realpath(local_path.c_str(), nullptr); + if (resolved_path.in() == nullptr) { + return local_path; + } + + return resolved_path.in(); +} + +static std::string +scrub_netloc(const std::string& netloc) +{ + const static std::regex TO_SCRUB(R"([^\w\.\@])"); + + return std::regex_replace(netloc, TO_SCRUB, "_"); +} + +tailer::looper::host_tailer::host_tailer(const std::string& netloc, + auto_pid<process_state::running> child, + auto_fd to_child, + auto_fd from_child, + auto_fd err_from_child) + : isc::service<host_tailer>(netloc), ht_netloc(netloc), + ht_local_path(tmp_path() / scrub_netloc(netloc)), + ht_error_reader([netloc, + err = std::move(err_from_child), + &eq = this->ht_error_queue]() mutable { + read_err_pipe(netloc, err, eq); + }), + ht_state(connected{ + std::move(child), std::move(to_child), std::move(from_child), {}}) +{ +} + +void +tailer::looper::host_tailer::open_remote_path(const std::string& path, + logfile_open_options_base loo) +{ + this->ht_state.match( + [&](connected& conn) { + conn.c_desired_paths[path] = std::move(loo); + send_packet(conn.ht_to_child.get(), + TPT_OPEN_PATH, + TPPT_STRING, + path.c_str(), + TPPT_DONE); + }, + [&](const disconnected& d) { + log_warning("disconnected from host, cannot tail: %s", + path.c_str()); + }, + [&](const synced& s) { + log_warning("synced with host, not tailing: %s", path.c_str()); + }); +} + +void +tailer::looper::host_tailer::load_preview(int64_t id, const std::string& path) +{ + this->ht_state.match( + [&](connected& conn) { + send_packet(conn.ht_to_child.get(), + TPT_LOAD_PREVIEW, + TPPT_STRING, + path.c_str(), + TPPT_INT64, + id, + TPPT_DONE); + }, + [&](const disconnected& d) { + log_warning("disconnected from host, cannot preview: %s", + path.c_str()); + + auto msg = fmt::format(FMT_STRING("error: disconnected from {}"), + this->ht_netloc); + isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) { + if (lnav_data.ld_preview_generation != id) { + return; + } + lnav_data.ld_preview_status_source.get_description() + .set_cylon(false) + .set_value(msg); + }); + }, + [&](const synced& s) { require(false); }); +} + +void +tailer::looper::host_tailer::complete_path(const std::string& path) +{ + this->ht_state.match( + [&](connected& conn) { + send_packet(conn.ht_to_child.get(), + TPT_COMPLETE_PATH, + TPPT_STRING, + path.c_str(), + TPPT_DONE); + }, + [&](const disconnected& d) { + log_warning("disconnected from host, cannot preview: %s", + path.c_str()); + }, + [&](const synced& s) { require(false); }); +} + +void +tailer::looper::host_tailer::loop_body() +{ + const static uint64_t TOUCH_FREQ = 10000; + + if (!this->ht_state.is<connected>()) { + return; + } + + this->ht_cycle_count += 1; + if (this->ht_cycle_count % TOUCH_FREQ == 0) { + auto now + = ghc::filesystem::file_time_type{std::chrono::system_clock::now()}; + ghc::filesystem::last_write_time(this->ht_local_path, now); + } + + auto& conn = this->ht_state.get<connected>(); + + pollfd pfds[1]; + + pfds[0].fd = conn.ht_from_child.get(); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + + auto ready_count = poll(pfds, 1, 100); + if (ready_count > 0) { + auto read_res = tailer::read_packet(conn.ht_from_child); + + if (read_res.isErr()) { + log_error("read error: %s", read_res.unwrapErr().c_str()); + return; + } + + auto packet = read_res.unwrap(); + this->ht_state = packet.match( + [&](const tailer::packet_eof& te) { + log_debug("all done!"); + + auto finished_child = std::move(conn).close(); + if (finished_child.exit_status() != 0 + && !this->ht_error_queue.empty()) + { + report_error(this->ht_netloc, this->ht_error_queue.back()); + } + + return state_v{disconnected()}; + }, + [&](const tailer::packet_announce& pa) { + update_tailer_description( + this->ht_netloc, conn.c_desired_paths, pa.pa_uname); + this->ht_uname = pa.pa_uname; + return std::move(this->ht_state); + }, + [&](const tailer::packet_log& pl) { + log_debug("%s\n", pl.pl_msg.c_str()); + return std::move(this->ht_state); + }, + [&](const tailer::packet_error& pe) { + log_debug("Got an error: %s -- %s", + pe.pe_path.c_str(), + pe.pe_msg.c_str()); + + lnav_data.ld_active_files.fc_progress->writeAccess() + ->sp_tailers.erase(this->ht_netloc); + + auto desired_iter = conn.c_desired_paths.find(pe.pe_path); + if (desired_iter != conn.c_desired_paths.end()) { + report_error(this->get_display_path(pe.pe_path), pe.pe_msg); + if (!desired_iter->second.loo_tail) { + conn.c_desired_paths.erase(desired_iter); + } + } else { + auto child_iter = conn.c_child_paths.find(pe.pe_path); + + if (child_iter != conn.c_child_paths.end() + && !child_iter->second.loo_tail) + { + conn.c_child_paths.erase(child_iter); + } + } + + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(pe.pe_path)) + .relative_path(); + auto local_path = this->ht_local_path / remote_path; + + log_debug("removing %s", local_path.c_str()); + this->ht_active_files.erase(local_path); + ghc::filesystem::remove_all(local_path); + + if (conn.c_desired_paths.empty() && conn.c_child_paths.empty()) + { + log_info("tailer(%s): all desired paths synced", + this->ht_netloc.c_str()); + return state_v{synced{}}; + } + + return std::move(this->ht_state); + }, + [&](const tailer::packet_offer_block& pob) { + log_debug("Got an offer: %s %lld - %lld", + pob.pob_path.c_str(), + pob.pob_offset, + pob.pob_length); + + logfile_open_options_base loo; + if (pob.pob_path == pob.pob_root_path) { + auto root_iter = conn.c_desired_paths.find(pob.pob_path); + + if (root_iter == conn.c_desired_paths.end()) { + log_warning("ignoring unknown root: %s", + pob.pob_root_path.c_str()); + return std::move(this->ht_state); + } + + loo = root_iter->second; + } else { + auto child_iter = conn.c_child_paths.find(pob.pob_path); + if (child_iter == conn.c_child_paths.end()) { + auto root_iter + = conn.c_desired_paths.find(pob.pob_root_path); + + if (root_iter == conn.c_desired_paths.end()) { + log_warning("ignoring child of unknown root: %s", + pob.pob_root_path.c_str()); + return std::move(this->ht_state); + } + + conn.c_child_paths[pob.pob_path] + = std::move(root_iter->second); + child_iter = conn.c_child_paths.find(pob.pob_path); + } + + loo = std::move(child_iter->second); + } + + update_tailer_description( + this->ht_netloc, conn.c_desired_paths, this->ht_uname); + + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(pob.pob_path)) + .relative_path(); + auto local_path = this->ht_local_path / remote_path; + auto open_res + = lnav::filesystem::open_file(local_path, O_RDONLY); + + if (this->ht_active_files.count(local_path) == 0) { + this->ht_active_files.insert(local_path); + + auto custom_name = this->get_display_path(pob.pob_path); + isc::to<main_looper&, services::main_t>().send( + [local_path, + custom_name, + loo, + netloc = this->ht_netloc](auto& mlooper) { + auto& active_fc = lnav_data.ld_active_files; + auto lpath_str = local_path.string(); + + { + safe::WriteAccess<safe_scan_progress> sp( + *active_fc.fc_progress); + + sp->sp_tailers.erase(netloc); + } + if (active_fc.fc_file_names.count(lpath_str) > 0) { + log_debug("already in fc_file_names"); + return; + } + if (active_fc.fc_closed_files.count(custom_name) + > 0) { + log_debug("in closed"); + return; + } + + file_collection fc; + + fc.fc_file_names[lpath_str] + .with_filename(custom_name) + .with_source(logfile_name_source::REMOTE) + .with_tail(loo.loo_tail) + .with_non_utf_visibility(false) + .with_visible_size_limit(256 * 1024); + update_active_files(fc); + }); + } + + if (open_res.isErr()) { + log_debug("file not found (%s), sending need block", + open_res.unwrapErr().c_str()); + send_packet(conn.ht_to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, + pob.pob_path.c_str(), + TPPT_DONE); + return std::move(this->ht_state); + } + + auto fd = open_res.unwrap(); + struct stat st; + + if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) { + log_debug("path changed, sending need block"); + ghc::filesystem::remove_all(local_path); + send_packet(conn.ht_to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, + pob.pob_path.c_str(), + TPPT_DONE); + return std::move(this->ht_state); + } + + if (st.st_size == pob.pob_offset) { + log_debug("local file is synced, sending need block"); + send_packet(conn.ht_to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, + pob.pob_path.c_str(), + TPPT_DONE); + return std::move(this->ht_state); + } + + constexpr int64_t BUFFER_SIZE = 4 * 1024 * 1024; + auto_mem<unsigned char> buffer; + + buffer = (unsigned char*) malloc(BUFFER_SIZE); + auto remaining = pob.pob_length; + auto remaining_offset = pob.pob_offset; + tailer::hash_frag thf; + SHA256_CTX shactx; + sha256_init(&shactx); + + log_debug("checking offer %s[%lld..+%lld]", + local_path.c_str(), + remaining_offset, + remaining); + while (remaining > 0) { + auto nbytes = std::min(remaining, BUFFER_SIZE); + auto bytes_read + = pread(fd, buffer, nbytes, remaining_offset); + if (bytes_read == -1) { + log_debug( + "unable to read file, sending need block -- %s", + strerror(errno)); + ghc::filesystem::remove_all(local_path); + break; + } + if (bytes_read == 0) { + break; + } + sha256_update(&shactx, buffer.in(), bytes_read); + remaining -= bytes_read; + remaining_offset += bytes_read; + } + + if (remaining == 0) { + sha256_final(&shactx, thf.thf_hash); + + if (thf == pob.pob_hash) { + log_debug("local file block is same, sending ack"); + send_packet(conn.ht_to_child.get(), + TPT_ACK_BLOCK, + TPPT_STRING, + pob.pob_path.c_str(), + TPPT_INT64, + pob.pob_offset, + TPPT_INT64, + pob.pob_length, + TPPT_INT64, + (int64_t) st.st_size, + TPPT_DONE); + return std::move(this->ht_state); + } + log_debug("local file is different, sending need block"); + } + send_packet(conn.ht_to_child.get(), + TPT_NEED_BLOCK, + TPPT_STRING, + pob.pob_path.c_str(), + TPPT_DONE); + return std::move(this->ht_state); + }, + [&](const tailer::packet_tail_block& ptb) { + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(ptb.ptb_path)) + .relative_path(); + auto local_path = this->ht_local_path / remote_path; + + log_debug("writing tail to: %lld/%ld %s", + ptb.ptb_offset, + ptb.ptb_bits.size(), + local_path.c_str()); + ghc::filesystem::create_directories(local_path.parent_path()); + auto create_res = lnav::filesystem::create_file( + local_path, O_WRONLY | O_APPEND | O_CREAT, 0600); + + if (create_res.isErr()) { + log_error("open: %s", create_res.unwrapErr().c_str()); + } else { + auto fd = create_res.unwrap(); + ftruncate(fd, ptb.ptb_offset); + pwrite(fd, + ptb.ptb_bits.data(), + ptb.ptb_bits.size(), + ptb.ptb_offset); + auto mtime = ghc::filesystem::file_time_type{ + std::chrono::seconds{ptb.ptb_mtime}}; + // XXX This isn't atomic with the write... + ghc::filesystem::last_write_time(local_path, mtime); + } + return std::move(this->ht_state); + }, + [&](const tailer::packet_synced& ps) { + if (ps.ps_root_path == ps.ps_path) { + auto iter = conn.c_desired_paths.find(ps.ps_path); + + if (iter != conn.c_desired_paths.end()) { + if (iter->second.loo_tail) { + conn.c_synced_desired_paths.insert(ps.ps_path); + } else { + log_info("synced desired path: %s", + iter->first.c_str()); + conn.c_desired_paths.erase(iter); + } + } + } else { + auto iter = conn.c_child_paths.find(ps.ps_path); + + if (iter != conn.c_child_paths.end()) { + if (iter->second.loo_tail) { + conn.c_synced_child_paths.insert(ps.ps_path); + } else { + log_info("synced child path: %s", + iter->first.c_str()); + conn.c_child_paths.erase(iter); + } + } + } + + if (conn.c_desired_paths.empty() && conn.c_child_paths.empty()) + { + log_info("tailer(%s): all desired paths synced", + this->ht_netloc.c_str()); + return state_v{synced{}}; + } else if (!conn.c_initial_sync_done + && conn.c_desired_paths.size() + == conn.c_synced_desired_paths.size() + && conn.c_child_paths.size() + == conn.c_synced_child_paths.size()) + { + log_info("tailer(%s): all desired paths synced", + this->ht_netloc.c_str()); + conn.c_initial_sync_done = true; + + std::set<std::string> synced_files; + for (const auto& desired_pair : conn.c_desired_paths) { + synced_files.emplace(fmt::format( + FMT_STRING("{}{}"), ht_netloc, desired_pair.first)); + } + isc::to<main_looper&, services::main_t>().send( + [file_set = std::move(synced_files)](auto& mlooper) { + file_collection fc; + + fc.fc_synced_files = file_set; + update_active_files(fc); + }); + } + + return std::move(this->ht_state); + }, + [&](const tailer::packet_link& pl) { + auto remote_path = ghc::filesystem::absolute( + ghc::filesystem::path(pl.pl_path)) + .relative_path(); + auto local_path = this->ht_local_path / remote_path; + auto remote_link_path = ghc::filesystem::path(pl.pl_link_value); + std::string link_path; + + if (remote_link_path.is_absolute()) { + auto local_link_path = this->ht_local_path + / remote_link_path.relative_path(); + + link_path = local_link_path.string(); + } else { + link_path = remote_link_path.string(); + } + + log_debug("symlinking %s -> %s", + local_path.c_str(), + link_path.c_str()); + ghc::filesystem::create_directories(local_path.parent_path()); + ghc::filesystem::remove_all(local_path); + if (symlink(link_path.c_str(), local_path.c_str()) < 0) { + log_error("symlink failed: %s", strerror(errno)); + } + + if (pl.pl_root_path == pl.pl_path) { + auto iter = conn.c_desired_paths.find(pl.pl_path); + + if (iter != conn.c_desired_paths.end()) { + if (iter->second.loo_tail) { + conn.c_synced_desired_paths.insert(pl.pl_path); + } else { + log_info("synced desired path: %s", + iter->first.c_str()); + conn.c_desired_paths.erase(iter); + } + } + } else { + auto iter = conn.c_child_paths.find(pl.pl_path); + + if (iter != conn.c_child_paths.end()) { + if (iter->second.loo_tail) { + conn.c_synced_child_paths.insert(pl.pl_path); + } else { + log_info("synced child path: %s", + iter->first.c_str()); + conn.c_child_paths.erase(iter); + } + } + } + + return std::move(this->ht_state); + }, + [&](const tailer::packet_preview_error& ppe) { + isc::to<main_looper&, services::main_t>().send( + [ppe](auto& mlooper) { + if (lnav_data.ld_preview_generation != ppe.ppe_id) { + log_debug("preview ID mismatch: %lld != %lld", + lnav_data.ld_preview_generation, + ppe.ppe_id); + return; + } + lnav_data.ld_preview_status_source.get_description() + .set_cylon(false) + .clear(); + lnav_data.ld_preview_source.clear(); + lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg); + }); + + return std::move(this->ht_state); + }, + [&](const tailer::packet_preview_data& ppd) { + isc::to<main_looper&, services::main_t>().send( + [netloc = this->ht_netloc, ppd](auto& mlooper) { + if (lnav_data.ld_preview_generation != ppd.ppd_id) { + log_debug("preview ID mismatch: %lld != %lld", + lnav_data.ld_preview_generation, + ppd.ppd_id); + return; + } + std::string str(ppd.ppd_bits.begin(), + ppd.ppd_bits.end()); + lnav_data.ld_preview_status_source.get_description() + .set_cylon(false) + .set_value("For file: %s:%s", + netloc.c_str(), + ppd.ppd_path.c_str()); + lnav_data.ld_preview_source.replace_with(str) + .set_text_format(detect_text_format(str)); + }); + return std::move(this->ht_state); + }, + [&](const tailer::packet_possible_path& ppp) { + log_debug("possible path: %s", ppp.ppp_path.c_str()); + auto full_path = fmt::format( + FMT_STRING("{}{}"), this->ht_netloc, ppp.ppp_path); + + isc::to<main_looper&, services::main_t>().send( + [full_path](auto& mlooper) { + lnav_data.ld_rl_view->add_possibility( + ln_mode_t::COMMAND, "remote-path", full_path); + }); + return std::move(this->ht_state); + }); + + if (!this->ht_state.is<connected>()) { + this->s_looping = false; + } + } +} + +std::chrono::milliseconds +tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const +{ + return 0s; +} + +void +tailer::looper::host_tailer::stopped() +{ + if (this->ht_state.is<connected>()) { + this->ht_state = disconnected(); + } + if (this->ht_error_reader.joinable()) { + this->ht_error_reader.join(); + } +} + +std::string +tailer::looper::host_tailer::get_display_path( + const std::string& remote_path) const +{ + return fmt::format(FMT_STRING("{}{}"), this->ht_netloc, remote_path); +} + +void* +tailer::looper::host_tailer::run() +{ + log_set_thread_prefix( + fmt::format(FMT_STRING("tailer({})"), this->ht_netloc)); + + return service_base::run(); +} + +auto_pid<process_state::finished> +tailer::looper::host_tailer::connected::close() && +{ + this->ht_to_child.reset(); + this->ht_from_child.reset(); + + return std::move(this->ht_child).wait_for_child(); +} + +void +tailer::looper::child_finished(std::shared_ptr<service_base> child) +{ + auto child_tailer = std::static_pointer_cast<host_tailer>(child); + + for (auto iter = this->l_remotes.begin(); iter != this->l_remotes.end(); + ++iter) + { + if (iter->second != child_tailer) { + continue; + } + + if (child_tailer->is_synced()) { + log_info("synced with netloc '%s', removing", iter->first.c_str()); + auto netloc_iter = this->l_netlocs_to_paths.find(iter->first); + + if (netloc_iter != this->l_netlocs_to_paths.end()) { + netloc_iter->second.send_synced_to_main(netloc_iter->first); + this->l_netlocs_to_paths.erase(netloc_iter); + } + } + lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase( + iter->first); + this->l_remotes.erase(iter); + return; + } +} + +void +tailer::looper::remote_path_queue::send_synced_to_main( + const std::string& netloc) +{ + std::set<std::string> synced_files; + + for (const auto& pair : this->rpq_new_paths) { + if (!pair.second.loo_tail) { + synced_files.emplace( + fmt::format(FMT_STRING("{}{}"), netloc, pair.first)); + } + } + for (const auto& pair : this->rpq_existing_paths) { + if (!pair.second.loo_tail) { + synced_files.emplace( + fmt::format(FMT_STRING("{}{}"), netloc, pair.first)); + } + } + + isc::to<main_looper&, services::main_t>().send( + [file_set = std::move(synced_files)](auto& mlooper) { + file_collection fc; + + fc.fc_synced_files = file_set; + update_active_files(fc); + }); +} + +void +tailer::looper::report_error(std::string path, std::string msg) +{ + log_error("reporting error: %s -- %s", path.c_str(), msg.c_str()); + isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) { + file_collection fc; + + fc.fc_name_to_errors.emplace(path, + file_error_info{ + {}, + msg, + }); + update_active_files(fc); + lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase( + path); + }); +} + +void +tailer::cleanup_cache() +{ + (void) std::async(std::launch::async, []() { + auto now = std::chrono::system_clock::now(); + auto cache_path = remote_cache_path(); + const auto& cfg = injector::get<const config&>(); + std::vector<ghc::filesystem::path> to_remove; + + log_debug("cache-ttl %d", cfg.c_cache_ttl.count()); + for (const auto& entry : + ghc::filesystem::directory_iterator(cache_path)) + { + auto mtime = ghc::filesystem::last_write_time(entry.path()); + auto exp_time = mtime + cfg.c_cache_ttl; + if (now < exp_time) { + continue; + } + + to_remove.emplace_back(entry.path()); + } + + for (auto& entry : to_remove) { + log_debug("removing cached remote: %s", entry.c_str()); + ghc::filesystem::remove_all(entry); + } + }); +} diff --git a/src/tailer/tailer.looper.cfg.hh b/src/tailer/tailer.looper.cfg.hh new file mode 100644 index 0000000..65b5ff6 --- /dev/null +++ b/src/tailer/tailer.looper.cfg.hh @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef lnav_tailer_looper_cfg_hh +#define lnav_tailer_looper_cfg_hh + +#include <chrono> + +namespace tailer { + +struct config { + int64_t c_min_free_space{32 * 1024 * 1024}; + std::chrono::seconds c_cache_ttl{std::chrono::hours(48)}; + std::string c_transfer_cmd{"cat > {0:} && chmod ugo+rx ./{0:}"}; + std::string c_start_cmd{"bash -c ./{0:}"}; + std::string c_ssh_cmd{"ssh"}; + std::string c_ssh_flags{}; + std::map<std::string, std::string> c_ssh_options{}; + std::map<std::string, std::string> c_ssh_config{ + {"BatchMode", "yes"}, + {"ConnectTimeout", "10"}, + }; +}; + +} // namespace tailer + +#endif diff --git a/src/tailer/tailer.looper.hh b/src/tailer/tailer.looper.hh new file mode 100644 index 0000000..1bf84d9 --- /dev/null +++ b/src/tailer/tailer.looper.hh @@ -0,0 +1,158 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef lnav_tailer_looper_hh +#define lnav_tailer_looper_hh + +#include <set> + +#include <logfile_fwd.hh> + +#include "base/auto_fd.hh" +#include "base/auto_pid.hh" +#include "base/isc.hh" +#include "base/network.tcp.hh" +#include "ghc/filesystem.hpp" +#include "mapbox/variant.hpp" + +namespace tailer { + +class looper : public isc::service<looper> { +public: + void add_remote(const network::path& path, + logfile_open_options_base options); + + void load_preview(int64_t id, const network::path& path); + + void complete_path(const network::path& path); + + bool empty() const { return this->l_netlocs_to_paths.empty(); } + + std::set<std::string> active_netlocs() const + { + std::set<std::string> retval; + + for (const auto& pair : this->l_remotes) { + retval.insert(pair.first); + } + return retval; + } + +protected: + void loop_body() override; + + void child_finished(std::shared_ptr<service_base> child) override; + +private: + class host_tailer : public isc::service<host_tailer> { + public: + static Result<std::shared_ptr<host_tailer>, std::string> for_host( + const std::string& netloc); + + host_tailer(const std::string& netloc, + auto_pid<process_state::running> child, + auto_fd to_child, + auto_fd from_child, + auto_fd err_from_child); + + void open_remote_path(const std::string& path, + logfile_open_options_base loo); + + void load_preview(int64_t id, const std::string& path); + + void complete_path(const std::string& path); + + bool is_synced() const { return this->ht_state.is<synced>(); } + + protected: + void* run() override; + + void loop_body() override; + + void stopped() override; + + std::chrono::milliseconds compute_timeout( + mstime_t current_time) const override; + + private: + static ghc::filesystem::path tmp_path(); + + std::string get_display_path(const std::string& remote_path) const; + + struct connected { + auto_pid<process_state::running> ht_child; + auto_fd ht_to_child; + auto_fd ht_from_child; + std::map<std::string, logfile_open_options_base> c_desired_paths; + std::set<std::string> c_synced_desired_paths; + std::map<std::string, logfile_open_options_base> c_child_paths; + std::set<std::string> c_synced_child_paths; + bool c_initial_sync_done{false}; + + auto_pid<process_state::finished> close() &&; + }; + + struct disconnected {}; + struct synced {}; + + using state_v = mapbox::util::variant<connected, disconnected, synced>; + + const std::string ht_netloc; + std::string ht_uname; + const ghc::filesystem::path ht_local_path; + std::set<ghc::filesystem::path> ht_active_files; + std::vector<std::string> ht_error_queue; + std::thread ht_error_reader; + state_v ht_state{disconnected()}; + uint64_t ht_cycle_count{0}; + }; + + static void report_error(std::string path, std::string msg); + + using attempt_time_point + = std::chrono::time_point<std::chrono::steady_clock>; + + struct remote_path_queue { + attempt_time_point rpq_next_attempt_time{ + std::chrono::steady_clock::now()}; + std::map<std::string, logfile_open_options_base> rpq_new_paths; + std::map<std::string, logfile_open_options_base> rpq_existing_paths; + + void send_synced_to_main(const std::string& netloc); + }; + + std::map<std::string, remote_path_queue> l_netlocs_to_paths; + std::map<std::string, std::shared_ptr<host_tailer>> l_remotes; +}; + +void cleanup_cache(); + +} // namespace tailer + +#endif diff --git a/src/tailer/tailer.main.c b/src/tailer/tailer.main.c new file mode 100644 index 0000000..445d24c --- /dev/null +++ b/src/tailer/tailer.main.c @@ -0,0 +1,1072 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __COSMOPOLITAN__ +#include <glob.h> +#include <stdio.h> +#include <assert.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <dirent.h> +#include <stdarg.h> +#include <limits.h> +#include <poll.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/utsname.h> +#include <ctype.h> +#include <stdint.h> +#endif + +#include "sha-256.h" +#include "tailer.h" + +struct node { + struct node *n_succ; + struct node *n_pred; +}; + +struct list { + struct node *l_head; + struct node *l_tail; + struct node *l_tail_pred; +}; + +int is_glob(const char *fn) +{ + return (strchr(fn, '*') != NULL || + strchr(fn, '?') != NULL || + strchr(fn, '[') != NULL); +}; + +void list_init(struct list *l) +{ + l->l_head = (struct node *) &l->l_tail; + l->l_tail = NULL; + l->l_tail_pred = (struct node *) &l->l_head; +} + +void list_move(struct list *dst, struct list *src) +{ + if (src->l_head->n_succ == NULL) { + list_init(dst); + return; + } + + dst->l_head = src->l_head; + dst->l_head->n_pred = (struct node *) &dst->l_head; + dst->l_tail = NULL; + dst->l_tail_pred = src->l_tail_pred; + dst->l_tail_pred->n_succ = (struct node *) &dst->l_tail; + + list_init(src); +} + +void list_remove(struct node *n) +{ + n->n_pred->n_succ = n->n_succ; + n->n_succ->n_pred = n->n_pred; + + n->n_succ = NULL; + n->n_pred = NULL; +} + +struct node *list_remove_head(struct list *l) +{ + struct node *retval = NULL; + + if (l->l_head->n_succ != NULL) { + retval = l->l_head; + list_remove(l->l_head); + } + + return retval; +} + +void list_append(struct list *l, struct node *n) +{ + n->n_pred = l->l_tail_pred; + n->n_succ = (struct node *) &l->l_tail; + l->l_tail_pred->n_succ = n; + l->l_tail_pred = n; +} + +typedef enum { + CS_INIT, + CS_OFFERED, + CS_TAILING, + CS_SYNCED, +} client_state_t; + +typedef enum { + PS_UNKNOWN, + PS_OK, + PS_ERROR, +} path_state_t; + +struct client_path_state { + struct node cps_node; + char *cps_path; + path_state_t cps_last_path_state; + struct stat cps_last_stat; + int64_t cps_client_file_offset; + int64_t cps_client_file_size; + client_state_t cps_client_state; + struct list cps_children; +}; + +struct client_path_state *create_client_path_state(const char *path) +{ + struct client_path_state *retval = malloc(sizeof(struct client_path_state)); + + retval->cps_path = strdup(path); + retval->cps_last_path_state = PS_UNKNOWN; + memset(&retval->cps_last_stat, 0, sizeof(retval->cps_last_stat)); + retval->cps_client_file_offset = -1; + retval->cps_client_file_size = 0; + retval->cps_client_state = CS_INIT; + list_init(&retval->cps_children); + return retval; +} + +void delete_client_path_state(struct client_path_state *cps); + +void delete_client_path_list(struct list *l) +{ + struct client_path_state *child_cps; + + while ((child_cps = (struct client_path_state *) list_remove_head(l)) != NULL) { + list_remove(&child_cps->cps_node); + delete_client_path_state(child_cps); + } +} + +void delete_client_path_state(struct client_path_state *cps) +{ + free(cps->cps_path); + delete_client_path_list(&cps->cps_children); + free(cps); +} + +void dump_client_path_states(struct list *path_list) +{ + struct client_path_state *curr = (struct client_path_state *) path_list->l_head; + + while (curr->cps_node.n_succ != NULL) { + fprintf(stderr, "debug: path %s\n", curr->cps_path); + dump_client_path_states(&curr->cps_children); + + curr = (struct client_path_state *) curr->cps_node.n_succ; + } + + curr = (struct client_path_state *) path_list->l_tail_pred; + while (curr->cps_node.n_pred != NULL) { + fprintf(stderr, "debug: back path %s\n", curr->cps_path); + dump_client_path_states(&curr->cps_children); + + curr = (struct client_path_state *) curr->cps_node.n_pred; + } +} + +void send_error(struct client_path_state *cps, char *msg, ...) +{ + char buffer[1024]; + va_list args; + + va_start(args, msg); + vsnprintf(buffer, sizeof(buffer), msg, args); + va_end(args); + + send_packet(STDOUT_FILENO, + TPT_ERROR, + TPPT_STRING, cps->cps_path, + TPPT_STRING, buffer, + TPPT_DONE); +} + +void set_client_path_state_error(struct client_path_state *cps, const char *op) +{ + if (cps->cps_last_path_state != PS_ERROR) { + // tell client of the problem + send_error(cps, "unable to %s -- %s", op, strerror(errno)); + } + cps->cps_last_path_state = PS_ERROR; + cps->cps_client_file_offset = -1; + cps->cps_client_state = CS_INIT; + delete_client_path_list(&cps->cps_children); +} + +typedef enum { + RS_ERROR, + RS_PACKET_TYPE, + RS_PAYLOAD_TYPE, + RS_PAYLOAD, + RS_PAYLOAD_LENGTH, + RS_PAYLOAD_CONTENT, +} recv_state_t; + +static recv_state_t readall(recv_state_t state, int sock, void *buf, size_t len) +{ + char *cbuf = (char *) buf; + off_t offset = 0; + + if (state == RS_ERROR) { + return RS_ERROR; + } + + while (len > 0) { + ssize_t rc = read(sock, &cbuf[offset], len); + + if (rc == -1) { + if (errno == EAGAIN || errno == EINTR) { + + } else { + return RS_ERROR; + } + } + else if (rc == 0) { + errno = EIO; + return RS_ERROR; + } + else { + len -= rc; + offset += rc; + } + } + + switch (state) { + case RS_PACKET_TYPE: + return RS_PAYLOAD_TYPE; + case RS_PAYLOAD_TYPE: + return RS_PAYLOAD; + case RS_PAYLOAD_LENGTH: + return RS_PAYLOAD_CONTENT; + case RS_PAYLOAD_CONTENT: + return RS_PAYLOAD_TYPE; + default: + return RS_ERROR; + } +} + +static tailer_packet_payload_type_t read_payload_type(recv_state_t *state, int sock) +{ + tailer_packet_payload_type_t retval = TPPT_DONE; + + assert(*state == RS_PAYLOAD_TYPE); + + *state = readall(*state, sock, &retval, sizeof(retval)); + if (*state != RS_ERROR && retval == TPPT_DONE) { + *state = RS_PACKET_TYPE; + } + return retval; +} + +static char *readstr(recv_state_t *state, int sock) +{ + assert(*state == RS_PAYLOAD_TYPE); + + tailer_packet_payload_type_t payload_type = read_payload_type(state, sock); + + if (payload_type != TPPT_STRING) { + fprintf(stderr, "error: expected string, got: %d\n", payload_type); + return NULL; + } + + int32_t length; + + *state = RS_PAYLOAD_LENGTH; + *state = readall(*state, sock, &length, sizeof(length)); + if (*state == RS_ERROR) { + fprintf(stderr, "error: unable to read string length\n"); + return NULL; + } + + char *retval = malloc(length + 1); + if (retval == NULL) { + return NULL; + } + + *state = readall(*state, sock, retval, length); + if (*state == RS_ERROR) { + fprintf(stderr, "error: unable to read string of length: %d\n", length); + free(retval); + return NULL; + } + retval[length] = '\0'; + + return retval; +} + +static int readint64(recv_state_t *state, int sock, int64_t *i) +{ + tailer_packet_payload_type_t payload_type = read_payload_type(state, sock); + + if (payload_type != TPPT_INT64) { + fprintf(stderr, "error: expected int64, got: %d\n", payload_type); + return -1; + } + + *state = RS_PAYLOAD_CONTENT; + *state = readall(*state, sock, i, sizeof(*i)); + if (*state == -1) { + fprintf(stderr, "error: unable to read int64\n"); + return -1; + } + + return 0; +} + +struct list client_path_list; + +struct client_path_state *find_client_path_state(struct list *path_list, const char *path) +{ + struct client_path_state *curr = (struct client_path_state *) path_list->l_head; + + while (curr->cps_node.n_succ != NULL) { + if (strcmp(curr->cps_path, path) == 0) { + return curr; + } + + struct client_path_state *child = + find_client_path_state(&curr->cps_children, path); + + if (child != NULL) { + return child; + } + + curr = (struct client_path_state *) curr->cps_node.n_succ; + } + + return NULL; +} + +void send_preview_error(int64_t id, const char *path, const char *msg) +{ + send_packet(STDOUT_FILENO, + TPT_PREVIEW_ERROR, + TPPT_INT64, id, + TPPT_STRING, path, + TPPT_STRING, msg, + TPPT_DONE); +} + +void send_preview_data(int64_t id, const char *path, int32_t len, const char *bits) +{ + send_packet(STDOUT_FILENO, + TPT_PREVIEW_DATA, + TPPT_INT64, id, + TPPT_STRING, path, + TPPT_BITS, len, bits, + TPPT_DONE); +} + +int poll_paths(struct list *path_list, struct client_path_state *root_cps) +{ + struct client_path_state *curr = (struct client_path_state *) path_list->l_head; + int is_top = root_cps == NULL; + int retval = 0; + + while (curr->cps_node.n_succ != NULL) { + if (is_top) { + root_cps = curr; + } + + if (is_glob(curr->cps_path)) { + int changes = 0; + glob_t gl; + + memset(&gl, 0, sizeof(gl)); + if (glob(curr->cps_path, 0, NULL, &gl) != 0) { + set_client_path_state_error(curr, "glob"); + } else { + struct list prev_children; + + list_move(&prev_children, &curr->cps_children); + for (size_t lpc = 0; lpc < gl.gl_pathc; lpc++) { + struct client_path_state *child; + + if ((child = find_client_path_state( + &prev_children, gl.gl_pathv[lpc])) == NULL) { + child = create_client_path_state(gl.gl_pathv[lpc]); + changes += 1; + } else { + list_remove(&child->cps_node); + } + list_append(&curr->cps_children, &child->cps_node); + } + globfree(&gl); + + struct client_path_state *child; + + while ((child = (struct client_path_state *) list_remove_head( + &prev_children)) != NULL) { + send_error(child, "deleted"); + delete_client_path_state(child); + changes += 1; + } + + retval += poll_paths(&curr->cps_children, root_cps); + } + + if (changes) { + curr->cps_client_state = CS_INIT; + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; + } + + curr = (struct client_path_state *) curr->cps_node.n_succ; + continue; + } + + struct stat st; + int rc = lstat(curr->cps_path, &st); + + if (rc == -1) { + memset(&st, 0, sizeof(st)); + set_client_path_state_error(curr, "lstat"); + } else if (curr->cps_client_file_offset >= 0 && + ((curr->cps_last_stat.st_dev != st.st_dev && + curr->cps_last_stat.st_ino != st.st_ino) || + (st.st_size < curr->cps_last_stat.st_size))) { + send_error(curr, "replaced"); + set_client_path_state_error(curr, "replace"); + } else if (S_ISLNK(st.st_mode)) { + switch (curr->cps_client_state) { + case CS_INIT: { + char buffer[PATH_MAX]; + ssize_t link_len; + + link_len = readlink(curr->cps_path, buffer, sizeof(buffer)); + if (link_len < 0) { + set_client_path_state_error(curr, "readlink"); + } else { + buffer[link_len] = '\0'; + send_packet(STDOUT_FILENO, + TPT_LINK_BLOCK, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_STRING, buffer, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; + + if (buffer[0] == '/') { + struct client_path_state *child = + create_client_path_state(buffer); + + fprintf(stderr, "info: monitoring link path %s\n", + buffer); + list_append(&curr->cps_children, &child->cps_node); + } + + retval += 1; + } + break; + } + case CS_SYNCED: + break; + case CS_OFFERED: + case CS_TAILING: + fprintf(stderr, + "internal-error: unexpected state for path -- %s\n", + curr->cps_path); + break; + } + + retval += poll_paths(&curr->cps_children, root_cps); + + curr->cps_last_path_state = PS_OK; + } else if (S_ISREG(st.st_mode)) { + switch (curr->cps_client_state) { + case CS_INIT: + case CS_TAILING: + case CS_SYNCED: { + if (curr->cps_client_file_offset < st.st_size) { + int fd = open(curr->cps_path, O_RDONLY); + + if (fd == -1) { + set_client_path_state_error(curr, "open"); + } else { + static unsigned char buffer[4 * 1024 * 1024]; + + int64_t file_offset = + curr->cps_client_file_offset < 0 ? + 0 : + curr->cps_client_file_offset; + int64_t nbytes = sizeof(buffer); + if (curr->cps_client_state == CS_INIT) { + if (curr->cps_client_file_size == 0) { + // initial state, haven't heard from client yet. + nbytes = 32 * 1024; + } else if (file_offset < curr->cps_client_file_size) { + // heard from client, try to catch up + nbytes = curr->cps_client_file_size - file_offset; + if (nbytes > sizeof(buffer)) { + nbytes = sizeof(buffer); + } + } + } + int32_t bytes_read = pread(fd, buffer, nbytes, file_offset); + + if (bytes_read == -1) { + set_client_path_state_error(curr, "pread"); + } else if (curr->cps_client_state == CS_INIT && + (curr->cps_client_file_offset < 0 || + bytes_read > 0)) { + static unsigned char + HASH_BUFFER[4 * 1024 * 1024]; + BYTE hash[SHA256_BLOCK_SIZE]; + size_t remaining = 0; + int64_t remaining_offset + = file_offset + bytes_read; + SHA256_CTX shactx; + + if (curr->cps_client_file_size > 0 + && file_offset < curr->cps_client_file_size) + { + remaining = curr->cps_client_file_size + - file_offset - bytes_read; + } + + fprintf(stderr, + "info: prepping offer: init=%d; " + "remaining=%zu; %s\n", + bytes_read, + remaining, + curr->cps_path); + sha256_init(&shactx); + sha256_update(&shactx, buffer, bytes_read); + while (remaining > 0) { + nbytes = sizeof(HASH_BUFFER); + if (remaining < nbytes) { + nbytes = remaining; + } + ssize_t remaining_bytes_read + = pread(fd, + HASH_BUFFER, + nbytes, + remaining_offset); + if (remaining_bytes_read < 0) { + set_client_path_state_error(curr, "pread"); + break; + } + if (remaining_bytes_read == 0) { + remaining = 0; + break; + } + sha256_update(&shactx, HASH_BUFFER, remaining_bytes_read); + remaining -= remaining_bytes_read; + remaining_offset += remaining_bytes_read; + bytes_read += remaining_bytes_read; + } + + if (remaining == 0) { + sha256_final(&shactx, hash); + + send_packet(STDOUT_FILENO, + TPT_OFFER_BLOCK, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_INT64, + (int64_t) st.st_mtime, + TPPT_INT64, file_offset, + TPPT_INT64, (int64_t) bytes_read, + TPPT_HASH, hash, + TPPT_DONE); + curr->cps_client_state = CS_OFFERED; + } + } else { + if (curr->cps_client_file_offset < 0) { + curr->cps_client_file_offset = 0; + } + + send_packet(STDOUT_FILENO, + TPT_TAIL_BLOCK, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_INT64, (int64_t) st.st_mtime, + TPPT_INT64, curr->cps_client_file_offset, + TPPT_BITS, bytes_read, buffer, + TPPT_DONE); + curr->cps_client_file_offset += bytes_read; + curr->cps_client_state = CS_TAILING; + } + close(fd); + + retval = 1; + } + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; + } + break; + } + case CS_OFFERED: { + // Still waiting for the client ack + break; + } + } + + curr->cps_last_path_state = PS_OK; + } else if (S_ISDIR(st.st_mode)) { + DIR *dir = opendir(curr->cps_path); + + if (dir == NULL) { + set_client_path_state_error(curr, "opendir"); + } else { + struct list prev_children; + struct dirent *entry; + int changes = 0; + + list_move(&prev_children, &curr->cps_children); + while ((entry = readdir(dir)) != NULL) { + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + + if (entry->d_type != DT_REG && + entry->d_type != DT_LNK) { + continue; + } + + char full_path[PATH_MAX]; + + snprintf(full_path, sizeof(full_path), + "%s/%s", + curr->cps_path, entry->d_name); + + struct client_path_state *child = find_client_path_state(&prev_children, full_path); + + if (child == NULL) { + // new file + fprintf(stderr, "info: monitoring child path: %s\n", full_path); + child = create_client_path_state(full_path); + changes += 1; + } else { + list_remove(&child->cps_node); + } + list_append(&curr->cps_children, &child->cps_node); + } + closedir(dir); + + struct client_path_state *child; + + while ((child = (struct client_path_state *) list_remove_head( + &prev_children)) != NULL) { + send_error(child, "deleted"); + delete_client_path_state(child); + changes += 1; + } + + retval += poll_paths(&curr->cps_children, root_cps); + + if (changes) { + curr->cps_client_state = CS_INIT; + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; + } + } + + curr->cps_last_path_state = PS_OK; + } + + curr->cps_last_stat = st; + + curr = (struct client_path_state *) curr->cps_node.n_succ; + } + + fflush(stderr); + + return retval; +} + +static +void send_possible_paths(const char *glob_path, int depth) +{ + glob_t gl; + + memset(&gl, 0, sizeof(gl)); + if (glob(glob_path, GLOB_MARK, NULL, &gl) == 0) { + for (size_t lpc = 0; + lpc < gl.gl_pathc; + lpc++) { + const char *child_path = gl.gl_pathv[lpc]; + size_t child_len = strlen(gl.gl_pathv[lpc]); + + send_packet(STDOUT_FILENO, + TPT_POSSIBLE_PATH, + TPPT_STRING, child_path, + TPPT_DONE); + + if (depth == 0 && child_path[child_len - 1] == '/') { + char *child_copy = malloc(child_len + 2); + + strcpy(child_copy, child_path); + strcat(child_copy, "*"); + send_possible_paths(child_copy, depth + 1); + free(child_copy); + } + } + } + + globfree(&gl); +} + +static +void handle_load_preview_request(const char *path, int64_t preview_id) +{ + struct stat st; + + fprintf(stderr, "info: load preview request -- %lld\n", preview_id); + if (is_glob(path)) { + glob_t gl; + + memset(&gl, 0, sizeof(gl)); + if (glob(path, 0, NULL, &gl) != 0) { + char msg[1024]; + + snprintf(msg, sizeof(msg), + "error: cannot glob %s -- %s", + path, + strerror(errno)); + send_preview_error(preview_id, path, msg); + } else { + char *bits = malloc(1024 * 1024); + int lpc, line_count = 10; + + bits[0] = '\0'; + for (lpc = 0; + line_count > 0 && lpc < gl.gl_pathc; + lpc++, line_count--) { + strcat(bits, gl.gl_pathv[lpc]); + strcat(bits, "\n"); + } + + if (lpc < gl.gl_pathc) { + strcat(bits, " ... and more! ...\n"); + } + + send_preview_data(preview_id, path, strlen(bits), bits); + + globfree(&gl); + free(bits); + } + } + else if (stat(path, &st) == -1) { + char msg[1024]; + + snprintf(msg, sizeof(msg), + "error: cannot open %s -- %s", + path, + strerror(errno)); + send_preview_error(preview_id, path, msg); + } else if (S_ISREG(st.st_mode)) { + size_t capacity = 1024 * 1024; + char *bits = malloc(capacity); + FILE *file; + + if ((file = fopen(path, "r")) == NULL) { + char msg[1024]; + + snprintf(msg, sizeof(msg), + "error: cannot open %s -- %s", + path, + strerror(errno)); + send_preview_error(preview_id, path, msg); + } else { + int line_count = 10; + size_t offset = 0; + char *line; + + while (line_count && + (capacity - offset) > 1024 && + (line = fgets(&bits[offset], capacity - offset, file)) != NULL) { + offset += strlen(line); + line_count -= 1; + } + + fclose(file); + + send_preview_data(preview_id, path, offset, bits); + } + free(bits); + } else if (S_ISDIR(st.st_mode)) { + DIR *dir = opendir(path); + + if (dir == NULL) { + char msg[1024]; + + snprintf(msg, sizeof(msg), + "error: unable to open directory -- %s", + path); + send_preview_error(preview_id, path, msg); + } else { + char *bits = malloc(1024 * 1024); + struct dirent *entry; + int line_count = 10; + + bits[0] = '\0'; + while ((entry = readdir(dir)) != NULL) { + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + if (entry->d_type != DT_REG && + entry->d_type != DT_DIR) { + continue; + } + if (line_count == 1) { + strcat(bits, " ... and more! ...\n"); + break; + } + + strcat(bits, entry->d_name); + strcat(bits, "\n"); + + line_count -= 1; + } + + closedir(dir); + + send_preview_data(preview_id, path, strlen(bits), bits); + + free(bits); + } + } else { + char msg[1024]; + + snprintf(msg, sizeof(msg), + "error: path is not a file or directory -- %s", + path); + send_preview_error(preview_id, path, msg); + } +} + +static +void handle_complete_path_request(const char *path) +{ + size_t path_len = strlen(path); + char *glob_path = malloc(path_len + 3); + struct stat st; + + strcpy(glob_path, path); + fprintf(stderr, "complete path: %s\n", path); + if (path[path_len - 1] != '/' && + stat(path, &st) == 0 && + S_ISDIR(st.st_mode)) { + strcat(glob_path, "/"); + } + if (path[path_len - 1] != '*') { + strcat(glob_path, "*"); + } + fprintf(stderr, "complete glob path: %s\n", glob_path); + send_possible_paths(glob_path, 0); + + free(glob_path); +} + +int main(int argc, char *argv[]) +{ + int done = 0, timeout = 0; + recv_state_t rstate = RS_PACKET_TYPE; + + // No need to leave ourselves around + if (argc == 1) { + unlink(argv[0]); + } + + list_init(&client_path_list); + + { + FILE *unameFile = popen("uname -mrsv", "r"); + + if (unameFile != NULL) { + char buffer[1024]; + + fgets(buffer, sizeof(buffer), unameFile); + char *bufend = buffer + strlen(buffer) - 1; + while (isspace(*bufend)) { + bufend -= 1; + } + *bufend = '\0'; + send_packet(STDOUT_FILENO, + TPT_ANNOUNCE, + TPPT_STRING, buffer, + TPPT_DONE); + pclose(unameFile); + } + } + + while (!done) { + struct pollfd pfds[1]; + + pfds[0].fd = STDIN_FILENO; + pfds[0].events = POLLIN; + pfds[0].revents = 0; + + int ready_count = poll(pfds, 1, timeout); + + if (ready_count) { + tailer_packet_type_t type; + + assert(rstate == RS_PACKET_TYPE); + rstate = readall(rstate, STDIN_FILENO, &type, sizeof(type)); + if (rstate == RS_ERROR) { + fprintf(stderr, "info: exiting...\n"); + done = 1; + } else { + switch (type) { + case TPT_OPEN_PATH: + case TPT_CLOSE_PATH: + case TPT_LOAD_PREVIEW: + case TPT_COMPLETE_PATH: { + char *path = readstr(&rstate, STDIN_FILENO); + int64_t preview_id = 0; + + if (type == TPT_LOAD_PREVIEW) { + if (readint64(&rstate, STDIN_FILENO, &preview_id) == -1) { + done = 1; + break; + } + } + if (path == NULL) { + fprintf(stderr, "error: unable to get path to open\n"); + done = 1; + } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) { + fprintf(stderr, "error: invalid open packet\n"); + done = 1; + } else if (type == TPT_OPEN_PATH) { + struct client_path_state *cps; + + cps = find_client_path_state(&client_path_list, path); + if (cps != NULL) { + fprintf(stderr, "warning: already monitoring -- %s\n", path); + } else { + cps = create_client_path_state(path); + + fprintf(stderr, "info: monitoring path: %s\n", path); + list_append(&client_path_list, &cps->cps_node); + } + } else if (type == TPT_CLOSE_PATH) { + struct client_path_state *cps = find_client_path_state(&client_path_list, path); + + if (cps == NULL) { + fprintf(stderr, "warning: path is not open: %s\n", path); + } else { + list_remove(&cps->cps_node); + delete_client_path_state(cps); + } + } else if (type == TPT_LOAD_PREVIEW) { + handle_load_preview_request(path, preview_id); + } else if (type == TPT_COMPLETE_PATH) { + handle_complete_path_request(path); + } + + free(path); + break; + } + case TPT_ACK_BLOCK: + case TPT_NEED_BLOCK: { + char *path = readstr(&rstate, STDIN_FILENO); + int64_t ack_offset = 0, ack_len = 0, client_size = 0; + + if (type == TPT_ACK_BLOCK && + (readint64(&rstate, STDIN_FILENO, &ack_offset) == -1 || + readint64(&rstate, STDIN_FILENO, &ack_len) == -1 || + readint64(&rstate, STDIN_FILENO, &client_size) == -1)) { + done = 1; + break; + } + + // fprintf(stderr, "info: block packet path: %s\n", path); + if (path == NULL) { + fprintf(stderr, "error: unable to get block path\n"); + done = 1; + } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) { + fprintf(stderr, "error: invalid block packet\n"); + done = 1; + } else { + struct client_path_state *cps = find_client_path_state(&client_path_list, path); + + if (cps == NULL) { + fprintf(stderr, "warning: unknown path in block packet: %s\n", path); + } else if (type == TPT_NEED_BLOCK) { + fprintf(stderr, "info: client is tailing: %s\n", path); + cps->cps_client_state = CS_TAILING; + } else if (type == TPT_ACK_BLOCK) { + fprintf(stderr, + "info: client acked: %s %lld\n", + path, + client_size); + if (ack_len == 0) { + cps->cps_client_state = CS_TAILING; + } else { + cps->cps_client_file_offset = ack_offset + ack_len; + cps->cps_client_state = CS_INIT; + cps->cps_client_file_size = client_size; + } + } + free(path); + } + break; + } + default: { + assert(0); + } + } + } + } + + if (!done) { + if (poll_paths(&client_path_list, NULL)) { + timeout = 0; + } else { + timeout = 1000; + } + } + } + + return EXIT_SUCCESS; +} diff --git a/src/tailer/tailerpp.cc b/src/tailer/tailerpp.cc new file mode 100644 index 0000000..1ea0a9e --- /dev/null +++ b/src/tailer/tailerpp.cc @@ -0,0 +1,146 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "tailerpp.hh" + +#include <unistd.h> + +namespace tailer { + +int +readall(int sock, void* buf, size_t len) +{ + char* cbuf = (char*) buf; + off_t offset = 0; + + while (len > 0) { + ssize_t rc = read(sock, &cbuf[offset], len); + + if (rc == -1) { + switch (errno) { + case EAGAIN: + case EINTR: + break; + default: + return -1; + } + } else if (rc == 0) { + errno = EIO; + return -1; + } else { + len -= rc; + offset += rc; + } + } + + return 0; +} + +Result<packet, std::string> +read_packet(int fd) +{ + tailer_packet_type_t type; + + if (readall(fd, &type, sizeof(type)) == -1) { + return Ok(packet{packet_eof{}}); + } + switch (type) { + case TPT_ERROR: { + packet_error pe; + + TRY(read_payloads_into(fd, pe.pe_path, pe.pe_msg)); + return Ok(packet{pe}); + } + case TPT_ANNOUNCE: { + packet_announce pa; + + TRY(read_payloads_into(fd, pa.pa_uname)); + return Ok(packet{pa}); + } + case TPT_OFFER_BLOCK: { + packet_offer_block pob; + + TRY(read_payloads_into(fd, + pob.pob_root_path, + pob.pob_path, + pob.pob_mtime, + pob.pob_offset, + pob.pob_length, + pob.pob_hash)); + return Ok(packet{pob}); + } + case TPT_TAIL_BLOCK: { + packet_tail_block ptb; + + TRY(read_payloads_into(fd, + ptb.ptb_root_path, + ptb.ptb_path, + ptb.ptb_mtime, + ptb.ptb_offset, + ptb.ptb_bits)); + return Ok(packet{ptb}); + } + case TPT_SYNCED: { + packet_synced ps; + + TRY(read_payloads_into(fd, ps.ps_root_path, ps.ps_path)); + return Ok(packet{ps}); + } + case TPT_LINK_BLOCK: { + packet_link pl; + + TRY(read_payloads_into( + fd, pl.pl_root_path, pl.pl_path, pl.pl_link_value)); + return Ok(packet{pl}); + } + case TPT_PREVIEW_ERROR: { + packet_preview_error ppe; + + TRY(read_payloads_into(fd, ppe.ppe_id, ppe.ppe_path, ppe.ppe_msg)); + return Ok(packet{ppe}); + } + case TPT_PREVIEW_DATA: { + packet_preview_data ppd; + + TRY(read_payloads_into(fd, ppd.ppd_id, ppd.ppd_path, ppd.ppd_bits)); + return Ok(packet{ppd}); + } + case TPT_POSSIBLE_PATH: { + packet_possible_path ppp; + + TRY(read_payloads_into(fd, ppp.ppp_path)); + return Ok(packet{ppp}); + } + default: + assert(0); + break; + } +} + +} // namespace tailer diff --git a/src/tailer/tailerpp.hh b/src/tailer/tailerpp.hh new file mode 100644 index 0000000..0447379 --- /dev/null +++ b/src/tailer/tailerpp.hh @@ -0,0 +1,315 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef lnav_tailerpp_hh +#define lnav_tailerpp_hh + +#include <string> +#include <vector> + +#include "base/result.h" +#include "fmt/format.h" +#include "mapbox/variant.hpp" +#include "sha-256.h" +#include "tailer.h" + +namespace tailer { + +struct packet_eof {}; + +struct packet_error { + std::string pe_path; + std::string pe_msg; +}; + +struct packet_announce { + std::string pa_uname; +}; + +struct hash_frag { + uint8_t thf_hash[SHA256_BLOCK_SIZE]; + + bool operator==(const hash_frag& other) const + { + return memcmp(this->thf_hash, other.thf_hash, sizeof(this->thf_hash)) + == 0; + } +}; + +struct packet_log { + std::string pl_msg; +}; + +struct packet_offer_block { + std::string pob_root_path; + std::string pob_path; + int64_t pob_mtime; + int64_t pob_offset; + int64_t pob_length; + hash_frag pob_hash; +}; + +struct packet_tail_block { + std::string ptb_root_path; + std::string ptb_path; + int64_t ptb_mtime; + int64_t ptb_offset; + std::vector<uint8_t> ptb_bits; +}; + +struct packet_synced { + std::string ps_root_path; + std::string ps_path; +}; + +struct packet_link { + std::string pl_root_path; + std::string pl_path; + std::string pl_link_value; +}; + +struct packet_preview_error { + int64_t ppe_id; + std::string ppe_path; + std::string ppe_msg; +}; + +struct packet_preview_data { + int64_t ppd_id; + std::string ppd_path; + std::vector<uint8_t> ppd_bits; +}; + +struct packet_possible_path { + std::string ppp_path; +}; + +using packet = mapbox::util::variant<packet_eof, + packet_announce, + packet_error, + packet_offer_block, + packet_tail_block, + packet_link, + packet_preview_error, + packet_preview_data, + packet_possible_path, + packet_synced>; + +struct recv_payload_type {}; +struct recv_payload_length {}; +struct recv_payload_content {}; + +int readall(int sock, void* buf, size_t len); + +namespace details { + +template<class...> +using void_t = void; + +template<class, class = void> +struct has_data : std::false_type {}; + +template<class T> +struct has_data<T, decltype(void(std::declval<T&>().data()))> + : std::true_type {}; + +template<typename T, std::enable_if_t<has_data<T>::value, bool> = true> +uint8_t* +get_data(T& t) +{ + return (uint8_t*) t.data(); +} + +template<typename T, std::enable_if_t<!has_data<T>::value, bool> = true> +uint8_t* +get_data(T& t) +{ + return (uint8_t*) &t; +} + +} // namespace details + +template<int PAYLOAD_TYPE, typename EXPECT = recv_payload_type> +struct protocol_recv { + static constexpr bool HAS_LENGTH = (PAYLOAD_TYPE == TPPT_STRING) + || (PAYLOAD_TYPE == TPPT_BITS); + + using after_type = typename std::conditional<HAS_LENGTH, + recv_payload_length, + recv_payload_content>::type; + + static Result<protocol_recv<PAYLOAD_TYPE, after_type>, std::string> create( + int fd) + { + return protocol_recv<PAYLOAD_TYPE>(fd).read_type(); + } + + Result<protocol_recv<PAYLOAD_TYPE, after_type>, std::string> read_type() && + { + static_assert(std::is_same<EXPECT, recv_payload_type>::value, + "read_type() cannot be called in this state"); + + tailer_packet_payload_type_t payload_type; + + if (readall(this->pr_fd, &payload_type, sizeof(payload_type)) == -1) { + return Err( + fmt::format(FMT_STRING("unable to read payload type: {}"), + strerror(errno))); + } + + if (payload_type != PAYLOAD_TYPE) { + return Err(fmt::format( + FMT_STRING("payload-type mismatch, got: {}; expected: {}"), + (int) payload_type, + PAYLOAD_TYPE)); + } + + return Ok(protocol_recv<PAYLOAD_TYPE, after_type>(this->pr_fd)); + } + + template<typename T> + Result<protocol_recv<PAYLOAD_TYPE, recv_payload_content>, std::string> + read_length(T& data) && + { + static_assert(std::is_same<EXPECT, recv_payload_length>::value, + "read_length() cannot be called in this state"); + + if (readall(this->pr_fd, &this->pr_length, sizeof(this->pr_length)) + == -1) + { + return Err( + fmt::format(FMT_STRING("unable to read content length: {}"), + strerror(errno))); + } + + try { + data.resize(this->pr_length); + } catch (...) { + return Err(fmt::format(FMT_STRING("unable to resize data to {}"), + this->pr_length)); + } + + return Ok(protocol_recv<PAYLOAD_TYPE, recv_payload_content>( + this->pr_fd, this->pr_length)); + } + + template<typename T> + Result<void, std::string> read_content(T& data) && + { + static_assert(std::is_same<EXPECT, recv_payload_content>::value, + "read_content() cannot be called in this state"); + static_assert(!HAS_LENGTH || details::has_data<T>::value, "boo"); + + if (!HAS_LENGTH) { + this->pr_length = sizeof(T); + } + if (readall(this->pr_fd, details::get_data(data), this->pr_length) + == -1) + { + return Err(fmt::format(FMT_STRING("unable to read content -- {}"), + strerror(errno))); + } + + return Ok(); + } + +private: + template<int P, typename E> + friend struct protocol_recv; + + explicit protocol_recv(int fd, int32_t length = 0) + : pr_fd(fd), pr_length(length) + { + } + + int pr_fd; + int32_t pr_length; +}; + +inline Result<void, std::string> +read_payloads_into(int fd) +{ + tailer_packet_payload_type_t payload_type; + + readall(fd, &payload_type, sizeof(payload_type)); + if (payload_type != TPPT_DONE) { + return Err(std::string("not done")); + } + + return Ok(); +} + +template<typename... Ts> +Result<void, std::string> read_payloads_into(int fd, + std::string& str, + Ts&... args); + +template<typename... Ts> +Result<void, std::string> +read_payloads_into(int fd, std::vector<uint8_t>& bits, Ts&... args) +{ + TRY(TRY(TRY(protocol_recv<TPPT_BITS>::create(fd)).read_length(bits)) + .read_content(bits)); + + return read_payloads_into(fd, args...); +} + +template<typename... Ts> +Result<void, std::string> +read_payloads_into(int fd, hash_frag& thf, Ts&... args) +{ + TRY(TRY(protocol_recv<TPPT_HASH>::create(fd)).read_content(thf.thf_hash)); + + return read_payloads_into(fd, args...); +} + +template<typename... Ts> +Result<void, std::string> +read_payloads_into(int fd, int64_t& i, Ts&... args) +{ + TRY(TRY(protocol_recv<TPPT_INT64>::create(fd)).read_content(i)); + + return read_payloads_into(fd, args...); +} + +template<typename... Ts> +Result<void, std::string> +read_payloads_into(int fd, std::string& str, Ts&... args) +{ + TRY(TRY(TRY(protocol_recv<TPPT_STRING>::create(fd)).read_length(str)) + .read_content(str)); + + return read_payloads_into(fd, args...); +} + +Result<packet, std::string> read_packet(int fd); + +} // namespace tailer + +#endif diff --git a/src/tailer/test_tailer.sh b/src/tailer/test_tailer.sh new file mode 100755 index 0000000..932fe24 --- /dev/null +++ b/src/tailer/test_tailer.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +run_test ./drive_tailer preview nonexistent-file + +check_error_output "preview of nonexistent-file failed?" <<EOF +preview error: nonexistent-file -- error: cannot open nonexistent-file -- No such file or directory +tailer stderr: +info: load preview request -- 1234 +info: exiting... +EOF + +run_test ./drive_tailer preview ${test_dir}/logfile_access_log.0 + +check_output "preview of file failed?" <<EOF +preview of file: {test_dir}/logfile_access_log.0 +192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7" +192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7" +192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7" + +all done! +tailer stderr: +info: load preview request -- 1234 +info: exiting... +EOF + +run_cap_test ./drive_tailer preview "${test_dir}/remote-log-dir/*" + +run_test ./drive_tailer possible "${test_dir}/logfile_access_log.*" + +check_output "possible path list failed?" <<EOF +possible path: {test_dir}/logfile_access_log.0 +possible path: {test_dir}/logfile_access_log.1 +all done! +tailer stderr: +complete path: {test_dir}/logfile_access_log.* +complete glob path: {test_dir}/logfile_access_log.* +info: exiting... +EOF + +ln -sf bar foo + +run_test ./drive_tailer open foo + +check_output "open link not working?" <<EOF +link value: foo -> bar +all done! +tailer stderr: +info: monitoring path: foo +info: exiting... +EOF |