summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/aws/flb_aws_credentials_process.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/aws/flb_aws_credentials_process.c')
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_process.c783
1 files changed, 783 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/flb_aws_credentials_process.c b/fluent-bit/src/aws/flb_aws_credentials_process.c
new file mode 100644
index 000000000..44c024ca7
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_process.c
@@ -0,0 +1,783 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2021 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_aws_credentials.h>
+
+#include "flb_aws_credentials_log.h"
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pipe.h>
+#include <fluent-bit/flb_time.h>
+
+#include <fcntl.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/wait.h>
+
+#define DEV_NULL "/dev/null"
+
+#define MS_PER_SEC 1000
+#define MICROS_PER_MS 1000
+#define NS_PER_MS 1000000
+
+#define CREDENTIAL_PROCESS_TIMEOUT_MS 60000
+#define CREDENTIAL_PROCESS_BUFFER_SIZE 8 * 1024
+
+#define WAITPID_POLL_FREQUENCY_MS 20
+#define WAITPID_TIMEOUT_MS 10 * WAITPID_POLL_FREQUENCY_MS
+
+#define CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN "SessionToken"
+
+/* Declarations */
+struct token_array;
+static int new_token_array(struct token_array *arr, int cap);
+static int append_token(struct token_array *arr, char* elem);
+
+struct readbuf;
+static int new_readbuf(struct readbuf* buf, int cap);
+
+static int get_monotonic_time(struct flb_time* tm);
+
+static char* ltrim(char* input);
+static int scan_credential_process_token_quoted(char *input);
+static int scan_credential_process_token_unquoted(char *input);
+static int credential_process_token_count(char* process);
+static int parse_credential_process_token(char **input, char** out_token);
+
+static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf);
+static int waitpid_timeout(char* name, pid_t pid, int* wstatus);
+
+struct process;
+static int new_process(struct process* p, char** args);
+static void exec_process_child(struct process* p);
+static int exec_process(struct process* p);
+static int read_from_process(struct process* p, struct readbuf* buf);
+static int wait_process(struct process* p);
+static void destroy_process(struct process* p);
+/* End Declarations */
+
+struct token_array {
+ char** tokens;
+ int len;
+ int cap;
+};
+
+/*
+ * Initializes a new token array with the given capacity.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `flb_free(arr->tokens)`.
+ */
+static int new_token_array(struct token_array *arr, int cap)
+{
+ *arr = (struct token_array) { .len = 0, .cap = cap };
+ arr->tokens = flb_malloc(cap * sizeof(char*));
+ if (!arr->tokens) {
+ flb_errno();
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Appends the given token to the array, if there is capacity.
+ * Returns 0 on success and < 0 on failure.
+ */
+static int append_token(struct token_array *arr, char* token)
+{
+ if (arr->len >= arr->cap) {
+ /* This means there is a bug in credential_process_token_count. */
+ AWS_CREDS_ERROR("append_token called on full token_array");
+ return -1;
+ }
+
+ (arr->tokens)[arr->len] = token;
+ arr->len++;
+ return 0;
+}
+
+struct readbuf {
+ char* buf;
+ int len;
+ int cap;
+};
+
+/*
+ * Initializes a new buffer with the given capacity.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `flb_free(buf->buf)`.
+ */
+static int new_readbuf(struct readbuf* buf, int cap)
+{
+ *buf = (struct readbuf) { .len = 0, .cap = cap };
+ buf->buf = flb_malloc(cap * sizeof(char));
+ if (!buf->buf) {
+ flb_errno();
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Fetches the current time from the monotonic clock.
+ * Returns 0 on success and < 0 on failure.
+ * This is useful for calculating deadlines that are not sensitive to changes
+ * in the system clock.
+ */
+static int get_monotonic_time(struct flb_time* tm)
+{
+ struct timespec ts;
+ if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) {
+ flb_errno();
+ return -1;
+ }
+ flb_time_set(tm, ts.tv_sec, ts.tv_nsec);
+ return 0;
+}
+
+/*
+ * Skips over any leading spaces in the input string, returning the remainder.
+ * If the entire string is consumed, returns the empty string (not NULL).
+ */
+static char* ltrim(char* input)
+{
+ while (*input == ' ') {
+ input++;
+ }
+ return input;
+}
+
+/*
+ * Scans the unquoted token string at the start of the input string.
+ * The input must be the start of an unquoted token.
+ * Returns the token length on success, and < 0 on failure.
+ * This function does not add a null terminator to the token.
+ * The token length is the index where the null terminator must be placed.
+ * If the entire input is consumed, returns the length of the input string
+ * (excluding the null terminator).
+ */
+static int scan_credential_process_token_unquoted(char *input)
+{
+ int i;
+
+ for (i = 0; input[i] != ' '; i++) {
+ if (input[i] == '\0') {
+ break;
+ }
+ if (input[i] == '"') {
+ AWS_CREDS_ERROR("unexpected quote in credential_process");
+ return -1;
+ }
+ }
+
+ return i;
+}
+
+/*
+ * Scans the quoted token at the start of the input string.
+ * The input must be the string after the opening quote.
+ * Returns the token length on success, and < 0 on failure.
+ * This function does not add a null terminator to the token.
+ * The token length is the index where the null terminator must be placed.
+ */
+static int scan_credential_process_token_quoted(char *input)
+{
+ int i;
+
+ for (i = 0; input[i] != '"'; i++) {
+ if (input[i] == '\0') {
+ AWS_CREDS_ERROR("unterminated quote in credential_process");
+ return -1;
+ }
+ }
+
+ if (input[i+1] != '\0' && input[i+1] != ' ') {
+ AWS_CREDS_ERROR("unexpected character %c after closing quote in "
+ "credential_process", input[i+1]);
+ return -1;
+ }
+
+ return i;
+}
+
+/*
+ * Counts the number of tokens in the input string, which is assumed to be the
+ * credential_process from the config file.
+ * Returns < 0 on failure.
+ */
+static int credential_process_token_count(char* process)
+{
+ int count = 0;
+ int i;
+
+ while (1) {
+ process = ltrim(process);
+ if (*process == '\0') {
+ break;
+ }
+
+ count++;
+
+ if (*process == '"') {
+ process++;
+ i = scan_credential_process_token_quoted(process);
+ }
+ else {
+ i = scan_credential_process_token_unquoted(process);
+ }
+
+ if (i < 0) {
+ return -1;
+ }
+
+ process += i;
+ if (*process != '\0') {
+ process++;
+ }
+ }
+
+ return count;
+}
+
+/*
+ * Parses the input string, which is assumed to be the credential_process
+ * from the config file. The next token will be put in *out_token, and the
+ * remaining unprocessed input will be put in *input.
+ * Returns 0 on success and < 0 on failure.
+ * If there is an error, the value of *input and *out_token is not defined.
+ * If it succeeds and *out_token is NULL, then there are no more tokens,
+ * and this function should not be called again.
+ * *out_token will be some substring of the original *input, so it should not
+ * be freed.
+ */
+static int parse_credential_process_token(char** input, char** out_token)
+{
+ *out_token = NULL;
+ int i;
+
+ if (!*input) {
+ AWS_CREDS_ERROR("parse_credential_process_token called after yielding last token");
+ return -1;
+ }
+
+ *input = ltrim(*input);
+
+ if (**input == '\0') {
+ *input = NULL;
+ *out_token = NULL;
+ return 0;
+ }
+
+ if (**input == '"') {
+ (*input)++;
+ i = scan_credential_process_token_quoted(*input);
+ }
+ else {
+ i = scan_credential_process_token_unquoted(*input);
+ }
+
+ if (i < 0) {
+ return -1;
+ }
+
+ *out_token = *input;
+ *input += i;
+
+ if (**input != '\0') {
+ **input = '\0';
+ (*input)++;
+ }
+
+ return 0;
+}
+
+/* See <fluent-bit/flb_aws_credentials.h>. */
+char** parse_credential_process(char* input)
+{
+ char* next_token = NULL;
+ struct token_array arr = { 0 };
+ int token_count = credential_process_token_count(input);
+
+ if (token_count < 0) {
+ goto error;
+ }
+
+ /* Add one extra capacity for the NULL terminator. */
+ if (new_token_array(&arr, token_count + 1) < 0) {
+ goto error;
+ }
+
+ while (1) {
+ if (parse_credential_process_token(&input, &next_token) < 0) {
+ goto error;
+ }
+
+ if (!next_token) {
+ break;
+ }
+
+ if (append_token(&arr, next_token) < 0) {
+ goto error;
+ }
+ }
+
+ if (append_token(&arr, NULL) < 0) {
+ goto error;
+ }
+
+ return arr.tokens;
+
+error:
+ flb_free(arr.tokens);
+ return NULL;
+}
+
+/*
+ * Reads from the pipe into the buffer until no more input is available.
+ * If the input is exhausted (EOF), returns 0.
+ * If reading would block (EWOULDBLOCK/EAGAIN), returns > 0.
+ * If an error occurs or the buffer is full, returns < 0.
+ */
+static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf)
+{
+ int result = -1;
+
+ while (1) {
+ if (buf->len >= buf->cap) {
+ AWS_CREDS_ERROR("credential_process %s exceeded max buffer size", name);
+ return -1;
+ }
+
+ result = flb_pipe_r(fd, buf->buf + buf->len, buf->cap - buf->len);
+ if (result < 0) {
+ if (FLB_PIPE_WOULDBLOCK()) {
+ return 1;
+ }
+ flb_errno();
+ return -1;
+ }
+ else if (result == 0) { /* EOF */
+ return 0;
+ }
+ else {
+ buf->len += result;
+ }
+ }
+}
+
+/*
+ * Polls waitpid until the given process exits, or the timeout is reached.
+ * Returns 0 on success and < 0 on failure.
+ */
+static int waitpid_timeout(char* name, pid_t pid, int* wstatus)
+{
+ int result = -1;
+ int retries = WAITPID_TIMEOUT_MS / WAITPID_POLL_FREQUENCY_MS;
+
+ while (1) {
+ result = waitpid(pid, wstatus, WNOHANG);
+ if (result < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (result > 0) {
+ return 0;
+ }
+
+ if (retries <= 0) {
+ AWS_CREDS_ERROR("timed out waiting for credential_process %s to exit", name);
+ return -1;
+ }
+ retries--;
+
+ usleep(WAITPID_POLL_FREQUENCY_MS * MICROS_PER_MS);
+ }
+}
+
+struct process {
+ int initialized;
+ char** args;
+ int stdin_stream;
+ flb_pipefd_t stdout_stream[2];
+ int stderr_stream;
+ pid_t pid;
+};
+
+/*
+ * Initializes a new process with the given args.
+ * args is assumed to be a NULL terminated array, for use with execvp.
+ * It must have a least one element, and the first element is assumed to be the
+ * name/path of the executable.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `destroy_process(p)`.
+ */
+static int new_process(struct process* p, char** args)
+{
+ *p = (struct process) {
+ .initialized = FLB_TRUE,
+ .args = args,
+ .stdin_stream = -1,
+ .stdout_stream = {-1, -1},
+ .stderr_stream = -1,
+ .pid = -1,
+ };
+
+ while ((p->stdin_stream = open(DEV_NULL, O_RDONLY|O_CLOEXEC)) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ if (flb_pipe_create(p->stdout_stream) < 0) {;
+ flb_errno();
+ return -1;
+ }
+
+ if (fcntl(p->stdout_stream[0], F_SETFL, O_CLOEXEC) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (fcntl(p->stdout_stream[1], F_SETFL, O_CLOEXEC) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ while ((p->stderr_stream = open(DEV_NULL, O_WRONLY|O_CLOEXEC)) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Sets up the credential_process's stdin, stdout, and stderr, and exec's
+ * the actual process.
+ * For this function to return at all is an error.
+ * This function should not be called more than once.
+ */
+static void exec_process_child(struct process* p)
+{
+ while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+ while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+ while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+
+ close(p->stdin_stream);
+ flb_pipe_close(p->stdout_stream[0]);
+ flb_pipe_close(p->stdout_stream[1]);
+ close(p->stderr_stream);
+
+ execvp(p->args[0], p->args);
+}
+
+/*
+ * Forks the credential_process, but does not wait for it to finish.
+ * Returns 0 on success and < 0 on failure.
+ * This function should not be called more than once.
+ */
+static int exec_process(struct process* p)
+{
+ AWS_CREDS_DEBUG("executing credential_process %s", p->args[0]);
+
+ p->pid = fork();
+ if (p->pid < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (p->pid == 0) {
+ exec_process_child(p);
+
+ /* It should not be possible to reach this under normal circumstances. */
+ exit(EXIT_FAILURE);
+ }
+
+ close(p->stdin_stream);
+ p->stdin_stream = -1;
+
+ flb_pipe_close(p->stdout_stream[1]);
+ p->stdout_stream[1] = -1;
+
+ close(p->stderr_stream);
+ p->stderr_stream = -1;
+
+ return 0;
+}
+
+/*
+ * Reads from the credential_process's stdout into the given buffer.
+ * Returns 0 on success, and < 0 on failure or timeout.
+ * This function should not be called more than once.
+ */
+static int read_from_process(struct process* p, struct readbuf* buf)
+{
+ int result = -1;
+ struct pollfd pfd;
+ struct flb_time start, timeout, deadline, now, remaining;
+ int remaining_ms;
+
+ if (fcntl(p->stdout_stream[0], F_SETFL, O_NONBLOCK) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (get_monotonic_time(&start) < 0) {
+ return -1;
+ }
+
+ flb_time_set(&timeout,
+ (time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC),
+ ((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS);
+
+ /* deadline = start + timeout */
+ flb_time_add(&start, &timeout, &deadline);
+
+ while (1) {
+ pfd = (struct pollfd) {
+ .fd = p->stdout_stream[0],
+ .events = POLLIN,
+ };
+
+ if (get_monotonic_time(&now) < 0) {
+ return -1;
+ }
+
+ /* remaining = deadline - now */
+ if (flb_time_diff(&deadline, &now, &remaining) < 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ /*
+ * poll uses millisecond resolution for the timeout.
+ * If there is less than a millisecond left, then for simplicity we'll just
+ * declare that it timed out.
+ */
+ remaining_ms = (int) (flb_time_to_nanosec(&remaining) / NS_PER_MS);
+ if (remaining_ms <= 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ result = poll(&pfd, 1, remaining_ms);
+ if (result < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ continue;
+ }
+
+ if (result == 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLNVAL) == POLLNVAL) {
+ AWS_CREDS_ERROR("credential_process %s POLLNVAL", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLERR) == POLLERR) {
+ AWS_CREDS_ERROR("credential_process %s POLLERR", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLIN) == POLLIN || (pfd.revents & POLLHUP) == POLLHUP) {
+ result = read_until_block(p->args[0], p->stdout_stream[0], buf);
+ if (result <= 0) {
+ return result;
+ }
+ }
+ }
+}
+
+/*
+ * Waits for the process to exit, up to a timeout.
+ * Returns 0 on success and < 0 on failure.
+ * This function should not be called more than once.
+ */
+static int wait_process(struct process* p)
+{
+ int wstatus;
+
+ if (waitpid_timeout(p->args[0], p->pid, &wstatus) < 0) {
+ return -1;
+ }
+ p->pid = -1;
+
+ if (!WIFEXITED(wstatus)) {
+ AWS_CREDS_ERROR("credential_process %s did not terminate normally", p->args[0]);
+ return -1;
+ }
+
+ if (WEXITSTATUS(wstatus) != EXIT_SUCCESS) {
+ AWS_CREDS_ERROR("credential_process %s exited with status %d", p->args[0],
+ WEXITSTATUS(wstatus));
+ return -1;
+ }
+
+ AWS_CREDS_DEBUG("credential_process %s exited successfully", p->args[0]);
+ return 0;
+}
+
+/*
+ * Release all resources associated with this process.
+ * Calling this function multiple times is a no-op.
+ * Since the process does not own p->args, it does not free it.
+ * Note that p->args will be set to NULL, so the caller must hold onto
+ * it separately in order to free it.
+ */
+static void destroy_process(struct process* p)
+{
+ if (p->initialized) {
+ if (p->stdin_stream >= 0) {
+ close(p->stdin_stream);
+ p->stdin_stream = -1;
+ }
+ if (p->stdout_stream[0] >= 0) {
+ close(p->stdout_stream[0]);
+ p->stdout_stream[0] = -1;
+ }
+ if (p->stdout_stream[1] >= 0) {
+ close(p->stdout_stream[1]);
+ p->stdout_stream[1] = -1;
+ }
+ if (p->stderr_stream >= 0) {
+ close(p->stderr_stream);
+ p->stderr_stream = -1;
+ }
+
+ if (p->pid > 0) {
+ if (kill(p->pid, SIGKILL) < 0) {
+ flb_errno();
+ AWS_CREDS_ERROR("could not kill credential_process %s (pid=%d) "
+ "during cleanup", p->args[0], p->pid);
+ }
+ else {
+ while (waitpid(p->pid, NULL, 0) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ break;
+ }
+ }
+ }
+ p->pid = -1;
+ }
+
+ p->args = NULL;
+
+ p->initialized = FLB_FALSE;
+ }
+}
+
+/* See <fluent-bit/flb_aws_credentials.h>. */
+int exec_credential_process(char* process, struct flb_aws_credentials** creds,
+ time_t* expiration)
+{
+ char** args = NULL;
+ int result = -1;
+ struct process p = { 0 };
+ struct readbuf buf = { 0 };
+ *creds = NULL;
+ *expiration = 0;
+
+ args = parse_credential_process(process);
+ if (!args) {
+ result = -1;
+ goto end;
+ }
+
+ if (!args[0]) {
+ AWS_CREDS_ERROR("invalid credential_process");
+ result = -1;
+ goto end;
+ }
+
+ if (new_process(&p, args) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (new_readbuf(&buf, CREDENTIAL_PROCESS_BUFFER_SIZE) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (exec_process(&p) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (read_from_process(&p, &buf) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (wait_process(&p) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ *creds = flb_parse_json_credentials(buf.buf, buf.len,
+ CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN,
+ expiration);
+ if (!*creds) {
+ AWS_CREDS_ERROR("could not parse credentials from credential_process %s", args[0]);
+ result = -1;
+ goto end;
+ }
+
+ AWS_CREDS_DEBUG("successfully parsed credentials from credential_process %s", args[0]);
+
+ result = 0;
+
+end:
+ destroy_process(&p);
+
+ flb_free(buf.buf);
+ buf.buf = NULL;
+
+ flb_free(args);
+ args = NULL;
+
+ if (result < 0) {
+ flb_aws_credentials_destroy(*creds);
+ *creds = NULL;
+ }
+
+ return result;
+}