summaryrefslogtreecommitdiffstats
path: root/src/tailer/tailer.main.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/tailer/tailer.main.c1072
1 files changed, 1072 insertions, 0 deletions
diff --git a/src/tailer/tailer.main.c b/src/tailer/tailer.main.c
new file mode 100644
index 0000000..445d24c
--- /dev/null
+++ b/src/tailer/tailer.main.c
@@ -0,0 +1,1072 @@
+/**
+ * Copyright (c) 2021, Timothy Stack
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * * Neither the name of Timothy Stack nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __COSMOPOLITAN__
+#include <glob.h>
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include <dirent.h>
+#include <stdarg.h>
+#include <limits.h>
+#include <poll.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/utsname.h>
+#include <ctype.h>
+#include <stdint.h>
+#endif
+
+#include "sha-256.h"
+#include "tailer.h"
+
+struct node {
+ struct node *n_succ;
+ struct node *n_pred;
+};
+
+struct list {
+ struct node *l_head;
+ struct node *l_tail;
+ struct node *l_tail_pred;
+};
+
+int is_glob(const char *fn)
+{
+ return (strchr(fn, '*') != NULL ||
+ strchr(fn, '?') != NULL ||
+ strchr(fn, '[') != NULL);
+};
+
+void list_init(struct list *l)
+{
+ l->l_head = (struct node *) &l->l_tail;
+ l->l_tail = NULL;
+ l->l_tail_pred = (struct node *) &l->l_head;
+}
+
+void list_move(struct list *dst, struct list *src)
+{
+ if (src->l_head->n_succ == NULL) {
+ list_init(dst);
+ return;
+ }
+
+ dst->l_head = src->l_head;
+ dst->l_head->n_pred = (struct node *) &dst->l_head;
+ dst->l_tail = NULL;
+ dst->l_tail_pred = src->l_tail_pred;
+ dst->l_tail_pred->n_succ = (struct node *) &dst->l_tail;
+
+ list_init(src);
+}
+
+void list_remove(struct node *n)
+{
+ n->n_pred->n_succ = n->n_succ;
+ n->n_succ->n_pred = n->n_pred;
+
+ n->n_succ = NULL;
+ n->n_pred = NULL;
+}
+
+struct node *list_remove_head(struct list *l)
+{
+ struct node *retval = NULL;
+
+ if (l->l_head->n_succ != NULL) {
+ retval = l->l_head;
+ list_remove(l->l_head);
+ }
+
+ return retval;
+}
+
+void list_append(struct list *l, struct node *n)
+{
+ n->n_pred = l->l_tail_pred;
+ n->n_succ = (struct node *) &l->l_tail;
+ l->l_tail_pred->n_succ = n;
+ l->l_tail_pred = n;
+}
+
+typedef enum {
+ CS_INIT,
+ CS_OFFERED,
+ CS_TAILING,
+ CS_SYNCED,
+} client_state_t;
+
+typedef enum {
+ PS_UNKNOWN,
+ PS_OK,
+ PS_ERROR,
+} path_state_t;
+
+struct client_path_state {
+ struct node cps_node;
+ char *cps_path;
+ path_state_t cps_last_path_state;
+ struct stat cps_last_stat;
+ int64_t cps_client_file_offset;
+ int64_t cps_client_file_size;
+ client_state_t cps_client_state;
+ struct list cps_children;
+};
+
+struct client_path_state *create_client_path_state(const char *path)
+{
+ struct client_path_state *retval = malloc(sizeof(struct client_path_state));
+
+ retval->cps_path = strdup(path);
+ retval->cps_last_path_state = PS_UNKNOWN;
+ memset(&retval->cps_last_stat, 0, sizeof(retval->cps_last_stat));
+ retval->cps_client_file_offset = -1;
+ retval->cps_client_file_size = 0;
+ retval->cps_client_state = CS_INIT;
+ list_init(&retval->cps_children);
+ return retval;
+}
+
+void delete_client_path_state(struct client_path_state *cps);
+
+void delete_client_path_list(struct list *l)
+{
+ struct client_path_state *child_cps;
+
+ while ((child_cps = (struct client_path_state *) list_remove_head(l)) != NULL) {
+ list_remove(&child_cps->cps_node);
+ delete_client_path_state(child_cps);
+ }
+}
+
+void delete_client_path_state(struct client_path_state *cps)
+{
+ free(cps->cps_path);
+ delete_client_path_list(&cps->cps_children);
+ free(cps);
+}
+
+void dump_client_path_states(struct list *path_list)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+
+ while (curr->cps_node.n_succ != NULL) {
+ fprintf(stderr, "debug: path %s\n", curr->cps_path);
+ dump_client_path_states(&curr->cps_children);
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ curr = (struct client_path_state *) path_list->l_tail_pred;
+ while (curr->cps_node.n_pred != NULL) {
+ fprintf(stderr, "debug: back path %s\n", curr->cps_path);
+ dump_client_path_states(&curr->cps_children);
+
+ curr = (struct client_path_state *) curr->cps_node.n_pred;
+ }
+}
+
+void send_error(struct client_path_state *cps, char *msg, ...)
+{
+ char buffer[1024];
+ va_list args;
+
+ va_start(args, msg);
+ vsnprintf(buffer, sizeof(buffer), msg, args);
+ va_end(args);
+
+ send_packet(STDOUT_FILENO,
+ TPT_ERROR,
+ TPPT_STRING, cps->cps_path,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+}
+
+void set_client_path_state_error(struct client_path_state *cps, const char *op)
+{
+ if (cps->cps_last_path_state != PS_ERROR) {
+ // tell client of the problem
+ send_error(cps, "unable to %s -- %s", op, strerror(errno));
+ }
+ cps->cps_last_path_state = PS_ERROR;
+ cps->cps_client_file_offset = -1;
+ cps->cps_client_state = CS_INIT;
+ delete_client_path_list(&cps->cps_children);
+}
+
+typedef enum {
+ RS_ERROR,
+ RS_PACKET_TYPE,
+ RS_PAYLOAD_TYPE,
+ RS_PAYLOAD,
+ RS_PAYLOAD_LENGTH,
+ RS_PAYLOAD_CONTENT,
+} recv_state_t;
+
+static recv_state_t readall(recv_state_t state, int sock, void *buf, size_t len)
+{
+ char *cbuf = (char *) buf;
+ off_t offset = 0;
+
+ if (state == RS_ERROR) {
+ return RS_ERROR;
+ }
+
+ while (len > 0) {
+ ssize_t rc = read(sock, &cbuf[offset], len);
+
+ if (rc == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+
+ } else {
+ return RS_ERROR;
+ }
+ }
+ else if (rc == 0) {
+ errno = EIO;
+ return RS_ERROR;
+ }
+ else {
+ len -= rc;
+ offset += rc;
+ }
+ }
+
+ switch (state) {
+ case RS_PACKET_TYPE:
+ return RS_PAYLOAD_TYPE;
+ case RS_PAYLOAD_TYPE:
+ return RS_PAYLOAD;
+ case RS_PAYLOAD_LENGTH:
+ return RS_PAYLOAD_CONTENT;
+ case RS_PAYLOAD_CONTENT:
+ return RS_PAYLOAD_TYPE;
+ default:
+ return RS_ERROR;
+ }
+}
+
+static tailer_packet_payload_type_t read_payload_type(recv_state_t *state, int sock)
+{
+ tailer_packet_payload_type_t retval = TPPT_DONE;
+
+ assert(*state == RS_PAYLOAD_TYPE);
+
+ *state = readall(*state, sock, &retval, sizeof(retval));
+ if (*state != RS_ERROR && retval == TPPT_DONE) {
+ *state = RS_PACKET_TYPE;
+ }
+ return retval;
+}
+
+static char *readstr(recv_state_t *state, int sock)
+{
+ assert(*state == RS_PAYLOAD_TYPE);
+
+ tailer_packet_payload_type_t payload_type = read_payload_type(state, sock);
+
+ if (payload_type != TPPT_STRING) {
+ fprintf(stderr, "error: expected string, got: %d\n", payload_type);
+ return NULL;
+ }
+
+ int32_t length;
+
+ *state = RS_PAYLOAD_LENGTH;
+ *state = readall(*state, sock, &length, sizeof(length));
+ if (*state == RS_ERROR) {
+ fprintf(stderr, "error: unable to read string length\n");
+ return NULL;
+ }
+
+ char *retval = malloc(length + 1);
+ if (retval == NULL) {
+ return NULL;
+ }
+
+ *state = readall(*state, sock, retval, length);
+ if (*state == RS_ERROR) {
+ fprintf(stderr, "error: unable to read string of length: %d\n", length);
+ free(retval);
+ return NULL;
+ }
+ retval[length] = '\0';
+
+ return retval;
+}
+
+static int readint64(recv_state_t *state, int sock, int64_t *i)
+{
+ tailer_packet_payload_type_t payload_type = read_payload_type(state, sock);
+
+ if (payload_type != TPPT_INT64) {
+ fprintf(stderr, "error: expected int64, got: %d\n", payload_type);
+ return -1;
+ }
+
+ *state = RS_PAYLOAD_CONTENT;
+ *state = readall(*state, sock, i, sizeof(*i));
+ if (*state == -1) {
+ fprintf(stderr, "error: unable to read int64\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct list client_path_list;
+
+struct client_path_state *find_client_path_state(struct list *path_list, const char *path)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+
+ while (curr->cps_node.n_succ != NULL) {
+ if (strcmp(curr->cps_path, path) == 0) {
+ return curr;
+ }
+
+ struct client_path_state *child =
+ find_client_path_state(&curr->cps_children, path);
+
+ if (child != NULL) {
+ return child;
+ }
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ return NULL;
+}
+
+void send_preview_error(int64_t id, const char *path, const char *msg)
+{
+ send_packet(STDOUT_FILENO,
+ TPT_PREVIEW_ERROR,
+ TPPT_INT64, id,
+ TPPT_STRING, path,
+ TPPT_STRING, msg,
+ TPPT_DONE);
+}
+
+void send_preview_data(int64_t id, const char *path, int32_t len, const char *bits)
+{
+ send_packet(STDOUT_FILENO,
+ TPT_PREVIEW_DATA,
+ TPPT_INT64, id,
+ TPPT_STRING, path,
+ TPPT_BITS, len, bits,
+ TPPT_DONE);
+}
+
+int poll_paths(struct list *path_list, struct client_path_state *root_cps)
+{
+ struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
+ int is_top = root_cps == NULL;
+ int retval = 0;
+
+ while (curr->cps_node.n_succ != NULL) {
+ if (is_top) {
+ root_cps = curr;
+ }
+
+ if (is_glob(curr->cps_path)) {
+ int changes = 0;
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(curr->cps_path, 0, NULL, &gl) != 0) {
+ set_client_path_state_error(curr, "glob");
+ } else {
+ struct list prev_children;
+
+ list_move(&prev_children, &curr->cps_children);
+ for (size_t lpc = 0; lpc < gl.gl_pathc; lpc++) {
+ struct client_path_state *child;
+
+ if ((child = find_client_path_state(
+ &prev_children, gl.gl_pathv[lpc])) == NULL) {
+ child = create_client_path_state(gl.gl_pathv[lpc]);
+ changes += 1;
+ } else {
+ list_remove(&child->cps_node);
+ }
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+ globfree(&gl);
+
+ struct client_path_state *child;
+
+ while ((child = (struct client_path_state *) list_remove_head(
+ &prev_children)) != NULL) {
+ send_error(child, "deleted");
+ delete_client_path_state(child);
+ changes += 1;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+ }
+
+ if (changes) {
+ curr->cps_client_state = CS_INIT;
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ continue;
+ }
+
+ struct stat st;
+ int rc = lstat(curr->cps_path, &st);
+
+ if (rc == -1) {
+ memset(&st, 0, sizeof(st));
+ set_client_path_state_error(curr, "lstat");
+ } else if (curr->cps_client_file_offset >= 0 &&
+ ((curr->cps_last_stat.st_dev != st.st_dev &&
+ curr->cps_last_stat.st_ino != st.st_ino) ||
+ (st.st_size < curr->cps_last_stat.st_size))) {
+ send_error(curr, "replaced");
+ set_client_path_state_error(curr, "replace");
+ } else if (S_ISLNK(st.st_mode)) {
+ switch (curr->cps_client_state) {
+ case CS_INIT: {
+ char buffer[PATH_MAX];
+ ssize_t link_len;
+
+ link_len = readlink(curr->cps_path, buffer, sizeof(buffer));
+ if (link_len < 0) {
+ set_client_path_state_error(curr, "readlink");
+ } else {
+ buffer[link_len] = '\0';
+ send_packet(STDOUT_FILENO,
+ TPT_LINK_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+
+ if (buffer[0] == '/') {
+ struct client_path_state *child =
+ create_client_path_state(buffer);
+
+ fprintf(stderr, "info: monitoring link path %s\n",
+ buffer);
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+
+ retval += 1;
+ }
+ break;
+ }
+ case CS_SYNCED:
+ break;
+ case CS_OFFERED:
+ case CS_TAILING:
+ fprintf(stderr,
+ "internal-error: unexpected state for path -- %s\n",
+ curr->cps_path);
+ break;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+
+ curr->cps_last_path_state = PS_OK;
+ } else if (S_ISREG(st.st_mode)) {
+ switch (curr->cps_client_state) {
+ case CS_INIT:
+ case CS_TAILING:
+ case CS_SYNCED: {
+ if (curr->cps_client_file_offset < st.st_size) {
+ int fd = open(curr->cps_path, O_RDONLY);
+
+ if (fd == -1) {
+ set_client_path_state_error(curr, "open");
+ } else {
+ static unsigned char buffer[4 * 1024 * 1024];
+
+ int64_t file_offset =
+ curr->cps_client_file_offset < 0 ?
+ 0 :
+ curr->cps_client_file_offset;
+ int64_t nbytes = sizeof(buffer);
+ if (curr->cps_client_state == CS_INIT) {
+ if (curr->cps_client_file_size == 0) {
+ // initial state, haven't heard from client yet.
+ nbytes = 32 * 1024;
+ } else if (file_offset < curr->cps_client_file_size) {
+ // heard from client, try to catch up
+ nbytes = curr->cps_client_file_size - file_offset;
+ if (nbytes > sizeof(buffer)) {
+ nbytes = sizeof(buffer);
+ }
+ }
+ }
+ int32_t bytes_read = pread(fd, buffer, nbytes, file_offset);
+
+ if (bytes_read == -1) {
+ set_client_path_state_error(curr, "pread");
+ } else if (curr->cps_client_state == CS_INIT &&
+ (curr->cps_client_file_offset < 0 ||
+ bytes_read > 0)) {
+ static unsigned char
+ HASH_BUFFER[4 * 1024 * 1024];
+ BYTE hash[SHA256_BLOCK_SIZE];
+ size_t remaining = 0;
+ int64_t remaining_offset
+ = file_offset + bytes_read;
+ SHA256_CTX shactx;
+
+ if (curr->cps_client_file_size > 0
+ && file_offset < curr->cps_client_file_size)
+ {
+ remaining = curr->cps_client_file_size
+ - file_offset - bytes_read;
+ }
+
+ fprintf(stderr,
+ "info: prepping offer: init=%d; "
+ "remaining=%zu; %s\n",
+ bytes_read,
+ remaining,
+ curr->cps_path);
+ sha256_init(&shactx);
+ sha256_update(&shactx, buffer, bytes_read);
+ while (remaining > 0) {
+ nbytes = sizeof(HASH_BUFFER);
+ if (remaining < nbytes) {
+ nbytes = remaining;
+ }
+ ssize_t remaining_bytes_read
+ = pread(fd,
+ HASH_BUFFER,
+ nbytes,
+ remaining_offset);
+ if (remaining_bytes_read < 0) {
+ set_client_path_state_error(curr, "pread");
+ break;
+ }
+ if (remaining_bytes_read == 0) {
+ remaining = 0;
+ break;
+ }
+ sha256_update(&shactx, HASH_BUFFER, remaining_bytes_read);
+ remaining -= remaining_bytes_read;
+ remaining_offset += remaining_bytes_read;
+ bytes_read += remaining_bytes_read;
+ }
+
+ if (remaining == 0) {
+ sha256_final(&shactx, hash);
+
+ send_packet(STDOUT_FILENO,
+ TPT_OFFER_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_INT64,
+ (int64_t) st.st_mtime,
+ TPPT_INT64, file_offset,
+ TPPT_INT64, (int64_t) bytes_read,
+ TPPT_HASH, hash,
+ TPPT_DONE);
+ curr->cps_client_state = CS_OFFERED;
+ }
+ } else {
+ if (curr->cps_client_file_offset < 0) {
+ curr->cps_client_file_offset = 0;
+ }
+
+ send_packet(STDOUT_FILENO,
+ TPT_TAIL_BLOCK,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_INT64, (int64_t) st.st_mtime,
+ TPPT_INT64, curr->cps_client_file_offset,
+ TPPT_BITS, bytes_read, buffer,
+ TPPT_DONE);
+ curr->cps_client_file_offset += bytes_read;
+ curr->cps_client_state = CS_TAILING;
+ }
+ close(fd);
+
+ retval = 1;
+ }
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+ break;
+ }
+ case CS_OFFERED: {
+ // Still waiting for the client ack
+ break;
+ }
+ }
+
+ curr->cps_last_path_state = PS_OK;
+ } else if (S_ISDIR(st.st_mode)) {
+ DIR *dir = opendir(curr->cps_path);
+
+ if (dir == NULL) {
+ set_client_path_state_error(curr, "opendir");
+ } else {
+ struct list prev_children;
+ struct dirent *entry;
+ int changes = 0;
+
+ list_move(&prev_children, &curr->cps_children);
+ while ((entry = readdir(dir)) != NULL) {
+ if (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0) {
+ continue;
+ }
+
+ if (entry->d_type != DT_REG &&
+ entry->d_type != DT_LNK) {
+ continue;
+ }
+
+ char full_path[PATH_MAX];
+
+ snprintf(full_path, sizeof(full_path),
+ "%s/%s",
+ curr->cps_path, entry->d_name);
+
+ struct client_path_state *child = find_client_path_state(&prev_children, full_path);
+
+ if (child == NULL) {
+ // new file
+ fprintf(stderr, "info: monitoring child path: %s\n", full_path);
+ child = create_client_path_state(full_path);
+ changes += 1;
+ } else {
+ list_remove(&child->cps_node);
+ }
+ list_append(&curr->cps_children, &child->cps_node);
+ }
+ closedir(dir);
+
+ struct client_path_state *child;
+
+ while ((child = (struct client_path_state *) list_remove_head(
+ &prev_children)) != NULL) {
+ send_error(child, "deleted");
+ delete_client_path_state(child);
+ changes += 1;
+ }
+
+ retval += poll_paths(&curr->cps_children, root_cps);
+
+ if (changes) {
+ curr->cps_client_state = CS_INIT;
+ } else if (curr->cps_client_state != CS_SYNCED) {
+ send_packet(STDOUT_FILENO,
+ TPT_SYNCED,
+ TPPT_STRING, root_cps->cps_path,
+ TPPT_STRING, curr->cps_path,
+ TPPT_DONE);
+ curr->cps_client_state = CS_SYNCED;
+ }
+ }
+
+ curr->cps_last_path_state = PS_OK;
+ }
+
+ curr->cps_last_stat = st;
+
+ curr = (struct client_path_state *) curr->cps_node.n_succ;
+ }
+
+ fflush(stderr);
+
+ return retval;
+}
+
+static
+void send_possible_paths(const char *glob_path, int depth)
+{
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(glob_path, GLOB_MARK, NULL, &gl) == 0) {
+ for (size_t lpc = 0;
+ lpc < gl.gl_pathc;
+ lpc++) {
+ const char *child_path = gl.gl_pathv[lpc];
+ size_t child_len = strlen(gl.gl_pathv[lpc]);
+
+ send_packet(STDOUT_FILENO,
+ TPT_POSSIBLE_PATH,
+ TPPT_STRING, child_path,
+ TPPT_DONE);
+
+ if (depth == 0 && child_path[child_len - 1] == '/') {
+ char *child_copy = malloc(child_len + 2);
+
+ strcpy(child_copy, child_path);
+ strcat(child_copy, "*");
+ send_possible_paths(child_copy, depth + 1);
+ free(child_copy);
+ }
+ }
+ }
+
+ globfree(&gl);
+}
+
+static
+void handle_load_preview_request(const char *path, int64_t preview_id)
+{
+ struct stat st;
+
+ fprintf(stderr, "info: load preview request -- %lld\n", preview_id);
+ if (is_glob(path)) {
+ glob_t gl;
+
+ memset(&gl, 0, sizeof(gl));
+ if (glob(path, 0, NULL, &gl) != 0) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot glob %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else {
+ char *bits = malloc(1024 * 1024);
+ int lpc, line_count = 10;
+
+ bits[0] = '\0';
+ for (lpc = 0;
+ line_count > 0 && lpc < gl.gl_pathc;
+ lpc++, line_count--) {
+ strcat(bits, gl.gl_pathv[lpc]);
+ strcat(bits, "\n");
+ }
+
+ if (lpc < gl.gl_pathc) {
+ strcat(bits, " ... and more! ...\n");
+ }
+
+ send_preview_data(preview_id, path, strlen(bits), bits);
+
+ globfree(&gl);
+ free(bits);
+ }
+ }
+ else if (stat(path, &st) == -1) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot open %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else if (S_ISREG(st.st_mode)) {
+ size_t capacity = 1024 * 1024;
+ char *bits = malloc(capacity);
+ FILE *file;
+
+ if ((file = fopen(path, "r")) == NULL) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: cannot open %s -- %s",
+ path,
+ strerror(errno));
+ send_preview_error(preview_id, path, msg);
+ } else {
+ int line_count = 10;
+ size_t offset = 0;
+ char *line;
+
+ while (line_count &&
+ (capacity - offset) > 1024 &&
+ (line = fgets(&bits[offset], capacity - offset, file)) != NULL) {
+ offset += strlen(line);
+ line_count -= 1;
+ }
+
+ fclose(file);
+
+ send_preview_data(preview_id, path, offset, bits);
+ }
+ free(bits);
+ } else if (S_ISDIR(st.st_mode)) {
+ DIR *dir = opendir(path);
+
+ if (dir == NULL) {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: unable to open directory -- %s",
+ path);
+ send_preview_error(preview_id, path, msg);
+ } else {
+ char *bits = malloc(1024 * 1024);
+ struct dirent *entry;
+ int line_count = 10;
+
+ bits[0] = '\0';
+ while ((entry = readdir(dir)) != NULL) {
+ if (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0) {
+ continue;
+ }
+ if (entry->d_type != DT_REG &&
+ entry->d_type != DT_DIR) {
+ continue;
+ }
+ if (line_count == 1) {
+ strcat(bits, " ... and more! ...\n");
+ break;
+ }
+
+ strcat(bits, entry->d_name);
+ strcat(bits, "\n");
+
+ line_count -= 1;
+ }
+
+ closedir(dir);
+
+ send_preview_data(preview_id, path, strlen(bits), bits);
+
+ free(bits);
+ }
+ } else {
+ char msg[1024];
+
+ snprintf(msg, sizeof(msg),
+ "error: path is not a file or directory -- %s",
+ path);
+ send_preview_error(preview_id, path, msg);
+ }
+}
+
+static
+void handle_complete_path_request(const char *path)
+{
+ size_t path_len = strlen(path);
+ char *glob_path = malloc(path_len + 3);
+ struct stat st;
+
+ strcpy(glob_path, path);
+ fprintf(stderr, "complete path: %s\n", path);
+ if (path[path_len - 1] != '/' &&
+ stat(path, &st) == 0 &&
+ S_ISDIR(st.st_mode)) {
+ strcat(glob_path, "/");
+ }
+ if (path[path_len - 1] != '*') {
+ strcat(glob_path, "*");
+ }
+ fprintf(stderr, "complete glob path: %s\n", glob_path);
+ send_possible_paths(glob_path, 0);
+
+ free(glob_path);
+}
+
+int main(int argc, char *argv[])
+{
+ int done = 0, timeout = 0;
+ recv_state_t rstate = RS_PACKET_TYPE;
+
+ // No need to leave ourselves around
+ if (argc == 1) {
+ unlink(argv[0]);
+ }
+
+ list_init(&client_path_list);
+
+ {
+ FILE *unameFile = popen("uname -mrsv", "r");
+
+ if (unameFile != NULL) {
+ char buffer[1024];
+
+ fgets(buffer, sizeof(buffer), unameFile);
+ char *bufend = buffer + strlen(buffer) - 1;
+ while (isspace(*bufend)) {
+ bufend -= 1;
+ }
+ *bufend = '\0';
+ send_packet(STDOUT_FILENO,
+ TPT_ANNOUNCE,
+ TPPT_STRING, buffer,
+ TPPT_DONE);
+ pclose(unameFile);
+ }
+ }
+
+ while (!done) {
+ struct pollfd pfds[1];
+
+ pfds[0].fd = STDIN_FILENO;
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+
+ int ready_count = poll(pfds, 1, timeout);
+
+ if (ready_count) {
+ tailer_packet_type_t type;
+
+ assert(rstate == RS_PACKET_TYPE);
+ rstate = readall(rstate, STDIN_FILENO, &type, sizeof(type));
+ if (rstate == RS_ERROR) {
+ fprintf(stderr, "info: exiting...\n");
+ done = 1;
+ } else {
+ switch (type) {
+ case TPT_OPEN_PATH:
+ case TPT_CLOSE_PATH:
+ case TPT_LOAD_PREVIEW:
+ case TPT_COMPLETE_PATH: {
+ char *path = readstr(&rstate, STDIN_FILENO);
+ int64_t preview_id = 0;
+
+ if (type == TPT_LOAD_PREVIEW) {
+ if (readint64(&rstate, STDIN_FILENO, &preview_id) == -1) {
+ done = 1;
+ break;
+ }
+ }
+ if (path == NULL) {
+ fprintf(stderr, "error: unable to get path to open\n");
+ done = 1;
+ } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) {
+ fprintf(stderr, "error: invalid open packet\n");
+ done = 1;
+ } else if (type == TPT_OPEN_PATH) {
+ struct client_path_state *cps;
+
+ cps = find_client_path_state(&client_path_list, path);
+ if (cps != NULL) {
+ fprintf(stderr, "warning: already monitoring -- %s\n", path);
+ } else {
+ cps = create_client_path_state(path);
+
+ fprintf(stderr, "info: monitoring path: %s\n", path);
+ list_append(&client_path_list, &cps->cps_node);
+ }
+ } else if (type == TPT_CLOSE_PATH) {
+ struct client_path_state *cps = find_client_path_state(&client_path_list, path);
+
+ if (cps == NULL) {
+ fprintf(stderr, "warning: path is not open: %s\n", path);
+ } else {
+ list_remove(&cps->cps_node);
+ delete_client_path_state(cps);
+ }
+ } else if (type == TPT_LOAD_PREVIEW) {
+ handle_load_preview_request(path, preview_id);
+ } else if (type == TPT_COMPLETE_PATH) {
+ handle_complete_path_request(path);
+ }
+
+ free(path);
+ break;
+ }
+ case TPT_ACK_BLOCK:
+ case TPT_NEED_BLOCK: {
+ char *path = readstr(&rstate, STDIN_FILENO);
+ int64_t ack_offset = 0, ack_len = 0, client_size = 0;
+
+ if (type == TPT_ACK_BLOCK &&
+ (readint64(&rstate, STDIN_FILENO, &ack_offset) == -1 ||
+ readint64(&rstate, STDIN_FILENO, &ack_len) == -1 ||
+ readint64(&rstate, STDIN_FILENO, &client_size) == -1)) {
+ done = 1;
+ break;
+ }
+
+ // fprintf(stderr, "info: block packet path: %s\n", path);
+ if (path == NULL) {
+ fprintf(stderr, "error: unable to get block path\n");
+ done = 1;
+ } else if (read_payload_type(&rstate, STDIN_FILENO) != TPPT_DONE) {
+ fprintf(stderr, "error: invalid block packet\n");
+ done = 1;
+ } else {
+ struct client_path_state *cps = find_client_path_state(&client_path_list, path);
+
+ if (cps == NULL) {
+ fprintf(stderr, "warning: unknown path in block packet: %s\n", path);
+ } else if (type == TPT_NEED_BLOCK) {
+ fprintf(stderr, "info: client is tailing: %s\n", path);
+ cps->cps_client_state = CS_TAILING;
+ } else if (type == TPT_ACK_BLOCK) {
+ fprintf(stderr,
+ "info: client acked: %s %lld\n",
+ path,
+ client_size);
+ if (ack_len == 0) {
+ cps->cps_client_state = CS_TAILING;
+ } else {
+ cps->cps_client_file_offset = ack_offset + ack_len;
+ cps->cps_client_state = CS_INIT;
+ cps->cps_client_file_size = client_size;
+ }
+ }
+ free(path);
+ }
+ break;
+ }
+ default: {
+ assert(0);
+ }
+ }
+ }
+ }
+
+ if (!done) {
+ if (poll_paths(&client_path_list, NULL)) {
+ timeout = 0;
+ } else {
+ timeout = 1000;
+ }
+ }
+ }
+
+ return EXIT_SUCCESS;
+}