diff options
Diffstat (limited to 'fluent-bit/plugins/out_stackdriver/stackdriver_conf.c')
-rw-r--r-- | fluent-bit/plugins/out_stackdriver/stackdriver_conf.c | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c new file mode 100644 index 000000000..9f3f28a35 --- /dev/null +++ b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c @@ -0,0 +1,667 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_unescape.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_kv.h> + +#include <sys/types.h> +#include <sys/stat.h> + +#include "gce_metadata.h" +#include "stackdriver.h" +#include "stackdriver_conf.h" +#include "stackdriver_resource_types.h" + +static inline int key_cmp(const char *str, int len, const char *cmp) { + + if (strlen(cmp) != len) { + return -1; + } + + return strncasecmp(str, cmp, len); +} + +static int read_credentials_file(const char *cred_file, struct flb_stackdriver *ctx) +{ + int i; + int ret; + int key_len; + int val_len; + int tok_size = 32; + char *buf; + char *key; + char *val; + flb_sds_t tmp; + struct stat st; + jsmn_parser parser; + jsmntok_t *t; + jsmntok_t *tokens; + + /* Validate credentials path */ + ret = stat(cred_file, &st); + if (ret == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot open credentials file: %s", + cred_file); + return -1; + } + + if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) { + flb_plg_error(ctx->ins, "credentials file " + "is not a valid file: %s", cred_file); + return -1; + } + + /* Read file content */ + buf = mk_file_to_buffer(cred_file); + if (!buf) { + flb_plg_error(ctx->ins, "error reading credentials file: %s", + cred_file); + return -1; + } + + /* Parse content */ + jsmn_init(&parser); + tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + if (!tokens) { + flb_errno(); + flb_free(buf); + return -1; + } + + ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size); + if (ret <= 0) { + flb_plg_error(ctx->ins, "invalid JSON credentials file: %s", + cred_file); + flb_free(buf); + flb_free(tokens); + return -1; + } + + t = &tokens[0]; + if (t->type != JSMN_OBJECT) { + flb_plg_error(ctx->ins, "invalid JSON map on file: %s", + cred_file); + flb_free(buf); + flb_free(tokens); + return -1; + } + + /* Parse JSON tokens */ + for (i = 1; i < ret; i++) { + t = &tokens[i]; + if (t->type != JSMN_STRING) { + continue; + } + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){ + break; + } + + /* Key */ + key = buf + t->start; + key_len = (t->end - t->start); + + /* Value */ + i++; + t = &tokens[i]; + val = buf + t->start; + val_len = (t->end - t->start); + + if (key_cmp(key, key_len, "type") == 0) { + ctx->creds->type = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "project_id") == 0) { + ctx->project_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "private_key_id") == 0) { + ctx->creds->private_key_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "private_key") == 0) { + tmp = flb_sds_create_len(val, val_len); + if (tmp) { + /* Unescape private key */ + ctx->creds->private_key = flb_sds_create_size(val_len); + flb_unescape_string(tmp, flb_sds_len(tmp), + &ctx->creds->private_key); + flb_sds_destroy(tmp); + } + } + else if (key_cmp(key, key_len, "client_email") == 0) { + ctx->creds->client_email = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "client_id") == 0) { + ctx->creds->client_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "auth_uri") == 0) { + ctx->creds->auth_uri = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "token_uri") == 0) { + ctx->creds->token_uri = flb_sds_create_len(val, val_len); + } + } + + flb_free(buf); + flb_free(tokens); + + return 0; +} + +/* + * parse_key_value_list(): + * - Parses an origin list of comma seperated string specifying key=value. + * - Appends the parsed key value pairs into the destination list. + * - Returns the length of the destination list. + */ +static int parse_key_value_list(struct flb_stackdriver *ctx, + struct mk_list *origin, + struct mk_list *dest, + int shouldTrim) +{ + char *p; + flb_sds_t key; + flb_sds_t val; + struct flb_kv *kv; + struct mk_list *head; + struct flb_slist_entry *entry; + + if (origin) { + mk_list_foreach(head, origin) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + + p = strchr(entry->str, '='); + if (!p) { + flb_plg_error(ctx->ins, "invalid key value pair on '%s'", + entry->str); + return -1; + } + + key = flb_sds_create_size((p - entry->str) + 1); + flb_sds_cat(key, entry->str, p - entry->str); + val = flb_sds_create(p + 1); + if (shouldTrim) { + flb_sds_trim(key); + flb_sds_trim(val); + } + if (!key || flb_sds_len(key) == 0) { + flb_plg_error(ctx->ins, + "invalid key value pair on '%s'", + entry->str); + return -1; + } + if (!val || flb_sds_len(val) == 0) { + flb_plg_error(ctx->ins, + "invalid key value pair on '%s'", + entry->str); + flb_sds_destroy(key); + return -1; + } + + kv = flb_kv_item_create(dest, key, val); + flb_sds_destroy(key); + flb_sds_destroy(val); + + if (!kv) { + return -1; + } + } + } + + return mk_list_size(dest); +} + +/* + * parse_configuration_labels + * - Parse labels set in configuration + * - Returns the number of configuration labels + */ +static int parse_configuration_labels(struct flb_stackdriver *ctx) +{ + return parse_key_value_list(ctx, ctx->labels, + &ctx->config_labels, FLB_FALSE); +} + +/* + * parse_resource_labels(): + * - Parses resource labels set in configuration. + * - Returns the number of resource label mappings. + */ +static int parse_resource_labels(struct flb_stackdriver *ctx) +{ + return parse_key_value_list(ctx, ctx->resource_labels, + &ctx->resource_labels_kvs, FLB_TRUE); +} + +struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + const char *backwards_compatible_env_var; + struct flb_stackdriver *ctx; + size_t http_request_key_size; + + /* Allocate config context */ + ctx = flb_calloc(1, sizeof(struct flb_stackdriver)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + ctx->config = config; + + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration"); + flb_free(ctx); + return NULL; + } + + /* Compress (gzip) */ + tmp = flb_output_get_property("compress", ins); + ctx->compress_gzip = FLB_FALSE; + if (tmp && strcasecmp(tmp, "gzip") == 0) { + ctx->compress_gzip = FLB_TRUE; + } + + /* labels */ + flb_kv_init(&ctx->config_labels); + ret = parse_configuration_labels((void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to parse configuration labels"); + flb_kv_release(&ctx->config_labels); + flb_free(ctx); + return NULL; + } + + /* resource labels */ + flb_kv_init(&ctx->resource_labels_kvs); + ret = parse_resource_labels((void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to parse resource label list"); + flb_kv_release(&ctx->resource_labels_kvs); + flb_free(ctx); + return NULL; + } + + /* Lookup metadata server URL */ + ctx->metadata_server = NULL; + tmp = flb_output_get_property("metadata_server", ins); + if (tmp == NULL) { + tmp = getenv("METADATA_SERVER"); + if(tmp) { + if (ctx->env == NULL) { + ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env)); + if (ctx->env == NULL) { + flb_plg_error(ins, "unable to allocate env variables"); + flb_free(ctx); + return NULL; + } + } + ctx->env->metadata_server = flb_sds_create(tmp); + ctx->metadata_server = ctx->env->metadata_server; + } + else { + ctx->metadata_server = flb_sds_create(FLB_STD_METADATA_SERVER); + } + } + else { + ctx->metadata_server = flb_sds_create(tmp); + } + flb_plg_info(ctx->ins, "metadata_server set to %s", ctx->metadata_server); + + /* Lookup credentials file */ + if (ctx->credentials_file == NULL) { + /* + * Use GOOGLE_APPLICATION_CREDENTIALS to fetch the credentials. + * GOOGLE_SERVICE_CREDENTIALS is checked for backwards compatibility. + */ + tmp = getenv("GOOGLE_APPLICATION_CREDENTIALS"); + backwards_compatible_env_var = getenv("GOOGLE_SERVICE_CREDENTIALS"); + if (tmp && backwards_compatible_env_var) { + flb_plg_warn(ctx->ins, "GOOGLE_APPLICATION_CREDENTIALS and " + "GOOGLE_SERVICE_CREDENTIALS are both defined. " + "Defaulting to GOOGLE_APPLICATION_CREDENTIALS"); + } + if ((tmp || backwards_compatible_env_var) && (ctx->env == NULL)) { + ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env)); + if (ctx->env == NULL) { + flb_plg_error(ins, "unable to allocate env variables"); + flb_free(ctx); + return NULL; + } + } + if (tmp) { + ctx->env->creds_file = flb_sds_create(tmp); + ctx->credentials_file = ctx->env->creds_file; + } + else if (backwards_compatible_env_var) { + ctx->env->creds_file = flb_sds_create(backwards_compatible_env_var); + ctx->credentials_file = ctx->env->creds_file; + } + } + + if (ctx->credentials_file) { + ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); + if (ctx->creds == NULL) { + flb_plg_error(ctx->ins, "unable to allocate credentials"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + ret = read_credentials_file(ctx->credentials_file, ctx); + if (ret != 0) { + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + ctx->type = ctx->creds->type; + ctx->private_key_id = ctx->creds->private_key_id; + ctx->private_key = ctx->creds->private_key; + ctx->client_email = ctx->creds->client_email; + ctx->client_id = ctx->creds->client_id; + ctx->auth_uri = ctx->creds->auth_uri; + ctx->token_uri = ctx->creds->token_uri; + } + else { + /* + * If no credentials file has been defined, do manual lookup of the + * client email and the private key + */ + ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); + if (ctx->creds == NULL) { + flb_plg_error(ctx->ins, "unable to allocate credentials"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + + /* Service Account Email */ + if (ctx->client_email == NULL) { + tmp = getenv("SERVICE_ACCOUNT_EMAIL"); + if (tmp) { + ctx->creds->client_email = flb_sds_create(tmp); + } + } + + /* Service Account Secret */ + if (ctx->private_key == NULL) { + tmp = getenv("SERVICE_ACCOUNT_SECRET"); + if (tmp) { + ctx->creds->private_key = flb_sds_create(tmp); + } + } + + ctx->private_key = ctx->creds->private_key; + ctx->client_email = ctx->creds->client_email; + } + + /* + * If only client email has been provided, fetch token from + * the GCE metadata server. + * + * If no credentials have been provided, fetch token from the GCE + * metadata server for default account. + */ + if (!ctx->client_email && ctx->private_key) { + flb_plg_error(ctx->ins, "client_email is not defined"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + + if (!ctx->client_email) { + flb_plg_warn(ctx->ins, "client_email is not defined, using " + "a default one"); + if (ctx->creds == NULL) { + ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); + if (ctx->creds == NULL) { + flb_plg_error(ctx->ins, "unable to allocate credentials"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + ctx->creds->client_email = flb_sds_create("default"); + ctx->client_email = ctx->creds->client_email; + } + if (!ctx->private_key) { + flb_plg_warn(ctx->ins, "private_key is not defined, fetching " + "it from metadata server"); + ctx->metadata_server_auth = true; + } + + if (ctx->http_request_key) { + http_request_key_size = flb_sds_len(ctx->http_request_key); + if (http_request_key_size >= INT_MAX) { + flb_plg_error(ctx->ins, "http_request_key is too long"); + flb_sds_destroy(ctx->http_request_key); + ctx->http_request_key = NULL; + ctx->http_request_key_size = 0; + } else { + ctx->http_request_key_size = http_request_key_size; + } + } + + set_resource_type(ctx); + + if (resource_api_has_required_labels(ctx) == FLB_FALSE) { + + if (ctx->resource_type == RESOURCE_TYPE_K8S) { + if (!ctx->cluster_name || !ctx->cluster_location) { + flb_plg_error(ctx->ins, "missing k8s_cluster_name " + "or k8s_cluster_location in configuration"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + + else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE + || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) { + + if (ctx->location == NULL) { + flb_plg_error(ctx->ins, "missing generic resource's location"); + } + + if (ctx->namespace_id == NULL) { + flb_plg_error(ctx->ins, "missing generic resource's namespace"); + } + + if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE) { + if (ctx->node_id == NULL) { + flb_plg_error(ctx->ins, "missing generic_node's node_id"); + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + else { + if (ctx->job == NULL) { + flb_plg_error(ctx->ins, "missing generic_task's job"); + } + + if (ctx->task_id == NULL) { + flb_plg_error(ctx->ins, "missing generic_task's task_id"); + } + + if (!ctx->job || !ctx->task_id) { + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + + if (!ctx->location || !ctx->namespace_id) { + flb_stackdriver_conf_destroy(ctx); + return NULL; + } + } + } + + + if (ctx->tag_prefix == NULL && ctx->resource_type == RESOURCE_TYPE_K8S) { + /* allocate the flb_sds_t to tag_prefix_k8s so we can safely deallocate it */ + ctx->tag_prefix_k8s = flb_sds_create(ctx->resource); + ctx->tag_prefix_k8s = flb_sds_cat(ctx->tag_prefix_k8s, ".", 1); + ctx->tag_prefix = ctx->tag_prefix_k8s; + } + + /* Register metrics */ +#ifdef FLB_HAVE_METRICS + ctx->cmt_successful_requests = cmt_counter_create(ins->cmt, + "fluentbit", + "stackdriver", + "successful_requests", + "Total number of successful " + "requests.", + 1, (char *[]) {"name"}); + + ctx->cmt_failed_requests = cmt_counter_create(ins->cmt, + "fluentbit", + "stackdriver", + "failed_requests", + "Total number of failed " + "requests.", + 1, (char *[]) {"name"}); + + ctx->cmt_requests_total = cmt_counter_create(ins->cmt, + "fluentbit", + "stackdriver", + "requests_total", + "Total number of requests.", + 2, (char *[]) {"status", "name"}); + + ctx->cmt_proc_records_total = cmt_counter_create(ins->cmt, + "fluentbit", + "stackdriver", + "proc_records_total", + "Total number of processed records.", + 2, (char *[]) {"status", "name"}); + + ctx->cmt_retried_records_total = cmt_counter_create(ins->cmt, + "fluentbit", + "stackdriver", + "retried_records_total", + "Total number of retried records.", + 2, (char *[]) {"status", "name"}); + + /* OLD api */ + flb_metrics_add(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, + "stackdriver_successful_requests", ctx->ins->metrics); + flb_metrics_add(FLB_STACKDRIVER_FAILED_REQUESTS, + "stackdriver_failed_requests", ctx->ins->metrics); +#endif + + return ctx; +} + +int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) +{ + if (!ctx) { + return -1; + } + + if (ctx->creds) { + if (ctx->creds->type) { + flb_sds_destroy(ctx->creds->type); + } + if (ctx->creds->private_key_id) { + flb_sds_destroy(ctx->creds->private_key_id); + } + if (ctx->creds->private_key) { + flb_sds_destroy(ctx->creds->private_key); + } + if (ctx->creds->client_email) { + flb_sds_destroy(ctx->creds->client_email); + } + if (ctx->creds->client_id) { + flb_sds_destroy(ctx->creds->client_id); + } + if (ctx->creds->auth_uri) { + flb_sds_destroy(ctx->creds->auth_uri); + } + if (ctx->creds->token_uri) { + flb_sds_destroy(ctx->creds->token_uri); + } + flb_free(ctx->creds); + } + + if (ctx->env) { + if (ctx->env->creds_file) { + flb_sds_destroy(ctx->env->creds_file); + } + if (ctx->env->metadata_server) { + flb_sds_destroy(ctx->env->metadata_server); + /* + * If ctx->env is not NULL, + * ctx->metadata_server points ctx->env->metadata_server. + * + * We set ctx->metadata_server to NULL to prevent double free. + */ + ctx->metadata_server = NULL; + } + flb_free(ctx->env); + } + + if (ctx->metadata_server) { + flb_sds_destroy(ctx->metadata_server); + } + + if (ctx->resource_type == RESOURCE_TYPE_K8S){ + flb_sds_destroy(ctx->namespace_name); + flb_sds_destroy(ctx->pod_name); + flb_sds_destroy(ctx->container_name); + flb_sds_destroy(ctx->node_name); + flb_sds_destroy(ctx->local_resource_id); + } + + if (ctx->metadata_server_auth) { + flb_sds_destroy(ctx->zone); + flb_sds_destroy(ctx->instance_id); + } + + if (ctx->metadata_u) { + flb_upstream_destroy(ctx->metadata_u); + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->o) { + flb_oauth2_destroy(ctx->o); + } + + if (ctx->regex) { + flb_regex_destroy(ctx->regex); + } + + if (ctx->project_id) { + flb_sds_destroy(ctx->project_id); + } + + if (ctx->tag_prefix_k8s) { + flb_sds_destroy(ctx->tag_prefix_k8s); + } + + flb_kv_release(&ctx->config_labels); + flb_kv_release(&ctx->resource_labels_kvs); + flb_free(ctx); + + return 0; +} |