summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_calyptia/calyptia.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_calyptia/calyptia.c')
-rw-r--r--src/fluent-bit/plugins/out_calyptia/calyptia.c1025
1 files changed, 0 insertions, 1025 deletions
diff --git a/src/fluent-bit/plugins/out_calyptia/calyptia.c b/src/fluent-bit/plugins/out_calyptia/calyptia.c
deleted file mode 100644
index 19811dcc9..000000000
--- a/src/fluent-bit/plugins/out_calyptia/calyptia.c
+++ /dev/null
@@ -1,1025 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_log.h>
-#include <fluent-bit/flb_kv.h>
-#include <fluent-bit/flb_upstream.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_version.h>
-#include <fluent-bit/flb_metrics.h>
-#include <fluent-bit/flb_fstore.h>
-
-#include "calyptia.h"
-
-#include <cmetrics/cmetrics.h>
-#include <cmetrics/cmt_encode_influx.h>
-
-flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx);
-
-static int get_io_flags(struct flb_output_instance *ins)
-{
- int flags = 0;
-
- if (ins->use_tls) {
- flags = FLB_IO_TLS;
- }
- else {
- flags = FLB_IO_TCP;
- }
-
- return flags;
-}
-
-static int config_add_labels(struct flb_output_instance *ins,
- struct flb_calyptia *ctx)
-{
- struct mk_list *head;
- struct flb_config_map_val *mv;
- struct flb_slist_entry *k = NULL;
- struct flb_slist_entry *v = NULL;
- struct flb_kv *kv;
-
- if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
- return 0;
- }
-
- /* iterate all 'add_label' definitions */
- flb_config_map_foreach(head, mv, ctx->add_labels) {
- if (mk_list_size(mv->val.list) != 2) {
- flb_plg_error(ins, "'add_label' expects a key and a value, "
- "e.g: 'add_label version 1.8.x'");
- return -1;
- }
-
- k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
- v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
-
- kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
- if (!kv) {
- flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
- return -1;
- }
- }
-
- return 0;
-}
-
-static void append_labels(struct flb_calyptia *ctx, struct cmt *cmt)
-{
- struct flb_kv *kv;
- struct mk_list *head;
-
- mk_list_foreach(head, &ctx->kv_labels) {
- kv = mk_list_entry(head, struct flb_kv, _head);
- cmt_label_add(cmt, kv->key, kv->val);
- }
-}
-
-static void pack_str(msgpack_packer *mp_pck, char *str)
-{
- int len;
-
- len = strlen(str);
- msgpack_pack_str(mp_pck, len);
- msgpack_pack_str_body(mp_pck, str, len);
-}
-
-static void pack_env(struct flb_env *env, char *prefix, char *key,
- struct flb_mp_map_header *h,
- msgpack_packer *mp_pck)
-{
- int len = 0;
- char *val;
-
- /* prefix set in the key, if set, adjust the key name */
- if (prefix) {
- len = strlen(prefix);
- }
-
- val = (char *) flb_env_get(env, key);
- if (val) {
- flb_mp_map_header_append(h);
- pack_str(mp_pck, key + len);
- pack_str(mp_pck, val);
- }
-}
-
-static void pack_env_metadata(struct flb_env *env,
- struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
-{
- char *tmp;
- struct flb_mp_map_header h;
- struct flb_mp_map_header meta;
-
- /* Metadata */
- flb_mp_map_header_append(mh);
- pack_str(mp_pck, "metadata");
-
- flb_mp_map_header_init(&meta, mp_pck);
-
- /* Kubernetes */
- tmp = (char *) flb_env_get(env, "k8s");
- if (tmp && strcasecmp(tmp, "enabled") == 0) {
- flb_mp_map_header_append(&meta);
- pack_str(mp_pck, "k8s");
-
- /* adding k8s map */
- flb_mp_map_header_init(&h, mp_pck);
-
- pack_env(env, "k8s.", "k8s.namespace", &h, mp_pck);
- pack_env(env, "k8s.", "k8s.pod_name", &h, mp_pck);
- pack_env(env, "k8s.", "k8s.node_name", &h, mp_pck);
-
- flb_mp_map_header_end(&h);
- }
-
- /* AWS */
- tmp = (char *) flb_env_get(env, "aws");
- if (tmp && strcasecmp(tmp, "enabled") == 0) {
- flb_mp_map_header_append(&meta);
- pack_str(mp_pck, "aws");
-
- /* adding aws map */
- flb_mp_map_header_init(&h, mp_pck);
-
- pack_env(env, "aws.", "aws.az", &h, mp_pck);
- pack_env(env, "aws.", "aws.ec2_instance_id", &h, mp_pck);
- pack_env(env, "aws.", "aws.ec2_instance_type", &h, mp_pck);
- pack_env(env, "aws.", "aws.private_ip", &h, mp_pck);
- pack_env(env, "aws.", "aws.vpc_id", &h, mp_pck);
- pack_env(env, "aws.", "aws.ami_id", &h, mp_pck);
- pack_env(env, "aws.", "aws.account_id", &h, mp_pck);
- pack_env(env, "aws.", "aws.hostname", &h, mp_pck);
-
- flb_mp_map_header_end(&h);
- }
- flb_mp_map_header_end(&meta);
-}
-
-static flb_sds_t get_agent_metadata(struct flb_calyptia *ctx)
-{
- int len;
- char *host;
- flb_sds_t conf;
- flb_sds_t meta;
- struct flb_mp_map_header mh;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- struct flb_config *config = ctx->config;
-
- /* init msgpack */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* pack map */
- flb_mp_map_header_init(&mh, &mp_pck);
-
- host = (char *) flb_env_get(ctx->env, "HOSTNAME");
- if (!host) {
- host = "unknown";
- }
- len = strlen(host);
-
- /* name */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 4);
- msgpack_pack_str_body(&mp_pck, "name", 4);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, host, len);
-
- /* type */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 4);
- msgpack_pack_str_body(&mp_pck, "type", 4);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "fluentbit", 9);
-
- /* rawConfig */
- conf = custom_calyptia_pipeline_config_get(ctx->config);
- if (conf) {
- flb_mp_map_header_append(&mh);
- len = flb_sds_len(conf);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "rawConfig", 9);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, conf, len);
- }
- flb_sds_destroy(conf);
-
- /* version */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "version", 7);
- len = strlen(FLB_VERSION_STR);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, len);
-
- /* edition */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "edition", 7);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "community", 9);
-
- /* machineID */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "machineID", 9);
- len = flb_sds_len(ctx->machine_id);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, ctx->machine_id, len);
-
- /* fleetID */
- if (ctx->fleet_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "fleetID", 7);
- len = flb_sds_len(ctx->fleet_id);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, ctx->fleet_id, len);
- }
-
- /* pack environment metadata */
- pack_env_metadata(config->env, &mh, &mp_pck);
-
- /* finalize */
- flb_mp_map_header_end(&mh);
-
- /* convert to json */
- meta = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- return meta;
-}
-
-static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
- int type)
-{
- int ret;
- size_t b_sent;
-
- /* append headers */
- if (type == CALYPTIA_ACTION_REGISTER) {
- flb_http_add_header(c,
- CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
- CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
-
- flb_http_add_header(c,
- CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
- ctx->api_key, flb_sds_len(ctx->api_key));
- }
- else if (type == CALYPTIA_ACTION_PATCH) {
- flb_http_add_header(c,
- CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
- CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
-
- flb_http_add_header(c,
- CALYPTIA_H_AGENT_TOKEN,
- sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
- ctx->agent_token, flb_sds_len(ctx->agent_token));
- }
- else if (type == CALYPTIA_ACTION_METRICS) {
- flb_http_add_header(c,
- CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
- CALYPTIA_H_CTYPE_MSGPACK,
- sizeof(CALYPTIA_H_CTYPE_MSGPACK) - 1);
-
- flb_http_add_header(c,
- CALYPTIA_H_AGENT_TOKEN,
- sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
- ctx->agent_token, flb_sds_len(ctx->agent_token));
- }
-#ifdef FLB_HAVE_CHUNK_TRACE
- else if (type == CALYPTIA_ACTION_TRACE) {
- flb_http_add_header(c,
- CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
- CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
-
- flb_http_add_header(c,
- CALYPTIA_H_AGENT_TOKEN,
- sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
- ctx->agent_token, flb_sds_len(ctx->agent_token));
- }
-#endif
-
- /* Map debug callbacks */
- flb_http_client_debug(c, ctx->ins->callback);
-
- /* Perform HTTP request */
- ret = flb_http_do(c, &b_sent);
- if (ret != 0) {
- flb_plg_warn(ctx->ins, "http_do=%i", ret);
- return FLB_RETRY;
- }
-
- if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) {
- if (c->resp.payload_size > 0) {
- flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
- c->resp.status, c->resp.payload);
- }
- else {
- flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
- }
-
- /* invalid metrics */
- if (c->resp.status == 422) {
- return FLB_ERROR;
- }
- return FLB_RETRY;;
- }
-
- return FLB_OK;
-}
-
-static flb_sds_t get_agent_info(char *buf, size_t size, char *k)
-{
- int i;
- int ret;
- int type;
- int len;
- char *out_buf;
- flb_sds_t v = NULL;
- size_t off = 0;
- size_t out_size;
- msgpack_unpacked result;
- msgpack_object root;
- msgpack_object key;
- msgpack_object val;
-
- len = strlen(k);
-
- ret = flb_pack_json(buf, size, &out_buf, &out_size, &type, NULL);
- if (ret != 0) {
- return NULL;
- }
-
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- flb_free(out_buf);
- msgpack_unpacked_destroy(&result);
- return NULL;
- }
-
- root = result.data;
- if (root.type != MSGPACK_OBJECT_MAP) {
- flb_free(out_buf);
- msgpack_unpacked_destroy(&result);
- return NULL;
- }
-
- for (i = 0; i < root.via.map.size; i++) {
- key = root.via.map.ptr[i].key;
- val = root.via.map.ptr[i].val;
-
- if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (key.via.str.size != len) {
- continue;
- }
-
- if (strncmp(key.via.str.ptr, k, len) == 0) {
- v = flb_sds_create_len(val.via.str.ptr, val.via.str.size);
- break;
- }
- }
-
- flb_free(out_buf);
- msgpack_unpacked_destroy(&result);
- return v;
-}
-
-/* Set the session content */
-static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
-{
- int ret;
- int type;
- char *mp_buf;
- size_t mp_size;
-
- /* remove any previous session file */
- if (ctx->fs_file) {
- flb_fstore_file_delete(ctx->fs, ctx->fs_file);
- }
-
- /* create session file */
- ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream,
- CALYPTIA_SESSION_FILE, 1024);
- if (!ctx->fs_file) {
- flb_plg_error(ctx->ins, "could not create new session file");
- return -1;
- }
-
- /* store meta */
- flb_fstore_file_meta_set(ctx->fs, ctx->fs_file,
- FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1);
-
- /* encode */
- ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type, NULL);
- if (ret < 0) {
- flb_plg_error(ctx->ins, "could not encode session information");
- return -1;
- }
-
- /* store content */
- ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "could not store session information");
- flb_free(mp_buf);
- return -1;
- }
-
- flb_free(mp_buf);
- return 0;
-}
-
-static int store_session_get(struct flb_calyptia *ctx,
- void **out_buf, size_t *out_size)
-{
- int ret;
- void *buf;
- size_t size;
- flb_sds_t json;
-
- ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file,
- &buf, &size);
-
- if (size == 0) {
- return -1;
- }
-
- /* decode */
- json = flb_msgpack_raw_to_json_sds(buf, size);
- flb_free(buf);
- if (!json) {
- return -1;
- }
-
- *out_buf = json;
- *out_size = flb_sds_len(json);
-
- return ret;
-}
-
-static int store_init(struct flb_calyptia *ctx)
-{
- int ret;
- struct flb_fstore *fs;
- struct flb_fstore_file *fsf;
- void *buf;
- size_t size;
-
- /* store context */
- fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS);
- if (!fs) {
- flb_plg_error(ctx->ins,
- "could not initialize 'store_path': %s",
- ctx->store_path);
- return -1;
- }
- ctx->fs = fs;
-
- /* stream */
- ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia");
- if (!ctx->fs_stream) {
- flb_plg_error(ctx->ins, "could not create storage stream");
- return -1;
- }
-
- /* lookup any previous file */
- fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE,
- sizeof(CALYPTIA_SESSION_FILE) - 1);
- if (!fsf) {
- flb_plg_debug(ctx->ins, "no session file was found");
- return 0;
- }
- ctx->fs_file = fsf;
-
- /* retrieve session info */
- ret = store_session_get(ctx, &buf, &size);
- if (ret == 0) {
- /* agent id */
- ctx->agent_id = get_agent_info(buf, size, "id");
-
- /* agent token */
- ctx->agent_token = get_agent_info(buf, size, "token");
-
- if (ctx->agent_id && ctx->agent_token) {
- flb_plg_info(ctx->ins, "session setup OK");
- }
- else {
- if (ctx->agent_id) {
- flb_sds_destroy(ctx->agent_id);
- }
- if (ctx->agent_token) {
- flb_sds_destroy(ctx->agent_token);
- }
- }
- flb_sds_destroy(buf);
- }
-
- return 0;
-}
-
-/* Agent creation is perform on initialization using a sync upstream connection */
-static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
-{
- int ret;
- int flb_ret;
- int flags;
- int action = CALYPTIA_ACTION_REGISTER;
- char uri[1024];
- flb_sds_t meta;
- struct flb_upstream *u;
- struct flb_connection *u_conn;
- struct flb_http_client *c;
-
- /* Meta */
- meta = get_agent_metadata(ctx);
- if (!meta) {
- flb_plg_error(ctx->ins, "could not retrieve metadata");
- return -1;
- }
-
- /* Upstream */
- flags = get_io_flags(ctx->ins);
- u = flb_upstream_create(ctx->config,
- ctx->cloud_host, ctx->cloud_port,
- flags, ctx->ins->tls);
- if (!u) {
- flb_plg_error(ctx->ins,
- "could not create upstream connection on 'agent create'");
- flb_sds_destroy(meta);
- return -1;
- }
-
- /* Make it synchronous */
- flb_stream_disable_async_mode(&u->base);
-
- /* Get upstream connection */
- u_conn = flb_upstream_conn_get(u);
- if (!u_conn) {
- flb_upstream_destroy(u);
- flb_sds_destroy(meta);
- return -1;
- }
-
- if (ctx->agent_id && ctx->agent_token) {
- /* Patch */
- action = CALYPTIA_ACTION_PATCH;
- snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id);
- c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri,
- meta, flb_sds_len(meta), NULL, 0, NULL, 0);
- }
- else {
- /* Create */
- action = CALYPTIA_ACTION_REGISTER;
- c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
- meta, flb_sds_len(meta), NULL, 0, NULL, 0);
- }
-
- if (!c) {
- flb_upstream_conn_release(u_conn);
- flb_upstream_destroy(u);
- return -1;
- }
-
- /* perform request */
- flb_ret = calyptia_http_do(ctx, c, action);
- if (flb_ret == FLB_OK &&
- (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) {
- if (c->resp.payload_size > 0) {
- if (action == CALYPTIA_ACTION_REGISTER) {
- /* agent id */
- ctx->agent_id = get_agent_info(c->resp.payload,
- c->resp.payload_size,
- "id");
-
- /* agent token */
- ctx->agent_token = get_agent_info(c->resp.payload,
- c->resp.payload_size,
- "token");
-
- if (ctx->agent_id && ctx->agent_token) {
- flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
- ctx->agent_id);
-
- if (ctx->store_path && ctx->fs) {
- ret = store_session_set(ctx,
- c->resp.payload,
- c->resp.payload_size);
- if (ret == -1) {
- flb_plg_warn(ctx->ins,
- "could not store Calyptia session");
- }
- }
- }
- }
- }
-
- if (action == CALYPTIA_ACTION_PATCH) {
- flb_plg_info(ctx->ins, "known agent registration successful");
- }
- }
-
- /* release resources */
- flb_sds_destroy(meta);
- flb_http_client_destroy(c);
- flb_upstream_conn_release(u_conn);
- flb_upstream_destroy(u);
-
- return flb_ret;
-}
-
-static struct flb_calyptia *config_init(struct flb_output_instance *ins,
- struct flb_config *config)
-{
- int ret;
- int flags;
- struct flb_calyptia *ctx;
-
- /* Calyptia plugin context */
- ctx = flb_calloc(1, sizeof(struct flb_calyptia));
- if (!ctx) {
- flb_errno();
- return NULL;
- }
- ctx->ins = ins;
- ctx->config = config;
- flb_kv_init(&ctx->kv_labels);
-
- /* Load the config map */
- ret = flb_output_config_map_set(ins, (void *) ctx);
- if (ret == -1) {
- flb_free(ctx);
- return NULL;
- }
-
- /* api_key */
- if (!ctx->api_key) {
- flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
- flb_free(ctx);
- return NULL;
- }
-
- /* parse 'add_label' */
- ret = config_add_labels(ins, ctx);
- if (ret == -1) {
- return NULL;
- }
-
- /* env reader */
- ctx->env = flb_env_create();
-
- /* Set context */
- flb_output_set_context(ins, ctx);
-
- /* Initialize optional storage */
- if (ctx->store_path) {
- ret = store_init(ctx);
- if (ret == -1) {
- return NULL;
- }
- }
-
- /* the machine-id is provided by custom calyptia, which invokes this plugin. */
- if (!ctx->machine_id) {
- flb_plg_error(ctx->ins, "machine_id has not been set");
- return NULL;
- }
-
- flb_plg_debug(ctx->ins, "machine_id=%s", ctx->machine_id);
-
- /* Upstream */
- flags = get_io_flags(ctx->ins);
- ctx->u = flb_upstream_create(ctx->config,
- ctx->cloud_host, ctx->cloud_port,
- flags, ctx->ins->tls);
- if (!ctx->u) {
- return NULL;
- }
-
- /* Set instance flags into upstream */
- flb_output_upstream_set(ctx->u, ins);
-
- return ctx;
-}
-
-static int cb_calyptia_init(struct flb_output_instance *ins,
- struct flb_config *config, void *data)
-{
- int ret;
- struct flb_calyptia *ctx;
- (void) data;
-
- /* create config context */
- ctx = config_init(ins, config);
- if (!ctx) {
- flb_plg_error(ins, "could not initialize configuration");
- return -1;
- }
-
- /*
- * This plugin instance uses the HTTP client interface, let's register
- * it debugging callbacks.
- */
- flb_output_set_http_debug_callbacks(ins);
-
- /* register/update agent */
- ret = api_agent_create(config, ctx);
- if (ret != FLB_OK) {
- flb_plg_error(ctx->ins, "agent registration failed");
- return -1;
- }
-
- /* metrics endpoint */
- ctx->metrics_endpoint = flb_sds_create_size(256);
- flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
- ctx->agent_id);
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- ctx->trace_endpoint = flb_sds_create_size(256);
- flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
- ctx->pipeline_id);
-#endif /* FLB_HAVE_CHUNK_TRACE */
- return 0;
-}
-
-static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
-{
- int ret;
- size_t off = 0;
- struct cmt *cmt;
- cfl_sds_t out;
-
- ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
- if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
- flb_plg_warn(ctx->ins, "could not unpack debug payload");
- return;
- }
-
- out = cmt_encode_text_create(cmt);
- flb_plg_info(ctx->ins, "debug payload:\n%s", out);
- cmt_encode_text_destroy(out);
- cmt_destroy(cmt);
-}
-
-static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *i_ins,
- void *out_context,
- struct flb_config *config)
-{
- int ret;
- size_t off = 0;
- size_t out_size = 0;
- char *out_buf = NULL;
-
-/* used to create records for reporting traces to the cloud. */
-#ifdef FLB_HAVE_CHUNK_TRACE
- flb_sds_t json;
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- struct flb_connection *u_conn;
- struct flb_http_client *c;
- struct flb_calyptia *ctx = out_context;
- struct cmt *cmt;
- (void) i_ins;
- (void) config;
-
- /* Get upstream connection */
- u_conn = flb_upstream_conn_get(ctx->u);
- if (!u_conn) {
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
- /* if we have labels append them */
- if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
- ret = cmt_decode_msgpack_create(&cmt,
- (char *) event_chunk->data,
- event_chunk->size,
- &off);
- if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
-
- /* append labels set by config */
- append_labels(ctx, cmt);
-
- /* encode back to msgpack */
- ret = cmt_encode_msgpack_create(cmt, &out_buf, &out_size);
- if (ret != 0) {
- cmt_destroy(cmt);
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
- cmt_destroy(cmt);
- }
- else {
- out_buf = (char *) event_chunk->data;
- out_size = event_chunk->size;
- }
-
- /* Compose HTTP Client request */
- c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
- out_buf, out_size, NULL, 0, NULL, 0);
- if (!c) {
- if (out_buf != event_chunk->data) {
- cmt_encode_msgpack_destroy(out_buf);
- }
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
- ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
- if (ret == FLB_OK) {
- flb_plg_debug(ctx->ins, "metrics delivered OK");
- }
- else if (ret == FLB_ERROR) {
- flb_plg_error(ctx->ins, "could not deliver metrics");
- debug_payload(ctx, out_buf, out_size);
- }
-
- if (out_buf != event_chunk->data) {
- cmt_encode_msgpack_destroy(out_buf);
- }
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
- json = flb_pack_msgpack_to_json_format(event_chunk->data,
- event_chunk->size,
- FLB_PACK_JSON_FORMAT_STREAM,
- FLB_PACK_JSON_DATE_DOUBLE,
- NULL);
- if (json == NULL) {
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
- out_buf = (char *)json;
- out_size = flb_sds_len(json);
-
- if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
- ctx->agent_id) == NULL) {
- flb_upstream_conn_release(u_conn);
- flb_sds_destroy(json);
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
- c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
- out_buf, out_size, NULL, 0, NULL, 0);
- if (!c) {
- flb_upstream_conn_release(u_conn);
- flb_sds_destroy(json);
- flb_sds_destroy(ctx->metrics_endpoint);
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
- ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
- if (ret == FLB_OK) {
- flb_plg_debug(ctx->ins, "trace delivered OK");
- }
- else if (ret == FLB_ERROR) {
- flb_plg_error(ctx->ins, "could not deliver trace");
- debug_payload(ctx, out_buf, out_size);
- }
- flb_sds_destroy(json);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- flb_upstream_conn_release(u_conn);
- flb_http_client_destroy(c);
- FLB_OUTPUT_RETURN(ret);
-}
-
-static int cb_calyptia_exit(void *data, struct flb_config *config)
-{
- struct flb_calyptia *ctx = data;
-
- if (!ctx) {
- return 0;
- }
-
- if (ctx->u) {
- flb_upstream_destroy(ctx->u);
- }
-
- if (ctx->agent_id) {
- flb_sds_destroy(ctx->agent_id);
- }
-
- if (ctx->agent_token) {
- flb_sds_destroy(ctx->agent_token);
- }
-
- if (ctx->env) {
- flb_env_destroy(ctx->env);
- }
-
- if (ctx->metrics_endpoint) {
- flb_sds_destroy(ctx->metrics_endpoint);
- }
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- if (ctx->trace_endpoint) {
- flb_sds_destroy(ctx->trace_endpoint);
- }
-#endif /* FLB_HAVE_CHUNK_TRACE */
-
- if (ctx->fs) {
- flb_fstore_destroy(ctx->fs);
- }
-
- flb_kv_release(&ctx->kv_labels);
- flb_free(ctx);
-
- return 0;
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "cloud_host", CALYPTIA_HOST,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_host),
- "",
- },
-
- {
- FLB_CONFIG_MAP_INT, "cloud_port", CALYPTIA_PORT,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_port),
- "",
- },
-
- {
- FLB_CONFIG_MAP_STR, "api_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, api_key),
- "Calyptia Cloud API Key."
- },
- {
- FLB_CONFIG_MAP_STR, "machine_id", NULL,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, machine_id),
- "Custom machine_id to be used when registering agent"
- },
- {
- FLB_CONFIG_MAP_STR, "fleet_id", NULL,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, fleet_id),
- "Fleet ID for identifying as part of a managed fleet"
- },
-
- {
- FLB_CONFIG_MAP_STR, "store_path", NULL,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, store_path),
- ""
- },
-
- {
- FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
- FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_calyptia, add_labels),
- "Label to append to the generated metric."
- },
-
-#ifdef FLB_HAVE_CHUNK_TRACE
- {
- FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
- 0, FLB_TRUE, offsetof(struct flb_calyptia, pipeline_id),
- "Pipeline ID for calyptia core traces."
- },
-#endif
-
- /* EOF */
- {0}
-};
-
-struct flb_output_plugin out_calyptia_plugin = {
- .name = "calyptia",
- .description = "Calyptia Cloud",
- .cb_init = cb_calyptia_init,
- .cb_flush = cb_calyptia_flush,
- .cb_exit = cb_calyptia_exit,
- .config_map = config_map,
- .flags = FLB_OUTPUT_NET | FLB_OUTPUT_PRIVATE | FLB_IO_OPT_TLS,
- .event_type = FLB_OUTPUT_METRICS
-};