diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_calyptia/calyptia.c')
-rw-r--r-- | src/fluent-bit/plugins/out_calyptia/calyptia.c | 1025 |
1 files changed, 0 insertions, 1025 deletions
diff --git a/src/fluent-bit/plugins/out_calyptia/calyptia.c b/src/fluent-bit/plugins/out_calyptia/calyptia.c deleted file mode 100644 index 19811dcc9..000000000 --- a/src/fluent-bit/plugins/out_calyptia/calyptia.c +++ /dev/null @@ -1,1025 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/flb_kv.h> -#include <fluent-bit/flb_upstream.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_version.h> -#include <fluent-bit/flb_metrics.h> -#include <fluent-bit/flb_fstore.h> - -#include "calyptia.h" - -#include <cmetrics/cmetrics.h> -#include <cmetrics/cmt_encode_influx.h> - -flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx); - -static int get_io_flags(struct flb_output_instance *ins) -{ - int flags = 0; - - if (ins->use_tls) { - flags = FLB_IO_TLS; - } - else { - flags = FLB_IO_TCP; - } - - return flags; -} - -static int config_add_labels(struct flb_output_instance *ins, - struct flb_calyptia *ctx) -{ - struct mk_list *head; - struct flb_config_map_val *mv; - struct flb_slist_entry *k = NULL; - struct flb_slist_entry *v = NULL; - struct flb_kv *kv; - - if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) { - return 0; - } - - /* iterate all 'add_label' definitions */ - flb_config_map_foreach(head, mv, ctx->add_labels) { - if (mk_list_size(mv->val.list) != 2) { - flb_plg_error(ins, "'add_label' expects a key and a value, " - "e.g: 'add_label version 1.8.x'"); - return -1; - } - - k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); - v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); - - kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str); - if (!kv) { - flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str); - return -1; - } - } - - return 0; -} - -static void append_labels(struct flb_calyptia *ctx, struct cmt *cmt) -{ - struct flb_kv *kv; - struct mk_list *head; - - mk_list_foreach(head, &ctx->kv_labels) { - kv = mk_list_entry(head, struct flb_kv, _head); - cmt_label_add(cmt, kv->key, kv->val); - } -} - -static void pack_str(msgpack_packer *mp_pck, char *str) -{ - int len; - - len = strlen(str); - msgpack_pack_str(mp_pck, len); - msgpack_pack_str_body(mp_pck, str, len); -} - -static void pack_env(struct flb_env *env, char *prefix, char *key, - struct flb_mp_map_header *h, - msgpack_packer *mp_pck) -{ - int len = 0; - char *val; - - /* prefix set in the key, if set, adjust the key name */ - if (prefix) { - len = strlen(prefix); - } - - val = (char *) flb_env_get(env, key); - if (val) { - flb_mp_map_header_append(h); - pack_str(mp_pck, key + len); - pack_str(mp_pck, val); - } -} - -static void pack_env_metadata(struct flb_env *env, - struct flb_mp_map_header *mh, msgpack_packer *mp_pck) -{ - char *tmp; - struct flb_mp_map_header h; - struct flb_mp_map_header meta; - - /* Metadata */ - flb_mp_map_header_append(mh); - pack_str(mp_pck, "metadata"); - - flb_mp_map_header_init(&meta, mp_pck); - - /* Kubernetes */ - tmp = (char *) flb_env_get(env, "k8s"); - if (tmp && strcasecmp(tmp, "enabled") == 0) { - flb_mp_map_header_append(&meta); - pack_str(mp_pck, "k8s"); - - /* adding k8s map */ - flb_mp_map_header_init(&h, mp_pck); - - pack_env(env, "k8s.", "k8s.namespace", &h, mp_pck); - pack_env(env, "k8s.", "k8s.pod_name", &h, mp_pck); - pack_env(env, "k8s.", "k8s.node_name", &h, mp_pck); - - flb_mp_map_header_end(&h); - } - - /* AWS */ - tmp = (char *) flb_env_get(env, "aws"); - if (tmp && strcasecmp(tmp, "enabled") == 0) { - flb_mp_map_header_append(&meta); - pack_str(mp_pck, "aws"); - - /* adding aws map */ - flb_mp_map_header_init(&h, mp_pck); - - pack_env(env, "aws.", "aws.az", &h, mp_pck); - pack_env(env, "aws.", "aws.ec2_instance_id", &h, mp_pck); - pack_env(env, "aws.", "aws.ec2_instance_type", &h, mp_pck); - pack_env(env, "aws.", "aws.private_ip", &h, mp_pck); - pack_env(env, "aws.", "aws.vpc_id", &h, mp_pck); - pack_env(env, "aws.", "aws.ami_id", &h, mp_pck); - pack_env(env, "aws.", "aws.account_id", &h, mp_pck); - pack_env(env, "aws.", "aws.hostname", &h, mp_pck); - - flb_mp_map_header_end(&h); - } - flb_mp_map_header_end(&meta); -} - -static flb_sds_t get_agent_metadata(struct flb_calyptia *ctx) -{ - int len; - char *host; - flb_sds_t conf; - flb_sds_t meta; - struct flb_mp_map_header mh; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - struct flb_config *config = ctx->config; - - /* init msgpack */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* pack map */ - flb_mp_map_header_init(&mh, &mp_pck); - - host = (char *) flb_env_get(ctx->env, "HOSTNAME"); - if (!host) { - host = "unknown"; - } - len = strlen(host); - - /* name */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "name", 4); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, host, len); - - /* type */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "type", 4); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "fluentbit", 9); - - /* rawConfig */ - conf = custom_calyptia_pipeline_config_get(ctx->config); - if (conf) { - flb_mp_map_header_append(&mh); - len = flb_sds_len(conf); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "rawConfig", 9); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, conf, len); - } - flb_sds_destroy(conf); - - /* version */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "version", 7); - len = strlen(FLB_VERSION_STR); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, len); - - /* edition */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "edition", 7); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "community", 9); - - /* machineID */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "machineID", 9); - len = flb_sds_len(ctx->machine_id); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, ctx->machine_id, len); - - /* fleetID */ - if (ctx->fleet_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "fleetID", 7); - len = flb_sds_len(ctx->fleet_id); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, ctx->fleet_id, len); - } - - /* pack environment metadata */ - pack_env_metadata(config->env, &mh, &mp_pck); - - /* finalize */ - flb_mp_map_header_end(&mh); - - /* convert to json */ - meta = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - msgpack_sbuffer_destroy(&mp_sbuf); - - return meta; -} - -static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, - int type) -{ - int ret; - size_t b_sent; - - /* append headers */ - if (type == CALYPTIA_ACTION_REGISTER) { - flb_http_add_header(c, - CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, - CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); - - flb_http_add_header(c, - CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1, - ctx->api_key, flb_sds_len(ctx->api_key)); - } - else if (type == CALYPTIA_ACTION_PATCH) { - flb_http_add_header(c, - CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, - CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); - - flb_http_add_header(c, - CALYPTIA_H_AGENT_TOKEN, - sizeof(CALYPTIA_H_AGENT_TOKEN) - 1, - ctx->agent_token, flb_sds_len(ctx->agent_token)); - } - else if (type == CALYPTIA_ACTION_METRICS) { - flb_http_add_header(c, - CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, - CALYPTIA_H_CTYPE_MSGPACK, - sizeof(CALYPTIA_H_CTYPE_MSGPACK) - 1); - - flb_http_add_header(c, - CALYPTIA_H_AGENT_TOKEN, - sizeof(CALYPTIA_H_AGENT_TOKEN) - 1, - ctx->agent_token, flb_sds_len(ctx->agent_token)); - } -#ifdef FLB_HAVE_CHUNK_TRACE - else if (type == CALYPTIA_ACTION_TRACE) { - flb_http_add_header(c, - CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, - CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); - - flb_http_add_header(c, - CALYPTIA_H_AGENT_TOKEN, - sizeof(CALYPTIA_H_AGENT_TOKEN) - 1, - ctx->agent_token, flb_sds_len(ctx->agent_token)); - } -#endif - - /* Map debug callbacks */ - flb_http_client_debug(c, ctx->ins->callback); - - /* Perform HTTP request */ - ret = flb_http_do(c, &b_sent); - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - return FLB_RETRY; - } - - if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) { - if (c->resp.payload_size > 0) { - flb_plg_warn(ctx->ins, "http_status=%i:\n%s", - c->resp.status, c->resp.payload); - } - else { - flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status); - } - - /* invalid metrics */ - if (c->resp.status == 422) { - return FLB_ERROR; - } - return FLB_RETRY;; - } - - return FLB_OK; -} - -static flb_sds_t get_agent_info(char *buf, size_t size, char *k) -{ - int i; - int ret; - int type; - int len; - char *out_buf; - flb_sds_t v = NULL; - size_t off = 0; - size_t out_size; - msgpack_unpacked result; - msgpack_object root; - msgpack_object key; - msgpack_object val; - - len = strlen(k); - - ret = flb_pack_json(buf, size, &out_buf, &out_size, &type, NULL); - if (ret != 0) { - return NULL; - } - - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, out_buf, out_size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - flb_free(out_buf); - msgpack_unpacked_destroy(&result); - return NULL; - } - - root = result.data; - if (root.type != MSGPACK_OBJECT_MAP) { - flb_free(out_buf); - msgpack_unpacked_destroy(&result); - return NULL; - } - - for (i = 0; i < root.via.map.size; i++) { - key = root.via.map.ptr[i].key; - val = root.via.map.ptr[i].val; - - if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (key.via.str.size != len) { - continue; - } - - if (strncmp(key.via.str.ptr, k, len) == 0) { - v = flb_sds_create_len(val.via.str.ptr, val.via.str.size); - break; - } - } - - flb_free(out_buf); - msgpack_unpacked_destroy(&result); - return v; -} - -/* Set the session content */ -static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size) -{ - int ret; - int type; - char *mp_buf; - size_t mp_size; - - /* remove any previous session file */ - if (ctx->fs_file) { - flb_fstore_file_delete(ctx->fs, ctx->fs_file); - } - - /* create session file */ - ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream, - CALYPTIA_SESSION_FILE, 1024); - if (!ctx->fs_file) { - flb_plg_error(ctx->ins, "could not create new session file"); - return -1; - } - - /* store meta */ - flb_fstore_file_meta_set(ctx->fs, ctx->fs_file, - FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1); - - /* encode */ - ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type, NULL); - if (ret < 0) { - flb_plg_error(ctx->ins, "could not encode session information"); - return -1; - } - - /* store content */ - ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not store session information"); - flb_free(mp_buf); - return -1; - } - - flb_free(mp_buf); - return 0; -} - -static int store_session_get(struct flb_calyptia *ctx, - void **out_buf, size_t *out_size) -{ - int ret; - void *buf; - size_t size; - flb_sds_t json; - - ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file, - &buf, &size); - - if (size == 0) { - return -1; - } - - /* decode */ - json = flb_msgpack_raw_to_json_sds(buf, size); - flb_free(buf); - if (!json) { - return -1; - } - - *out_buf = json; - *out_size = flb_sds_len(json); - - return ret; -} - -static int store_init(struct flb_calyptia *ctx) -{ - int ret; - struct flb_fstore *fs; - struct flb_fstore_file *fsf; - void *buf; - size_t size; - - /* store context */ - fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS); - if (!fs) { - flb_plg_error(ctx->ins, - "could not initialize 'store_path': %s", - ctx->store_path); - return -1; - } - ctx->fs = fs; - - /* stream */ - ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia"); - if (!ctx->fs_stream) { - flb_plg_error(ctx->ins, "could not create storage stream"); - return -1; - } - - /* lookup any previous file */ - fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE, - sizeof(CALYPTIA_SESSION_FILE) - 1); - if (!fsf) { - flb_plg_debug(ctx->ins, "no session file was found"); - return 0; - } - ctx->fs_file = fsf; - - /* retrieve session info */ - ret = store_session_get(ctx, &buf, &size); - if (ret == 0) { - /* agent id */ - ctx->agent_id = get_agent_info(buf, size, "id"); - - /* agent token */ - ctx->agent_token = get_agent_info(buf, size, "token"); - - if (ctx->agent_id && ctx->agent_token) { - flb_plg_info(ctx->ins, "session setup OK"); - } - else { - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - } - flb_sds_destroy(buf); - } - - return 0; -} - -/* Agent creation is perform on initialization using a sync upstream connection */ -static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx) -{ - int ret; - int flb_ret; - int flags; - int action = CALYPTIA_ACTION_REGISTER; - char uri[1024]; - flb_sds_t meta; - struct flb_upstream *u; - struct flb_connection *u_conn; - struct flb_http_client *c; - - /* Meta */ - meta = get_agent_metadata(ctx); - if (!meta) { - flb_plg_error(ctx->ins, "could not retrieve metadata"); - return -1; - } - - /* Upstream */ - flags = get_io_flags(ctx->ins); - u = flb_upstream_create(ctx->config, - ctx->cloud_host, ctx->cloud_port, - flags, ctx->ins->tls); - if (!u) { - flb_plg_error(ctx->ins, - "could not create upstream connection on 'agent create'"); - flb_sds_destroy(meta); - return -1; - } - - /* Make it synchronous */ - flb_stream_disable_async_mode(&u->base); - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(u); - if (!u_conn) { - flb_upstream_destroy(u); - flb_sds_destroy(meta); - return -1; - } - - if (ctx->agent_id && ctx->agent_token) { - /* Patch */ - action = CALYPTIA_ACTION_PATCH; - snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id); - c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri, - meta, flb_sds_len(meta), NULL, 0, NULL, 0); - } - else { - /* Create */ - action = CALYPTIA_ACTION_REGISTER; - c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE, - meta, flb_sds_len(meta), NULL, 0, NULL, 0); - } - - if (!c) { - flb_upstream_conn_release(u_conn); - flb_upstream_destroy(u); - return -1; - } - - /* perform request */ - flb_ret = calyptia_http_do(ctx, c, action); - if (flb_ret == FLB_OK && - (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) { - if (c->resp.payload_size > 0) { - if (action == CALYPTIA_ACTION_REGISTER) { - /* agent id */ - ctx->agent_id = get_agent_info(c->resp.payload, - c->resp.payload_size, - "id"); - - /* agent token */ - ctx->agent_token = get_agent_info(c->resp.payload, - c->resp.payload_size, - "token"); - - if (ctx->agent_id && ctx->agent_token) { - flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'", - ctx->agent_id); - - if (ctx->store_path && ctx->fs) { - ret = store_session_set(ctx, - c->resp.payload, - c->resp.payload_size); - if (ret == -1) { - flb_plg_warn(ctx->ins, - "could not store Calyptia session"); - } - } - } - } - } - - if (action == CALYPTIA_ACTION_PATCH) { - flb_plg_info(ctx->ins, "known agent registration successful"); - } - } - - /* release resources */ - flb_sds_destroy(meta); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - flb_upstream_destroy(u); - - return flb_ret; -} - -static struct flb_calyptia *config_init(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - int flags; - struct flb_calyptia *ctx; - - /* Calyptia plugin context */ - ctx = flb_calloc(1, sizeof(struct flb_calyptia)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - ctx->config = config; - flb_kv_init(&ctx->kv_labels); - - /* Load the config map */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } - - /* api_key */ - if (!ctx->api_key) { - flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); - flb_free(ctx); - return NULL; - } - - /* parse 'add_label' */ - ret = config_add_labels(ins, ctx); - if (ret == -1) { - return NULL; - } - - /* env reader */ - ctx->env = flb_env_create(); - - /* Set context */ - flb_output_set_context(ins, ctx); - - /* Initialize optional storage */ - if (ctx->store_path) { - ret = store_init(ctx); - if (ret == -1) { - return NULL; - } - } - - /* the machine-id is provided by custom calyptia, which invokes this plugin. */ - if (!ctx->machine_id) { - flb_plg_error(ctx->ins, "machine_id has not been set"); - return NULL; - } - - flb_plg_debug(ctx->ins, "machine_id=%s", ctx->machine_id); - - /* Upstream */ - flags = get_io_flags(ctx->ins); - ctx->u = flb_upstream_create(ctx->config, - ctx->cloud_host, ctx->cloud_port, - flags, ctx->ins->tls); - if (!ctx->u) { - return NULL; - } - - /* Set instance flags into upstream */ - flb_output_upstream_set(ctx->u, ins); - - return ctx; -} - -static int cb_calyptia_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - int ret; - struct flb_calyptia *ctx; - (void) data; - - /* create config context */ - ctx = config_init(ins, config); - if (!ctx) { - flb_plg_error(ins, "could not initialize configuration"); - return -1; - } - - /* - * This plugin instance uses the HTTP client interface, let's register - * it debugging callbacks. - */ - flb_output_set_http_debug_callbacks(ins); - - /* register/update agent */ - ret = api_agent_create(config, ctx); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "agent registration failed"); - return -1; - } - - /* metrics endpoint */ - ctx->metrics_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id); - -#ifdef FLB_HAVE_CHUNK_TRACE - ctx->trace_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, - ctx->pipeline_id); -#endif /* FLB_HAVE_CHUNK_TRACE */ - return 0; -} - -static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) -{ - int ret; - size_t off = 0; - struct cmt *cmt; - cfl_sds_t out; - - ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); - if (ret != CMT_DECODE_MSGPACK_SUCCESS) { - flb_plg_warn(ctx->ins, "could not unpack debug payload"); - return; - } - - out = cmt_encode_text_create(cmt); - flb_plg_info(ctx->ins, "debug payload:\n%s", out); - cmt_encode_text_destroy(out); - cmt_destroy(cmt); -} - -static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - int ret; - size_t off = 0; - size_t out_size = 0; - char *out_buf = NULL; - -/* used to create records for reporting traces to the cloud. */ -#ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t json; -#endif /* FLB_HAVE_CHUNK_TRACE */ - - struct flb_connection *u_conn; - struct flb_http_client *c; - struct flb_calyptia *ctx = out_context; - struct cmt *cmt; - (void) i_ins; - (void) config; - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { - /* if we have labels append them */ - if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) { - ret = cmt_decode_msgpack_create(&cmt, - (char *) event_chunk->data, - event_chunk->size, - &off); - if (ret != CMT_DECODE_MSGPACK_SUCCESS) { - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_ERROR); - } - - /* append labels set by config */ - append_labels(ctx, cmt); - - /* encode back to msgpack */ - ret = cmt_encode_msgpack_create(cmt, &out_buf, &out_size); - if (ret != 0) { - cmt_destroy(cmt); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_ERROR); - } - cmt_destroy(cmt); - } - else { - out_buf = (char *) event_chunk->data; - out_size = event_chunk->size; - } - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); - if (!c) { - if (out_buf != event_chunk->data) { - cmt_encode_msgpack_destroy(out_buf); - } - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ - ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS); - if (ret == FLB_OK) { - flb_plg_debug(ctx->ins, "metrics delivered OK"); - } - else if (ret == FLB_ERROR) { - flb_plg_error(ctx->ins, "could not deliver metrics"); - debug_payload(ctx, out_buf, out_size); - } - - if (out_buf != event_chunk->data) { - cmt_encode_msgpack_destroy(out_buf); - } - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { - json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_STREAM, - FLB_PACK_JSON_DATE_DOUBLE, - NULL); - if (json == NULL) { - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - out_buf = (char *)json; - out_size = flb_sds_len(json); - - if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id) == NULL) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); - if (!c) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - flb_sds_destroy(ctx->metrics_endpoint); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ - ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE); - if (ret == FLB_OK) { - flb_plg_debug(ctx->ins, "trace delivered OK"); - } - else if (ret == FLB_ERROR) { - flb_plg_error(ctx->ins, "could not deliver trace"); - debug_payload(ctx, out_buf, out_size); - } - flb_sds_destroy(json); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - flb_upstream_conn_release(u_conn); - flb_http_client_destroy(c); - FLB_OUTPUT_RETURN(ret); -} - -static int cb_calyptia_exit(void *data, struct flb_config *config) -{ - struct flb_calyptia *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - - if (ctx->env) { - flb_env_destroy(ctx->env); - } - - if (ctx->metrics_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ctx->trace_endpoint) { - flb_sds_destroy(ctx->trace_endpoint); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - if (ctx->fs) { - flb_fstore_destroy(ctx->fs); - } - - flb_kv_release(&ctx->kv_labels); - flb_free(ctx); - - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "cloud_host", CALYPTIA_HOST, - 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_host), - "", - }, - - { - FLB_CONFIG_MAP_INT, "cloud_port", CALYPTIA_PORT, - 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_port), - "", - }, - - { - FLB_CONFIG_MAP_STR, "api_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_calyptia, api_key), - "Calyptia Cloud API Key." - }, - { - FLB_CONFIG_MAP_STR, "machine_id", NULL, - 0, FLB_TRUE, offsetof(struct flb_calyptia, machine_id), - "Custom machine_id to be used when registering agent" - }, - { - FLB_CONFIG_MAP_STR, "fleet_id", NULL, - 0, FLB_TRUE, offsetof(struct flb_calyptia, fleet_id), - "Fleet ID for identifying as part of a managed fleet" - }, - - { - FLB_CONFIG_MAP_STR, "store_path", NULL, - 0, FLB_TRUE, offsetof(struct flb_calyptia, store_path), - "" - }, - - { - FLB_CONFIG_MAP_SLIST_1, "add_label", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_calyptia, add_labels), - "Label to append to the generated metric." - }, - -#ifdef FLB_HAVE_CHUNK_TRACE - { - FLB_CONFIG_MAP_STR, "pipeline_id", NULL, - 0, FLB_TRUE, offsetof(struct flb_calyptia, pipeline_id), - "Pipeline ID for calyptia core traces." - }, -#endif - - /* EOF */ - {0} -}; - -struct flb_output_plugin out_calyptia_plugin = { - .name = "calyptia", - .description = "Calyptia Cloud", - .cb_init = cb_calyptia_init, - .cb_flush = cb_calyptia_flush, - .cb_exit = cb_calyptia_exit, - .config_map = config_map, - .flags = FLB_OUTPUT_NET | FLB_OUTPUT_PRIVATE | FLB_IO_OPT_TLS, - .event_type = FLB_OUTPUT_METRICS -}; |