diff options
Diffstat (limited to 'fluent-bit/plugins/custom_calyptia/calyptia.c')
-rw-r--r-- | fluent-bit/plugins/custom_calyptia/calyptia.c | 615 |
1 files changed, 615 insertions, 0 deletions
diff --git a/fluent-bit/plugins/custom_calyptia/calyptia.c b/fluent-bit/plugins/custom_calyptia/calyptia.c new file mode 100644 index 000000000..1cfbbd5ce --- /dev/null +++ b/fluent-bit/plugins/custom_calyptia/calyptia.c @@ -0,0 +1,615 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_custom_plugin.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_env.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_router.h> + +/* pipeline plugins */ +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_output.h> + +#include <fluent-bit/flb_hash.h> + +struct calyptia { + /* config map options */ + flb_sds_t api_key; + flb_sds_t store_path; + flb_sds_t cloud_host; + flb_sds_t cloud_port; + flb_sds_t machine_id; + +/* used for reporting chunk trace records. */ +#ifdef FLB_HAVE_CHUNK_TRACE + flb_sds_t pipeline_id; +#endif /* FLB_HAVE_CHUNK_TRACE */ + + int cloud_tls; + int cloud_tls_verify; + + /* config reader for 'add_label' */ + struct mk_list *add_labels; + + /* instances */ + struct flb_input_instance *i; + struct flb_output_instance *o; + struct flb_input_instance *fleet; + struct flb_custom_instance *ins; + + /* Fleet configuration */ + flb_sds_t fleet_id; /* fleet-id */ + flb_sds_t fleet_name; + flb_sds_t fleet_config_dir; /* fleet configuration directory */ + int fleet_interval_sec; + int fleet_interval_nsec; +}; + +/* + * Check if the key belongs to a sensitive data field, if so report it. We never + * share any sensitive data. + */ +static int is_sensitive_property(char *key) +{ + + if (strcasecmp(key, "password") == 0 || + strcasecmp(key, "passwd") == 0 || + strcasecmp(key, "user") == 0 || + strcasecmp(key, "http_user") == 0 || + strcasecmp(key, "http_passwd") == 0 || + strcasecmp(key, "shared_key") == 0 || + strcasecmp(key, "endpoint") == 0 || + strcasecmp(key, "apikey") == 0 || + strcasecmp(key, "private_key") == 0 || + strcasecmp(key, "service_account_secret") == 0 || + strcasecmp(key, "splunk_token") == 0 || + strcasecmp(key, "logdna_host") == 0 || + strcasecmp(key, "api_key") == 0 || + strcasecmp(key, "hostname") == 0 || + strcasecmp(key, "license_key") == 0 || + strcasecmp(key, "base_uri") == 0 || + strcasecmp(key, "api") == 0) { + + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static void pipeline_config_add_properties(flb_sds_t *buf, struct mk_list *props) +{ + struct mk_list *head; + struct flb_kv *kv; + + mk_list_foreach(head, props) { + kv = mk_list_entry(head, struct flb_kv, _head); + + if (kv->key != NULL && kv->val != NULL) { + flb_sds_printf(buf, " %s ", kv->key); + + if (is_sensitive_property(kv->key)) { + flb_sds_cat_safe(buf, "--redacted--", strlen("--redacted--")); + } + else { + flb_sds_cat_safe(buf, kv->val, strlen(kv->val)); + } + + flb_sds_cat_safe(buf, "\n", 1); + } + } +} + +flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx) +{ + char tmp[32]; + flb_sds_t buf; + struct mk_list *head; + struct flb_input_instance *i_ins; + struct flb_filter_instance *f_ins; + struct flb_output_instance *o_ins; + + buf = flb_sds_create_size(2048); + + if (!buf) { + return NULL; + } + + /* [INPUT] */ + mk_list_foreach(head, &ctx->inputs) { + i_ins = mk_list_entry(head, struct flb_input_instance, _head); + flb_sds_printf(&buf, "[INPUT]\n"); + flb_sds_printf(&buf, " name %s\n", i_ins->name); + + if (i_ins->alias) { + flb_sds_printf(&buf, " alias %s\n", i_ins->alias); + } + + if (i_ins->tag) { + flb_sds_printf(&buf, " tag %s\n", i_ins->tag); + } + + if (i_ins->mem_buf_limit > 0) { + flb_utils_bytes_to_human_readable_size(i_ins->mem_buf_limit, + tmp, sizeof(tmp) - 1); + flb_sds_printf(&buf, " mem_buf_limit %s\n", tmp); + } + + pipeline_config_add_properties(&buf, &i_ins->properties); + } + flb_sds_printf(&buf, "\n"); + + /* Config: [FILTER] */ + mk_list_foreach(head, &ctx->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + + flb_sds_printf(&buf, "[FILTER]\n"); + flb_sds_printf(&buf, " name %s\n", f_ins->name); + flb_sds_printf(&buf, " match %s\n", f_ins->match); + + pipeline_config_add_properties(&buf, &f_ins->properties); + } + flb_sds_printf(&buf, "\n"); + + /* Config: [OUTPUT] */ + mk_list_foreach(head, &ctx->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + flb_sds_printf(&buf, "[OUTPUT]\n"); + flb_sds_printf(&buf, " name %s\n", o_ins->name); + + if (o_ins->match) { + flb_sds_printf(&buf, " match %s\n", o_ins->match); + } + else { + flb_sds_printf(&buf, " match *\n"); + } + +#ifdef FLB_HAVE_TLS + if (o_ins->use_tls == FLB_TRUE) { + flb_sds_printf(&buf, " tls %s\n", o_ins->use_tls ? "on" : "off"); + flb_sds_printf(&buf, " tls.verify %s\n", + o_ins->tls_verify ? "on": "off"); + + if (o_ins->tls_ca_file) { + flb_sds_printf(&buf, " tls.ca_file %s\n", + o_ins->tls_ca_file); + } + + if (o_ins->tls_crt_file) { + flb_sds_printf(&buf, " tls.crt_file %s\n", + o_ins->tls_crt_file); + } + + if (o_ins->tls_key_file) { + flb_sds_printf(&buf, " tls.key_file %s\n", + o_ins->tls_key_file); + } + + if (o_ins->tls_key_passwd) { + flb_sds_printf(&buf, " tls.key_passwd --redacted--\n"); + } + } +#endif + + if (o_ins->retry_limit == FLB_OUT_RETRY_UNLIMITED) { + flb_sds_printf(&buf, " retry_limit no_limits\n"); + } + else if (o_ins->retry_limit == FLB_OUT_RETRY_NONE) { + flb_sds_printf(&buf, " retry_limit no_retries\n"); + } + else { + flb_sds_printf(&buf, " retry_limit %i\n", o_ins->retry_limit); + } + + if (o_ins->host.name) { + flb_sds_printf(&buf, " host --redacted--\n"); + } + + pipeline_config_add_properties(&buf, &o_ins->properties); + flb_sds_printf(&buf, "\n"); + } + + return buf; +} + +static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx) +{ + int ret; + struct flb_output_instance *cloud; + struct mk_list *head; + struct flb_slist_entry *key = NULL; + struct flb_slist_entry *val = NULL; + flb_sds_t label; + struct flb_config_map_val *mv; + + cloud = flb_output_new(config, "calyptia", ctx, FLB_FALSE); + + if (!cloud) { + flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector"); + flb_free(ctx); + return NULL; + } + + /* direct connect / routing */ + ret = flb_router_connect_direct(ctx->i, cloud); + + if (ret != 0) { + flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector"); + flb_free(ctx); + return NULL; + } + + if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) { + + /* iterate all 'add_label' definitions */ + flb_config_map_foreach(head, mv, ctx->add_labels) { + key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + label = flb_sds_create_size(strlen(key->str) + strlen(val->str) + 1); + + if (!label) { + flb_free(ctx); + return NULL; + } + + flb_sds_printf(&label, "%s %s", key->str, val->str); + flb_output_set_property(cloud, "add_label", label); + flb_sds_destroy(label); + } + } + + flb_output_set_property(cloud, "match", "_calyptia_cloud"); + flb_output_set_property(cloud, "api_key", ctx->api_key); + + if (ctx->store_path) { + flb_output_set_property(cloud, "store_path", ctx->store_path); + } + + if (ctx->machine_id) { + flb_output_set_property(cloud, "machine_id", ctx->machine_id); + } + + /* Override network details: development purposes only */ + if (ctx->cloud_host) { + flb_output_set_property(cloud, "cloud_host", ctx->cloud_host); + } + + if (ctx->cloud_port) { + flb_output_set_property(cloud, "cloud_port", ctx->cloud_port); + } + + if (ctx->cloud_tls) { + flb_output_set_property(cloud, "tls", "true"); + } + else { + flb_output_set_property(cloud, "tls", "false"); + } + + if (ctx->cloud_tls_verify) { + flb_output_set_property(cloud, "tls.verify", "true"); + } + else { + flb_output_set_property(cloud, "tls.verify", "false"); + } + + if (ctx->fleet_id) { + flb_output_set_property(cloud, "fleet_id", ctx->fleet_id); + label = flb_sds_create_size(strlen("fleet_id") + strlen(ctx->fleet_id) + 1); + + if (!label) { + flb_free(ctx); + return NULL; + } + + flb_sds_printf(&label, "fleet_id %s", ctx->fleet_id); + flb_output_set_property(cloud, "add_label", label); + flb_sds_destroy(label); + } + +#ifdef FLB_HAVE_CHUNK_TRACE + flb_output_set_property(cloud, "pipeline_id", ctx->pipeline_id); +#endif /* FLB_HAVE_CHUNK_TRACE */ + + return cloud; +} + +static flb_sds_t sha256_to_hex(unsigned char *sha256) +{ + int idx; + flb_sds_t hex; + flb_sds_t tmp; + + hex = flb_sds_create_size(64); + + if (!hex) { + return NULL; + } + + for (idx = 0; idx < 32; idx++) { + tmp = flb_sds_printf(&hex, "%02x", sha256[idx]); + + if (!tmp) { + flb_sds_destroy(hex); + return NULL; + } + + hex = tmp; + } + + flb_sds_len_set(hex, 64); + return hex; +} + +static flb_sds_t get_machine_id(struct calyptia *ctx) +{ + int ret; + char *buf; + size_t blen; + unsigned char sha256_buf[64] = {0}; + + /* retrieve raw machine id */ + ret = flb_utils_get_machine_id(&buf, &blen); + + if (ret == -1) { + flb_plg_error(ctx->ins, "could not obtain machine id"); + return NULL; + } + + ret = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) buf, + blen, + sha256_buf, + sizeof(sha256_buf)); + flb_free(buf); + + if (ret != FLB_CRYPTO_SUCCESS) { + return NULL; + } + + /* convert to hex */ + return sha256_to_hex(sha256_buf); +} + +static int cb_calyptia_init(struct flb_custom_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + struct calyptia *ctx; + int is_fleet_mode; + (void) data; + + ctx = flb_calloc(1, sizeof(struct calyptia)); + + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + + /* Load the config map */ + ret = flb_custom_config_map_set(ins, (void *) ctx); + + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* map instance and local context */ + flb_custom_set_context(ins, ctx); + + /* If no machine_id has been provided via a configuration option get it from the local machine-id. */ + if (!ctx->machine_id) { + /* machine id */ + ctx->machine_id = get_machine_id(ctx); + + if (ctx->machine_id == NULL) { + flb_plg_error(ctx->ins, "unable to retrieve machine_id"); + return -1; + } + } + + /* input collector */ + ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE); + + if (!ctx->i) { + flb_plg_error(ctx->ins, "could not load metrics collector"); + return -1; + } + + flb_input_set_property(ctx->i, "tag", "_calyptia_cloud"); + flb_input_set_property(ctx->i, "scrape_on_start", "true"); + flb_input_set_property(ctx->i, "scrape_interval", "30"); + + if (ctx->fleet_name || ctx->fleet_id) { + is_fleet_mode = FLB_TRUE; + } + else { + is_fleet_mode = FLB_FALSE; + } + + /* output cloud connector */ + if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) || + (is_fleet_mode == FLB_FALSE)) { + ctx->o = setup_cloud_output(config, ctx); + + if (ctx->o == NULL) { + return -1; + } + } + + if (ctx->fleet_id || ctx->fleet_name) { + ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); + + if (!ctx->fleet) { + flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin"); + return -1; + } + + if (ctx->fleet_name) { + // TODO: set this once the fleet_id has been retrieved... + // flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name); + } + else { + flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id); + } + + flb_input_set_property(ctx->fleet, "api_key", ctx->api_key); + flb_input_set_property(ctx->fleet, "host", ctx->cloud_host); + flb_input_set_property(ctx->fleet, "port", ctx->cloud_port); + + if (ctx->cloud_tls == 1) { + flb_input_set_property(ctx->fleet, "tls", "on"); + } + else { + flb_input_set_property(ctx->fleet, "tls", "off"); + } + + if (ctx->cloud_tls_verify == 1) { + flb_input_set_property(ctx->fleet, "tls.verify", "on"); + } + else { + flb_input_set_property(ctx->fleet, "tls.verify", "off"); + } + + if (ctx->fleet_config_dir) { + flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir); + } + + if (ctx->machine_id) { + flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id); + } + } + + if (ctx->o) { + flb_router_connect(ctx->i, ctx->o); + } + flb_plg_info(ins, "custom initialized!"); + return 0; +} + +static int cb_calyptia_exit(void *data, struct flb_config *config) +{ + struct calyptia *ctx = data; + + if (!ctx) { + return 0; + } + + flb_free(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "api_key", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, api_key), + "Calyptia Cloud API Key." + }, + + { + FLB_CONFIG_MAP_STR, "store_path", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, store_path) + }, + + { + FLB_CONFIG_MAP_STR, "calyptia_host", "cloud-api.calyptia.com", + 0, FLB_TRUE, offsetof(struct calyptia, cloud_host), + "" + }, + + { + FLB_CONFIG_MAP_STR, "calyptia_port", "443", + 0, FLB_TRUE, offsetof(struct calyptia, cloud_port), + "" + }, + + { + FLB_CONFIG_MAP_BOOL, "calyptia_tls", "true", + 0, FLB_TRUE, offsetof(struct calyptia, cloud_tls), + "" + }, + + { + FLB_CONFIG_MAP_BOOL, "calyptia_tls.verify", "true", + 0, FLB_TRUE, offsetof(struct calyptia, cloud_tls_verify), + "" + }, + + { + FLB_CONFIG_MAP_SLIST_1, "add_label", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct calyptia, add_labels), + "Label to append to the generated metric." + }, + { + FLB_CONFIG_MAP_STR, "machine_id", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, machine_id), + "Custom machine_id to be used when registering agent" + }, + { + FLB_CONFIG_MAP_STR, "fleet_id", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, fleet_id), + "Fleet id to be used when registering agent in a fleet" + }, + { + FLB_CONFIG_MAP_STR, "fleet.config_dir", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, fleet_config_dir), + "Base path for the configuration directory." + }, + { + FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1", + 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec), + "Set the collector interval" + }, + { + FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1", + 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec), + "Set the collector interval (nanoseconds)" + }, + { + FLB_CONFIG_MAP_STR, "fleet_name", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, fleet_name), + "Fleet name to be used when registering agent in a fleet" + }, + +#ifdef FLB_HAVE_CHUNK_TRACE + { + FLB_CONFIG_MAP_STR, "pipeline_id", NULL, + 0, FLB_TRUE, offsetof(struct calyptia, pipeline_id), + "Pipeline ID for reporting to calyptia cloud." + }, +#endif /* FLB_HAVE_CHUNK_TRACE */ + + /* EOF */ + {0} +}; + +struct flb_custom_plugin custom_calyptia_plugin = { + .name = "calyptia", + .description = "Calyptia Cloud", + .config_map = config_map, + .cb_init = cb_calyptia_init, + .cb_exit = cb_calyptia_exit, +}; |