summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_calyptia
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_calyptia')
-rw-r--r--fluent-bit/plugins/out_calyptia/CMakeLists.txt4
-rw-r--r--fluent-bit/plugins/out_calyptia/calyptia.c1025
-rw-r--r--fluent-bit/plugins/out_calyptia/calyptia.h85
3 files changed, 1114 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_calyptia/CMakeLists.txt b/fluent-bit/plugins/out_calyptia/CMakeLists.txt
new file mode 100644
index 000000000..064c4b835
--- /dev/null
+++ b/fluent-bit/plugins/out_calyptia/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(src
+ calyptia.c)
+
+FLB_PLUGIN(out_calyptia "${src}" "")
diff --git a/fluent-bit/plugins/out_calyptia/calyptia.c b/fluent-bit/plugins/out_calyptia/calyptia.c
new file mode 100644
index 000000000..19811dcc9
--- /dev/null
+++ b/fluent-bit/plugins/out_calyptia/calyptia.c
@@ -0,0 +1,1025 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_fstore.h>
+
+#include "calyptia.h"
+
+#include <cmetrics/cmetrics.h>
+#include <cmetrics/cmt_encode_influx.h>
+
+flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx);
+
+static int get_io_flags(struct flb_output_instance *ins)
+{
+ int flags = 0;
+
+ if (ins->use_tls) {
+ flags = FLB_IO_TLS;
+ }
+ else {
+ flags = FLB_IO_TCP;
+ }
+
+ return flags;
+}
+
+static int config_add_labels(struct flb_output_instance *ins,
+ struct flb_calyptia *ctx)
+{
+ struct mk_list *head;
+ struct flb_config_map_val *mv;
+ struct flb_slist_entry *k = NULL;
+ struct flb_slist_entry *v = NULL;
+ struct flb_kv *kv;
+
+ if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
+ return 0;
+ }
+
+ /* iterate all 'add_label' definitions */
+ flb_config_map_foreach(head, mv, ctx->add_labels) {
+ if (mk_list_size(mv->val.list) != 2) {
+ flb_plg_error(ins, "'add_label' expects a key and a value, "
+ "e.g: 'add_label version 1.8.x'");
+ return -1;
+ }
+
+ k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
+ v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
+
+ kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
+ if (!kv) {
+ flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void append_labels(struct flb_calyptia *ctx, struct cmt *cmt)
+{
+ struct flb_kv *kv;
+ struct mk_list *head;
+
+ mk_list_foreach(head, &ctx->kv_labels) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ cmt_label_add(cmt, kv->key, kv->val);
+ }
+}
+
+static void pack_str(msgpack_packer *mp_pck, char *str)
+{
+ int len;
+
+ len = strlen(str);
+ msgpack_pack_str(mp_pck, len);
+ msgpack_pack_str_body(mp_pck, str, len);
+}
+
+static void pack_env(struct flb_env *env, char *prefix, char *key,
+ struct flb_mp_map_header *h,
+ msgpack_packer *mp_pck)
+{
+ int len = 0;
+ char *val;
+
+ /* prefix set in the key, if set, adjust the key name */
+ if (prefix) {
+ len = strlen(prefix);
+ }
+
+ val = (char *) flb_env_get(env, key);
+ if (val) {
+ flb_mp_map_header_append(h);
+ pack_str(mp_pck, key + len);
+ pack_str(mp_pck, val);
+ }
+}
+
+static void pack_env_metadata(struct flb_env *env,
+ struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
+{
+ char *tmp;
+ struct flb_mp_map_header h;
+ struct flb_mp_map_header meta;
+
+ /* Metadata */
+ flb_mp_map_header_append(mh);
+ pack_str(mp_pck, "metadata");
+
+ flb_mp_map_header_init(&meta, mp_pck);
+
+ /* Kubernetes */
+ tmp = (char *) flb_env_get(env, "k8s");
+ if (tmp && strcasecmp(tmp, "enabled") == 0) {
+ flb_mp_map_header_append(&meta);
+ pack_str(mp_pck, "k8s");
+
+ /* adding k8s map */
+ flb_mp_map_header_init(&h, mp_pck);
+
+ pack_env(env, "k8s.", "k8s.namespace", &h, mp_pck);
+ pack_env(env, "k8s.", "k8s.pod_name", &h, mp_pck);
+ pack_env(env, "k8s.", "k8s.node_name", &h, mp_pck);
+
+ flb_mp_map_header_end(&h);
+ }
+
+ /* AWS */
+ tmp = (char *) flb_env_get(env, "aws");
+ if (tmp && strcasecmp(tmp, "enabled") == 0) {
+ flb_mp_map_header_append(&meta);
+ pack_str(mp_pck, "aws");
+
+ /* adding aws map */
+ flb_mp_map_header_init(&h, mp_pck);
+
+ pack_env(env, "aws.", "aws.az", &h, mp_pck);
+ pack_env(env, "aws.", "aws.ec2_instance_id", &h, mp_pck);
+ pack_env(env, "aws.", "aws.ec2_instance_type", &h, mp_pck);
+ pack_env(env, "aws.", "aws.private_ip", &h, mp_pck);
+ pack_env(env, "aws.", "aws.vpc_id", &h, mp_pck);
+ pack_env(env, "aws.", "aws.ami_id", &h, mp_pck);
+ pack_env(env, "aws.", "aws.account_id", &h, mp_pck);
+ pack_env(env, "aws.", "aws.hostname", &h, mp_pck);
+
+ flb_mp_map_header_end(&h);
+ }
+ flb_mp_map_header_end(&meta);
+}
+
+static flb_sds_t get_agent_metadata(struct flb_calyptia *ctx)
+{
+ int len;
+ char *host;
+ flb_sds_t conf;
+ flb_sds_t meta;
+ struct flb_mp_map_header mh;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ struct flb_config *config = ctx->config;
+
+ /* init msgpack */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* pack map */
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ host = (char *) flb_env_get(ctx->env, "HOSTNAME");
+ if (!host) {
+ host = "unknown";
+ }
+ len = strlen(host);
+
+ /* name */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "name", 4);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, host, len);
+
+ /* type */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "type", 4);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "fluentbit", 9);
+
+ /* rawConfig */
+ conf = custom_calyptia_pipeline_config_get(ctx->config);
+ if (conf) {
+ flb_mp_map_header_append(&mh);
+ len = flb_sds_len(conf);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "rawConfig", 9);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, conf, len);
+ }
+ flb_sds_destroy(conf);
+
+ /* version */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "version", 7);
+ len = strlen(FLB_VERSION_STR);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, len);
+
+ /* edition */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "edition", 7);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "community", 9);
+
+ /* machineID */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "machineID", 9);
+ len = flb_sds_len(ctx->machine_id);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, ctx->machine_id, len);
+
+ /* fleetID */
+ if (ctx->fleet_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "fleetID", 7);
+ len = flb_sds_len(ctx->fleet_id);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, ctx->fleet_id, len);
+ }
+
+ /* pack environment metadata */
+ pack_env_metadata(config->env, &mh, &mp_pck);
+
+ /* finalize */
+ flb_mp_map_header_end(&mh);
+
+ /* convert to json */
+ meta = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ return meta;
+}
+
+static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
+ int type)
+{
+ int ret;
+ size_t b_sent;
+
+ /* append headers */
+ if (type == CALYPTIA_ACTION_REGISTER) {
+ flb_http_add_header(c,
+ CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
+ CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
+
+ flb_http_add_header(c,
+ CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
+ ctx->api_key, flb_sds_len(ctx->api_key));
+ }
+ else if (type == CALYPTIA_ACTION_PATCH) {
+ flb_http_add_header(c,
+ CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
+ CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
+
+ flb_http_add_header(c,
+ CALYPTIA_H_AGENT_TOKEN,
+ sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
+ ctx->agent_token, flb_sds_len(ctx->agent_token));
+ }
+ else if (type == CALYPTIA_ACTION_METRICS) {
+ flb_http_add_header(c,
+ CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
+ CALYPTIA_H_CTYPE_MSGPACK,
+ sizeof(CALYPTIA_H_CTYPE_MSGPACK) - 1);
+
+ flb_http_add_header(c,
+ CALYPTIA_H_AGENT_TOKEN,
+ sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
+ ctx->agent_token, flb_sds_len(ctx->agent_token));
+ }
+#ifdef FLB_HAVE_CHUNK_TRACE
+ else if (type == CALYPTIA_ACTION_TRACE) {
+ flb_http_add_header(c,
+ CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
+ CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
+
+ flb_http_add_header(c,
+ CALYPTIA_H_AGENT_TOKEN,
+ sizeof(CALYPTIA_H_AGENT_TOKEN) - 1,
+ ctx->agent_token, flb_sds_len(ctx->agent_token));
+ }
+#endif
+
+ /* Map debug callbacks */
+ flb_http_client_debug(c, ctx->ins->callback);
+
+ /* Perform HTTP request */
+ ret = flb_http_do(c, &b_sent);
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "http_do=%i", ret);
+ return FLB_RETRY;
+ }
+
+ if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) {
+ if (c->resp.payload_size > 0) {
+ flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
+ c->resp.status, c->resp.payload);
+ }
+ else {
+ flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
+ }
+
+ /* invalid metrics */
+ if (c->resp.status == 422) {
+ return FLB_ERROR;
+ }
+ return FLB_RETRY;;
+ }
+
+ return FLB_OK;
+}
+
+static flb_sds_t get_agent_info(char *buf, size_t size, char *k)
+{
+ int i;
+ int ret;
+ int type;
+ int len;
+ char *out_buf;
+ flb_sds_t v = NULL;
+ size_t off = 0;
+ size_t out_size;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object key;
+ msgpack_object val;
+
+ len = strlen(k);
+
+ ret = flb_pack_json(buf, size, &out_buf, &out_size, &type, NULL);
+ if (ret != 0) {
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_free(out_buf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ flb_free(out_buf);
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ for (i = 0; i < root.via.map.size; i++) {
+ key = root.via.map.ptr[i].key;
+ val = root.via.map.ptr[i].val;
+
+ if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (key.via.str.size != len) {
+ continue;
+ }
+
+ if (strncmp(key.via.str.ptr, k, len) == 0) {
+ v = flb_sds_create_len(val.via.str.ptr, val.via.str.size);
+ break;
+ }
+ }
+
+ flb_free(out_buf);
+ msgpack_unpacked_destroy(&result);
+ return v;
+}
+
+/* Set the session content */
+static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
+{
+ int ret;
+ int type;
+ char *mp_buf;
+ size_t mp_size;
+
+ /* remove any previous session file */
+ if (ctx->fs_file) {
+ flb_fstore_file_delete(ctx->fs, ctx->fs_file);
+ }
+
+ /* create session file */
+ ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream,
+ CALYPTIA_SESSION_FILE, 1024);
+ if (!ctx->fs_file) {
+ flb_plg_error(ctx->ins, "could not create new session file");
+ return -1;
+ }
+
+ /* store meta */
+ flb_fstore_file_meta_set(ctx->fs, ctx->fs_file,
+ FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1);
+
+ /* encode */
+ ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type, NULL);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "could not encode session information");
+ return -1;
+ }
+
+ /* store content */
+ ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not store session information");
+ flb_free(mp_buf);
+ return -1;
+ }
+
+ flb_free(mp_buf);
+ return 0;
+}
+
+static int store_session_get(struct flb_calyptia *ctx,
+ void **out_buf, size_t *out_size)
+{
+ int ret;
+ void *buf;
+ size_t size;
+ flb_sds_t json;
+
+ ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file,
+ &buf, &size);
+
+ if (size == 0) {
+ return -1;
+ }
+
+ /* decode */
+ json = flb_msgpack_raw_to_json_sds(buf, size);
+ flb_free(buf);
+ if (!json) {
+ return -1;
+ }
+
+ *out_buf = json;
+ *out_size = flb_sds_len(json);
+
+ return ret;
+}
+
+static int store_init(struct flb_calyptia *ctx)
+{
+ int ret;
+ struct flb_fstore *fs;
+ struct flb_fstore_file *fsf;
+ void *buf;
+ size_t size;
+
+ /* store context */
+ fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS);
+ if (!fs) {
+ flb_plg_error(ctx->ins,
+ "could not initialize 'store_path': %s",
+ ctx->store_path);
+ return -1;
+ }
+ ctx->fs = fs;
+
+ /* stream */
+ ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia");
+ if (!ctx->fs_stream) {
+ flb_plg_error(ctx->ins, "could not create storage stream");
+ return -1;
+ }
+
+ /* lookup any previous file */
+ fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE,
+ sizeof(CALYPTIA_SESSION_FILE) - 1);
+ if (!fsf) {
+ flb_plg_debug(ctx->ins, "no session file was found");
+ return 0;
+ }
+ ctx->fs_file = fsf;
+
+ /* retrieve session info */
+ ret = store_session_get(ctx, &buf, &size);
+ if (ret == 0) {
+ /* agent id */
+ ctx->agent_id = get_agent_info(buf, size, "id");
+
+ /* agent token */
+ ctx->agent_token = get_agent_info(buf, size, "token");
+
+ if (ctx->agent_id && ctx->agent_token) {
+ flb_plg_info(ctx->ins, "session setup OK");
+ }
+ else {
+ if (ctx->agent_id) {
+ flb_sds_destroy(ctx->agent_id);
+ }
+ if (ctx->agent_token) {
+ flb_sds_destroy(ctx->agent_token);
+ }
+ }
+ flb_sds_destroy(buf);
+ }
+
+ return 0;
+}
+
+/* Agent creation is perform on initialization using a sync upstream connection */
+static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
+{
+ int ret;
+ int flb_ret;
+ int flags;
+ int action = CALYPTIA_ACTION_REGISTER;
+ char uri[1024];
+ flb_sds_t meta;
+ struct flb_upstream *u;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+
+ /* Meta */
+ meta = get_agent_metadata(ctx);
+ if (!meta) {
+ flb_plg_error(ctx->ins, "could not retrieve metadata");
+ return -1;
+ }
+
+ /* Upstream */
+ flags = get_io_flags(ctx->ins);
+ u = flb_upstream_create(ctx->config,
+ ctx->cloud_host, ctx->cloud_port,
+ flags, ctx->ins->tls);
+ if (!u) {
+ flb_plg_error(ctx->ins,
+ "could not create upstream connection on 'agent create'");
+ flb_sds_destroy(meta);
+ return -1;
+ }
+
+ /* Make it synchronous */
+ flb_stream_disable_async_mode(&u->base);
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(u);
+ if (!u_conn) {
+ flb_upstream_destroy(u);
+ flb_sds_destroy(meta);
+ return -1;
+ }
+
+ if (ctx->agent_id && ctx->agent_token) {
+ /* Patch */
+ action = CALYPTIA_ACTION_PATCH;
+ snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id);
+ c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri,
+ meta, flb_sds_len(meta), NULL, 0, NULL, 0);
+ }
+ else {
+ /* Create */
+ action = CALYPTIA_ACTION_REGISTER;
+ c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
+ meta, flb_sds_len(meta), NULL, 0, NULL, 0);
+ }
+
+ if (!c) {
+ flb_upstream_conn_release(u_conn);
+ flb_upstream_destroy(u);
+ return -1;
+ }
+
+ /* perform request */
+ flb_ret = calyptia_http_do(ctx, c, action);
+ if (flb_ret == FLB_OK &&
+ (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) {
+ if (c->resp.payload_size > 0) {
+ if (action == CALYPTIA_ACTION_REGISTER) {
+ /* agent id */
+ ctx->agent_id = get_agent_info(c->resp.payload,
+ c->resp.payload_size,
+ "id");
+
+ /* agent token */
+ ctx->agent_token = get_agent_info(c->resp.payload,
+ c->resp.payload_size,
+ "token");
+
+ if (ctx->agent_id && ctx->agent_token) {
+ flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
+ ctx->agent_id);
+
+ if (ctx->store_path && ctx->fs) {
+ ret = store_session_set(ctx,
+ c->resp.payload,
+ c->resp.payload_size);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins,
+ "could not store Calyptia session");
+ }
+ }
+ }
+ }
+ }
+
+ if (action == CALYPTIA_ACTION_PATCH) {
+ flb_plg_info(ctx->ins, "known agent registration successful");
+ }
+ }
+
+ /* release resources */
+ flb_sds_destroy(meta);
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ flb_upstream_destroy(u);
+
+ return flb_ret;
+}
+
+static struct flb_calyptia *config_init(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ int flags;
+ struct flb_calyptia *ctx;
+
+ /* Calyptia plugin context */
+ ctx = flb_calloc(1, sizeof(struct flb_calyptia));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ ctx->config = config;
+ flb_kv_init(&ctx->kv_labels);
+
+ /* Load the config map */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* api_key */
+ if (!ctx->api_key) {
+ flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* parse 'add_label' */
+ ret = config_add_labels(ins, ctx);
+ if (ret == -1) {
+ return NULL;
+ }
+
+ /* env reader */
+ ctx->env = flb_env_create();
+
+ /* Set context */
+ flb_output_set_context(ins, ctx);
+
+ /* Initialize optional storage */
+ if (ctx->store_path) {
+ ret = store_init(ctx);
+ if (ret == -1) {
+ return NULL;
+ }
+ }
+
+ /* the machine-id is provided by custom calyptia, which invokes this plugin. */
+ if (!ctx->machine_id) {
+ flb_plg_error(ctx->ins, "machine_id has not been set");
+ return NULL;
+ }
+
+ flb_plg_debug(ctx->ins, "machine_id=%s", ctx->machine_id);
+
+ /* Upstream */
+ flags = get_io_flags(ctx->ins);
+ ctx->u = flb_upstream_create(ctx->config,
+ ctx->cloud_host, ctx->cloud_port,
+ flags, ctx->ins->tls);
+ if (!ctx->u) {
+ return NULL;
+ }
+
+ /* Set instance flags into upstream */
+ flb_output_upstream_set(ctx->u, ins);
+
+ return ctx;
+}
+
+static int cb_calyptia_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ struct flb_calyptia *ctx;
+ (void) data;
+
+ /* create config context */
+ ctx = config_init(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "could not initialize configuration");
+ return -1;
+ }
+
+ /*
+ * This plugin instance uses the HTTP client interface, let's register
+ * it debugging callbacks.
+ */
+ flb_output_set_http_debug_callbacks(ins);
+
+ /* register/update agent */
+ ret = api_agent_create(config, ctx);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "agent registration failed");
+ return -1;
+ }
+
+ /* metrics endpoint */
+ ctx->metrics_endpoint = flb_sds_create_size(256);
+ flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
+ ctx->agent_id);
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ ctx->trace_endpoint = flb_sds_create_size(256);
+ flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
+ ctx->pipeline_id);
+#endif /* FLB_HAVE_CHUNK_TRACE */
+ return 0;
+}
+
+static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
+{
+ int ret;
+ size_t off = 0;
+ struct cmt *cmt;
+ cfl_sds_t out;
+
+ ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
+ if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
+ flb_plg_warn(ctx->ins, "could not unpack debug payload");
+ return;
+ }
+
+ out = cmt_encode_text_create(cmt);
+ flb_plg_info(ctx->ins, "debug payload:\n%s", out);
+ cmt_encode_text_destroy(out);
+ cmt_destroy(cmt);
+}
+
+static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ size_t off = 0;
+ size_t out_size = 0;
+ char *out_buf = NULL;
+
+/* used to create records for reporting traces to the cloud. */
+#ifdef FLB_HAVE_CHUNK_TRACE
+ flb_sds_t json;
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ struct flb_calyptia *ctx = out_context;
+ struct cmt *cmt;
+ (void) i_ins;
+ (void) config;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
+ /* if we have labels append them */
+ if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
+ ret = cmt_decode_msgpack_create(&cmt,
+ (char *) event_chunk->data,
+ event_chunk->size,
+ &off);
+ if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ /* append labels set by config */
+ append_labels(ctx, cmt);
+
+ /* encode back to msgpack */
+ ret = cmt_encode_msgpack_create(cmt, &out_buf, &out_size);
+ if (ret != 0) {
+ cmt_destroy(cmt);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ cmt_destroy(cmt);
+ }
+ else {
+ out_buf = (char *) event_chunk->data;
+ out_size = event_chunk->size;
+ }
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
+ out_buf, out_size, NULL, 0, NULL, 0);
+ if (!c) {
+ if (out_buf != event_chunk->data) {
+ cmt_encode_msgpack_destroy(out_buf);
+ }
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
+ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
+ if (ret == FLB_OK) {
+ flb_plg_debug(ctx->ins, "metrics delivered OK");
+ }
+ else if (ret == FLB_ERROR) {
+ flb_plg_error(ctx->ins, "could not deliver metrics");
+ debug_payload(ctx, out_buf, out_size);
+ }
+
+ if (out_buf != event_chunk->data) {
+ cmt_encode_msgpack_destroy(out_buf);
+ }
+ }
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
+ json = flb_pack_msgpack_to_json_format(event_chunk->data,
+ event_chunk->size,
+ FLB_PACK_JSON_FORMAT_STREAM,
+ FLB_PACK_JSON_DATE_DOUBLE,
+ NULL);
+ if (json == NULL) {
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ out_buf = (char *)json;
+ out_size = flb_sds_len(json);
+
+ if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
+ ctx->agent_id) == NULL) {
+ flb_upstream_conn_release(u_conn);
+ flb_sds_destroy(json);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
+ out_buf, out_size, NULL, 0, NULL, 0);
+ if (!c) {
+ flb_upstream_conn_release(u_conn);
+ flb_sds_destroy(json);
+ flb_sds_destroy(ctx->metrics_endpoint);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
+ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
+ if (ret == FLB_OK) {
+ flb_plg_debug(ctx->ins, "trace delivered OK");
+ }
+ else if (ret == FLB_ERROR) {
+ flb_plg_error(ctx->ins, "could not deliver trace");
+ debug_payload(ctx, out_buf, out_size);
+ }
+ flb_sds_destroy(json);
+ }
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+ flb_upstream_conn_release(u_conn);
+ flb_http_client_destroy(c);
+ FLB_OUTPUT_RETURN(ret);
+}
+
+static int cb_calyptia_exit(void *data, struct flb_config *config)
+{
+ struct flb_calyptia *ctx = data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ }
+
+ if (ctx->agent_id) {
+ flb_sds_destroy(ctx->agent_id);
+ }
+
+ if (ctx->agent_token) {
+ flb_sds_destroy(ctx->agent_token);
+ }
+
+ if (ctx->env) {
+ flb_env_destroy(ctx->env);
+ }
+
+ if (ctx->metrics_endpoint) {
+ flb_sds_destroy(ctx->metrics_endpoint);
+ }
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ if (ctx->trace_endpoint) {
+ flb_sds_destroy(ctx->trace_endpoint);
+ }
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+ if (ctx->fs) {
+ flb_fstore_destroy(ctx->fs);
+ }
+
+ flb_kv_release(&ctx->kv_labels);
+ flb_free(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "cloud_host", CALYPTIA_HOST,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_host),
+ "",
+ },
+
+ {
+ FLB_CONFIG_MAP_INT, "cloud_port", CALYPTIA_PORT,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_port),
+ "",
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "api_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, api_key),
+ "Calyptia Cloud API Key."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "machine_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, machine_id),
+ "Custom machine_id to be used when registering agent"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "fleet_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, fleet_id),
+ "Fleet ID for identifying as part of a managed fleet"
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "store_path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, store_path),
+ ""
+ },
+
+ {
+ FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_calyptia, add_labels),
+ "Label to append to the generated metric."
+ },
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ {
+ FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_calyptia, pipeline_id),
+ "Pipeline ID for calyptia core traces."
+ },
+#endif
+
+ /* EOF */
+ {0}
+};
+
+struct flb_output_plugin out_calyptia_plugin = {
+ .name = "calyptia",
+ .description = "Calyptia Cloud",
+ .cb_init = cb_calyptia_init,
+ .cb_flush = cb_calyptia_flush,
+ .cb_exit = cb_calyptia_exit,
+ .config_map = config_map,
+ .flags = FLB_OUTPUT_NET | FLB_OUTPUT_PRIVATE | FLB_IO_OPT_TLS,
+ .event_type = FLB_OUTPUT_METRICS
+};
diff --git a/fluent-bit/plugins/out_calyptia/calyptia.h b/fluent-bit/plugins/out_calyptia/calyptia.h
new file mode 100644
index 000000000..db640ff10
--- /dev/null
+++ b/fluent-bit/plugins/out_calyptia/calyptia.h
@@ -0,0 +1,85 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_CALYPTIA_H
+#define FLB_OUT_CALYPTIA_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/flb_fstore.h>
+
+/* End point */
+#define CALYPTIA_HOST "cloud-api.calyptia.com"
+#define CALYPTIA_PORT "443"
+
+/* HTTP action types */
+#define CALYPTIA_ACTION_REGISTER 0
+#define CALYPTIA_ACTION_PATCH 1
+#define CALYPTIA_ACTION_METRICS 2
+#define CALYPTIA_ACTION_TRACE 3
+
+/* Endpoints */
+#define CALYPTIA_ENDPOINT_CREATE "/v1/agents"
+#define CALYPTIA_ENDPOINT_PATCH "/v1/agents/%s"
+#define CALYPTIA_ENDPOINT_METRICS "/v1/agents/%s/metrics"
+#define CALYPTIA_ENDPOINT_TRACE "/v1/traces/%s"
+
+/* Storage */
+#define CALYPTIA_SESSION_FILE "session.CALYPTIA"
+
+/* Headers */
+#define CALYPTIA_H_PROJECT "X-Project-Token"
+#define CALYPTIA_H_AGENT_TOKEN "X-Agent-Token"
+#define CALYPTIA_H_CTYPE "Content-Type"
+#define CALYPTIA_H_CTYPE_JSON "application/json"
+#define CALYPTIA_H_CTYPE_MSGPACK "application/x-msgpack"
+
+struct flb_calyptia {
+ /* config map */
+ int cloud_port;
+ flb_sds_t api_key;
+ flb_sds_t cloud_host;
+ flb_sds_t store_path;
+
+ /* config reader for 'add_label' */
+ struct mk_list *add_labels;
+
+ /* internal */
+ flb_sds_t agent_id;
+ flb_sds_t agent_token;
+ flb_sds_t machine_id; /* machine-id */
+ flb_sds_t fleet_id; /* fleet-id */
+ flb_sds_t metrics_endpoint; /* metrics endpoint */
+ struct flb_fstore *fs; /* fstore ctx */
+ struct flb_fstore_stream *fs_stream; /* fstore stream */
+ struct flb_fstore_file *fs_file; /* fstore session file */
+ struct flb_env *env; /* environment */
+ struct flb_upstream *u; /* upstream connection */
+ struct mk_list kv_labels; /* parsed add_labels */
+ struct flb_output_instance *ins; /* plugin instance */
+ struct flb_config *config; /* Fluent Bit context */
+/* used for reporting chunk trace records to calyptia cloud. */
+#ifdef FLB_HAVE_CHUNK_TRACE
+ flb_sds_t trace_endpoint;
+ flb_sds_t pipeline_id;
+#endif /* FLB_HAVE_CHUNK_TRACE */
+};
+
+#endif