summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c')
-rw-r--r--src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c1269
1 files changed, 1269 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c
new file mode 100644
index 000000000..3175b5645
--- /dev/null
+++ b/src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c
@@ -0,0 +1,1269 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+#include <msgpack.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_strptime.h>
+#include <fluent-bit/flb_reload.h>
+#include <fluent-bit/flb_lib.h>
+#include <fluent-bit/config_format/flb_cf_fluentbit.h>
+#include <fluent-bit/flb_base64.h>
+
+
+#define CALYPTIA_H_PROJECT "X-Project-Token"
+#define CALYPTIA_H_CTYPE "Content-Type"
+#define CALYPTIA_H_CTYPE_JSON "application/json"
+
+#define DEFAULT_INTERVAL_SEC "15"
+#define DEFAULT_INTERVAL_NSEC "0"
+
+#define CALYPTIA_HOST "cloud-api.calyptia.com"
+#define CALYPTIA_PORT "443"
+
+#ifndef _WIN32
+#define PATH_SEPARATOR "/"
+#define DEFAULT_CONFIG_DIR "/tmp/calyptia-fleet"
+#else
+#define DEFAULT_CONFIG_DIR NULL
+#define PATH_SEPARATOR "\\"
+#endif
+
+struct flb_in_calyptia_fleet_config {
+ /* Time interval check */
+ int interval_sec;
+ int interval_nsec;
+
+ /* Grabbed from the cfg_path, used to check if configuration has
+ * has been updated.
+ */
+ long config_timestamp;
+
+ flb_sds_t api_key;
+ flb_sds_t fleet_id;
+ flb_sds_t fleet_name;
+ flb_sds_t machine_id;
+ flb_sds_t config_dir;
+ flb_sds_t cloud_host;
+ flb_sds_t cloud_port;
+
+ flb_sds_t fleet_url;
+
+ struct flb_input_instance *ins; /* plugin instance */
+ struct flb_config *config; /* Fluent Bit context */
+
+ /* Networking */
+ struct flb_upstream *u;
+
+ int event_fd;
+
+ int collect_fd;
+};
+
+static char *find_case_header(struct flb_http_client *cli, const char *header)
+{
+ char *ptr;
+ char *headstart;
+
+
+ headstart = strstr(cli->resp.data, "\r\n");
+
+ if (headstart == NULL) {
+ return NULL;
+ }
+
+ /* Lookup the beginning of the header */
+ for (ptr = headstart; ptr != NULL && ptr+2 < cli->resp.payload; ptr = strstr(ptr, "\r\n")) {
+
+ if (ptr + 4 < cli->resp.payload && strcmp(ptr, "\r\n\r\n") == 0) {
+ return NULL;
+ }
+
+ ptr+=2;
+
+ /* no space left for header */
+ if (ptr + strlen(header)+2 >= cli->resp.payload) {
+ return NULL;
+ }
+
+ /* matched header and the delimiter */
+ if (strncasecmp(ptr, header, strlen(header)) == 0) {
+
+ if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') {
+ return ptr;
+ }
+ }
+ }
+
+ return NULL;
+}
+
+/* Try to find a header value in the buffer. Copied from flb_http_client.c. */
+static int case_header_lookup(struct flb_http_client *cli,
+ const char *header, int header_len,
+ const char **out_val, int *out_len)
+{
+ char *ptr;
+ char *crlf;
+ char *end;
+
+ if (!cli->resp.data) {
+ return -1;
+ }
+
+ ptr = find_case_header(cli, header);
+ end = strstr(cli->resp.data, "\r\n\r\n");
+
+ if (!ptr) {
+
+ if (end) {
+ /* The headers are complete but the header is not there */
+ return -1;
+ }
+
+ /* We need more data */
+ return -1;
+ }
+
+ /* Exclude matches in the body */
+ if (end && ptr > end) {
+ return -1;
+ }
+
+ /* Lookup CRLF (end of line \r\n) */
+ crlf = strstr(ptr, "\r\n");
+
+ if (!crlf) {
+ return -1;
+ }
+
+ /* sanity check that the header_len does not exceed the headers. */
+ if (ptr + header_len + 2 > end) {
+ return -1;
+ }
+
+ ptr += header_len + 2;
+
+ *out_val = ptr;
+ *out_len = (crlf - ptr);
+
+ return 0;
+}
+
+struct reload_ctx {
+ flb_ctx_t *flb;
+ flb_sds_t cfg_path;
+};
+
+static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname)
+{
+ flb_sds_t cfgname;
+
+ cfgname = flb_sds_create_size(4096);
+
+ if (ctx->fleet_name != NULL) {
+ flb_sds_printf(&cfgname,
+ "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini",
+ ctx->config_dir, ctx->machine_id, ctx->fleet_name, fname);
+ }
+ else {
+ flb_sds_printf(&cfgname,
+ "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini",
+ ctx->config_dir, ctx->machine_id, ctx->fleet_id, fname);
+ }
+
+ return cfgname;
+}
+
+static flb_sds_t new_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
+{
+ return fleet_config_filename(ctx, "new");
+}
+
+static flb_sds_t cur_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
+{
+ return fleet_config_filename(ctx, "cur");
+}
+
+static flb_sds_t old_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
+{
+ return fleet_config_filename(ctx, "old");
+}
+
+static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t)
+{
+ char s_last_modified[32];
+
+ snprintf(s_last_modified, sizeof(s_last_modified)-1, "%d", (int)t);
+ return fleet_config_filename(ctx, s_last_modified);
+}
+
+static int is_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
+{
+ flb_sds_t cfgnewname;
+ int ret = FLB_FALSE;
+
+
+ if (cfg->conf_path_file == NULL) {
+ return FLB_FALSE;
+ }
+
+ cfgnewname = new_fleet_config_filename(ctx);
+
+ if (strcmp(cfgnewname, cfg->conf_path_file) == 0) {
+ ret = FLB_TRUE;
+ }
+
+ flb_sds_destroy(cfgnewname);
+
+ return ret;
+}
+
+static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
+{
+ flb_sds_t cfgcurname;
+ int ret = FLB_FALSE;
+
+
+ if (cfg->conf_path_file == NULL) {
+ return FLB_FALSE;
+ }
+
+ cfgcurname = cur_fleet_config_filename(ctx);
+
+ if (strcmp(cfgcurname, cfg->conf_path_file) == 0) {
+ ret = FLB_TRUE;
+ }
+
+ flb_sds_destroy(cfgcurname);
+
+ return ret;
+}
+
+static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
+{
+ char *fname;
+ char *end;
+ long val;
+
+ if (cfg->conf_path_file == NULL) {
+ return FLB_FALSE;
+ }
+
+ fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);
+
+ if (fname == NULL) {
+ return FLB_FALSE;
+ }
+
+ fname++;
+
+ errno = 0;
+ val = strtol(fname, &end, 10);
+
+ if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) ||
+ (errno != 0 && val == 0)) {
+ flb_errno();
+ return FLB_FALSE;
+ }
+
+ if (strcmp(end, ".ini") == 0) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
+{
+ if (cfg->conf_path_file == NULL) {
+ return FLB_FALSE;
+ }
+
+ return is_new_fleet_config(ctx, cfg) ||
+ is_cur_fleet_config(ctx, cfg) ||
+ is_timestamped_fleet_config(ctx, cfg);
+}
+
+static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
+{
+ flb_sds_t cfgnewname;
+ int ret = FLB_FALSE;
+
+
+ cfgnewname = new_fleet_config_filename(ctx);
+ ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
+
+ flb_sds_destroy(cfgnewname);
+ return ret;
+}
+
+static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
+{
+ flb_sds_t cfgcurname;
+ int ret = FLB_FALSE;
+
+
+ cfgcurname = cur_fleet_config_filename(ctx);
+ ret = access(cfgcurname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
+
+ flb_sds_destroy(cfgcurname);
+ return ret;
+}
+
+static void *do_reload(void *data)
+{
+ struct reload_ctx *reload = (struct reload_ctx *)data;
+
+ /* avoid reloading the current configuration... just use our new one! */
+ flb_context_set(reload->flb);
+ reload->flb->config->enable_hot_reload = FLB_TRUE;
+ reload->flb->config->conf_path_file = reload->cfg_path;
+
+ sleep(5);
+#ifndef FLB_SYSTEM_WINDOWS
+ kill(getpid(), SIGHUP);
+#else
+ GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
+#endif
+ return NULL;
+}
+
+static int test_config_is_valid(flb_sds_t cfgpath)
+{
+ struct flb_config *config;
+ struct flb_cf *conf;
+ int ret = FLB_FALSE;
+
+
+ config = flb_config_init();
+
+ if (config == NULL) {
+ goto config_init_error;
+ }
+
+ conf = flb_cf_create();
+
+ if (conf == NULL) {
+ goto cf_create_error;
+ }
+
+ conf = flb_cf_create_from_file(conf, cfgpath);
+
+ if (conf == NULL) {
+ goto cf_create_from_file_error;
+ }
+
+ if (flb_config_load_config_format(config, conf)) {
+ goto cf_load_config_format_error;
+ }
+
+ if (flb_reload_property_check_all(config)) {
+ goto cf_property_check_error;
+ }
+
+ ret = FLB_TRUE;
+
+cf_property_check_error:
+cf_load_config_format_error:
+cf_create_from_file_error:
+ flb_cf_destroy(conf);
+cf_create_error:
+ flb_config_exit(config);
+config_init_error:
+ return ret;
+}
+
+static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgpath)
+{
+ struct reload_ctx *reload;
+ pthread_t pth;
+ pthread_attr_t ptha;
+ flb_ctx_t *flb = flb_context_get();
+
+ if (ctx->collect_fd > 0) {
+ flb_input_collector_pause(ctx->collect_fd, ctx->ins);
+ }
+
+ if (flb == NULL) {
+ flb_plg_error(ctx->ins, "unable to get fluent-bit context.");
+
+ if (ctx->collect_fd > 0) {
+ flb_input_collector_resume(ctx->collect_fd, ctx->ins);
+ }
+
+ return FLB_FALSE;
+ }
+
+ /* fix execution in valgrind...
+ * otherwise flb_reload errors out with:
+ * [error] [reload] given flb context is NULL
+ */
+ flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath);
+
+ if (test_config_is_valid(cfgpath) == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "unable to load configuration.");
+
+ if (ctx->collect_fd > 0) {
+ flb_input_collector_resume(ctx->collect_fd, ctx->ins);
+ }
+
+ return FLB_FALSE;
+ }
+
+ reload = flb_calloc(1, sizeof(struct reload_ctx));
+ reload->flb = flb;
+ reload->cfg_path = cfgpath;
+
+ pthread_attr_init(&ptha);
+ pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED);
+ pthread_create(&pth, &ptha, do_reload, reload);
+
+ return FLB_TRUE;
+}
+
+static char *tls_setting_string(int use_tls)
+{
+ if (use_tls) {
+ return "On";
+ }
+
+ return "Off";
+}
+
+static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx,
+ char *payload, size_t size)
+{
+ int ret;
+ int out_size;
+ char *pack;
+ struct flb_pack_state pack_state;
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object_kv *cur;
+ msgpack_object_str *key;
+ flb_sds_t project_id;
+ int idx = 0;
+
+ /* Initialize packer */
+ flb_pack_state_init(&pack_state);
+
+ /* Pack JSON as msgpack */
+ ret = flb_pack_json_state(payload, size,
+ &pack, &out_size, &pack_state);
+ flb_pack_state_reset(&pack_state);
+
+ /* Handle exceptions */
+ if (ret == FLB_ERR_JSON_PART) {
+ flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
+ return NULL;
+ }
+ else if (ret == FLB_ERR_JSON_INVAL) {
+ flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
+ return NULL;
+ }
+ else if (ret == -1) {
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) {
+
+ if (result.data.type == MSGPACK_OBJECT_MAP) {
+ for (idx = 0; idx < result.data.via.map.size; idx++) {
+ cur = &result.data.via.map.ptr[idx];
+ key = &cur->key.via.str;
+
+ if (strncmp(key->ptr, "ProjectID", key->size) == 0) {
+
+ if (cur->val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "unable to find fleet by name");
+ msgpack_unpacked_destroy(&result);
+ return NULL;
+ }
+
+ project_id = flb_sds_create_len(cur->val.via.str.ptr,
+ cur->val.via.str.size);
+ msgpack_unpacked_destroy(&result);
+ flb_free(pack);
+
+ return project_id;
+ }
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+ flb_free(pack);
+
+ return NULL;
+}
+
+static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx,
+ char *payload, size_t size)
+{
+ int ret;
+ int out_size;
+ char *pack;
+ struct flb_pack_state pack_state;
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object_array *results;
+ msgpack_object_kv *cur;
+ msgpack_object_str *key;
+ int idx = 0;
+
+ /* Initialize packer */
+ flb_pack_state_init(&pack_state);
+
+ /* Pack JSON as msgpack */
+ ret = flb_pack_json_state(payload, size,
+ &pack, &out_size, &pack_state);
+ flb_pack_state_reset(&pack_state);
+
+ /* Handle exceptions */
+ if (ret == FLB_ERR_JSON_PART) {
+ flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
+ return -1;
+ }
+ else if (ret == FLB_ERR_JSON_INVAL) {
+ flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
+ return -1;
+ }
+ else if (ret == -1) {
+ return -1;
+ }
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) {
+
+ if (result.data.type == MSGPACK_OBJECT_ARRAY) {
+ results = &result.data.via.array;
+
+ if (results->ptr[0].type == MSGPACK_OBJECT_MAP) {
+
+ for (idx = 0; idx < results->ptr[0].via.map.size; idx++) {
+ cur = &results->ptr[0].via.map.ptr[idx];
+ key = &cur->key.via.str;
+
+ if (strncasecmp(key->ptr, "id", key->size) == 0) {
+
+ if (cur->val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "unable to find fleet by name");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ ctx->fleet_id = flb_sds_create_len(cur->val.via.str.ptr,
+ cur->val.via.str.size);
+ break;
+ }
+ break;
+ }
+ break;
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+ flb_free(pack);
+
+ if (ctx->fleet_id == NULL) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx,
+ struct flb_connection *u_conn,
+ struct flb_config *config)
+{
+ struct flb_http_client *client;
+ flb_sds_t url;
+ flb_sds_t project_id;
+ unsigned char token[512] = {0};
+ unsigned char encoded[256];
+ size_t elen;
+ size_t tlen;
+ char *api_token_sep;
+ size_t b_sent;
+ int ret;
+
+ api_token_sep = strchr(ctx->api_key, '.');
+
+ if (api_token_sep == NULL) {
+ return -1;
+ }
+
+ elen = api_token_sep-ctx->api_key;
+ elen = elen + (4 - (elen % 4));
+
+ if (elen > sizeof(encoded)) {
+ flb_plg_error(ctx->ins, "API Token is too large");
+ return -1;
+ }
+
+ memset(encoded, '=', sizeof(encoded));
+ memcpy(encoded, ctx->api_key, api_token_sep-ctx->api_key);
+
+ ret = flb_base64_decode(token, sizeof(token)-1, &tlen,
+ encoded, elen);
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ project_id = parse_api_key_json(ctx, (char *)token, tlen);
+
+ if (project_id == NULL) {
+ return -1;
+ }
+
+ url = flb_sds_create_size(4096);
+ flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s",
+ project_id, ctx->fleet_name);
+
+ client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0,
+ ctx->ins->host.name, ctx->ins->host.port, NULL, 0);
+
+ if (!client) {
+ flb_plg_error(ctx->ins, "unable to create http client");
+ return -1;
+ }
+
+ flb_http_buffer_size(client, 8192);
+
+ flb_http_add_header(client,
+ CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
+ ctx->api_key, flb_sds_len(ctx->api_key));
+
+ ret = flb_http_do(client, &b_sent);
+
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "http do error");
+ return -1;
+ }
+
+ if (client->resp.status != 200) {
+ flb_plg_error(ctx->ins, "search http status code error: %d", client->resp.status);
+ return -1;
+ }
+
+ if (client->resp.payload_size <= 0) {
+ flb_plg_error(ctx->ins, "empty response");
+ return -1;
+ }
+
+ if (parse_fleet_search_json(ctx, client->resp.payload, client->resp.payload_size) == -1) {
+ flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
+ return -1;
+ }
+
+ if (ctx->fleet_id == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef FLB_SYSTEM_WINDOWS
+#define link(a, b) CreateHardLinkA(b, a, 0)
+
+ssize_t readlink(const char *path, char *realpath, size_t srealpath) {
+ HANDLE hFile;
+ DWORD ret;
+
+ hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL, NULL);
+
+ if (hFile == INVALID_HANDLE_VALUE) {
+ return -1;
+ }
+
+ ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT);
+
+ if (ret < srealpath) {
+ CloseHandle(hFile);
+ return -1;
+ }
+
+ CloseHandle(hFile);
+ return ret;
+}
+
+#endif
+
+/* cb_collect callback */
+static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
+ struct flb_config *config,
+ void *in_context)
+{
+ struct flb_in_calyptia_fleet_config *ctx = in_context;
+ struct flb_connection *u_conn;
+ struct flb_http_client *client;
+ flb_sds_t cfgname;
+ flb_sds_t cfgnewname;
+ flb_sds_t cfgoldname;
+ flb_sds_t cfgcurname;
+ flb_sds_t header;
+ flb_sds_t hdr;
+ FILE *cfgfp;
+ const char *fbit_last_modified;
+ int fbit_last_modified_len;
+ struct flb_tm tm_last_modified = { 0 };
+ time_t time_last_modified;
+ char *data;
+ size_t b_sent;
+ int ret = -1;
+#ifdef FLB_SYSTEM_WINDOWS
+ DWORD err;
+ LPSTR lpMsg;
+#endif
+
+ u_conn = flb_upstream_conn_get(ctx->u);
+
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u",
+ ctx->ins->host.name, ctx->ins->host.port);
+ goto conn_error;
+ }
+
+ if (ctx->fleet_id == NULL) {
+
+ if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) {
+ flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
+ goto conn_error;
+ }
+ }
+
+ if (ctx->fleet_url == NULL) {
+ ctx->fleet_url = flb_sds_create_size(4096);
+ flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id);
+ }
+
+ client = flb_http_client(u_conn, FLB_HTTP_GET, ctx->fleet_url,
+ NULL, 0,
+ ctx->ins->host.name, ctx->ins->host.port, NULL, 0);
+
+ if (!client) {
+ flb_plg_error(ins, "unable to create http client");
+ goto client_error;
+ }
+
+ flb_http_buffer_size(client, 8192);
+
+ flb_http_add_header(client,
+ CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
+ ctx->api_key, flb_sds_len(ctx->api_key));
+
+ ret = flb_http_do(client, &b_sent);
+
+ if (ret != 0) {
+ flb_plg_error(ins, "http do error");
+ goto http_error;
+ }
+
+ if (client->resp.status != 200) {
+ flb_plg_error(ins, "http status code error: %d", client->resp.status);
+ goto http_error;
+ }
+
+ if (client->resp.payload_size <= 0) {
+ flb_plg_error(ins, "empty response");
+ goto http_error;
+ }
+
+ /* copy and NULL terminate the payload */
+ data = flb_sds_create_size(client->resp.payload_size + 1);
+
+ if (!data) {
+ goto http_error;
+ }
+ memcpy(data, client->resp.payload, client->resp.payload_size);
+ data[client->resp.payload_size] = '\0';
+
+ ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"),
+ &fbit_last_modified, &fbit_last_modified_len);
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "unable to get last-modified header");
+ goto http_error;
+ }
+
+ flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
+ time_last_modified = mktime(&tm_last_modified.tm);
+
+ cfgname = time_fleet_config_filename(ctx, time_last_modified);
+
+ if (access(cfgname, F_OK) == -1 && errno == ENOENT) {
+ cfgfp = fopen(cfgname, "w+");
+
+ if (cfgfp == NULL) {
+ flb_plg_error(ctx->ins, "unable to open configuration file: %s", cfgname);
+ goto http_error;
+ }
+
+ header = flb_sds_create_size(4096);
+
+ if (ctx->fleet_name == NULL) {
+ hdr = flb_sds_printf(&header,
+ "[CUSTOM]\n"
+ " Name calyptia\n"
+ " api_key %s\n"
+ " fleet_id %s\n"
+ " add_label fleet_id %s\n"
+ " fleet.config_dir %s\n"
+ " calyptia_host %s\n"
+ " calyptia_port %d\n"
+ " calyptia_tls %s\n",
+ ctx->api_key,
+ ctx->fleet_id,
+ ctx->fleet_id,
+ ctx->config_dir,
+ ctx->ins->host.name,
+ ctx->ins->host.port,
+ tls_setting_string(ctx->ins->use_tls)
+ );
+ }
+ else {
+ hdr = flb_sds_printf(&header,
+ "[CUSTOM]\n"
+ " Name calyptia\n"
+ " api_key %s\n"
+ " fleet_name %s\n"
+ " fleet_id %s\n"
+ " add_label fleet_id %s\n"
+ " fleet.config_dir %s\n"
+ " calyptia_host %s\n"
+ " calyptia_port %d\n"
+ " calyptia_tls %s\n",
+ ctx->api_key,
+ ctx->fleet_name,
+ ctx->fleet_id,
+ ctx->fleet_id,
+ ctx->config_dir,
+ ctx->ins->host.name,
+ ctx->ins->host.port,
+ tls_setting_string(ctx->ins->use_tls)
+ );
+ }
+ if (hdr == NULL) {
+ fclose(cfgfp);
+ goto http_error;
+ }
+ if (ctx->machine_id) {
+ hdr = flb_sds_printf(&header, " machine_id %s\n", ctx->machine_id);
+ if (hdr == NULL) {
+ fclose(cfgfp);
+ goto http_error;
+ }
+ }
+ fwrite(header, strlen(header), 1, cfgfp);
+ flb_sds_destroy(header);
+ fwrite(data, client->resp.payload_size, 1, cfgfp);
+ fclose(cfgfp);
+
+ cfgnewname = new_fleet_config_filename(ctx);
+
+ if (exists_new_fleet_config(ctx) == FLB_TRUE) {
+ cfgoldname = old_fleet_config_filename(ctx);
+ rename(cfgnewname, cfgoldname);
+ unlink(cfgnewname);
+ flb_sds_destroy(cfgoldname);
+ }
+
+ if (!link(cfgname, cfgnewname)) {
+#ifdef FLB_SYSTEM_WINDOWS
+ err = GetLastError();
+ FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, err, 0, &lpMsg, 0, NULL);
+ flb_plg_error(ctx->ins, "unable to create hard link: %s", lpMsg);
+#else
+ flb_errno();
+#endif
+ }
+ }
+
+ if (ctx->config_timestamp < time_last_modified) {
+ flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld",
+ ctx->config_timestamp, time_last_modified);
+ flb_plg_info(ctx->ins, "force the reloading of the configuration file=%d.", ctx->event_fd);
+ flb_sds_destroy(data);
+
+ if (execute_reload(ctx, cfgname) == FLB_FALSE) {
+ cfgoldname = old_fleet_config_filename(ctx);
+ cfgcurname = cur_fleet_config_filename(ctx);
+ rename(cfgoldname, cfgcurname);
+ flb_sds_destroy(cfgcurname);
+ flb_sds_destroy(cfgoldname);
+ goto reload_error;
+ }
+ else {
+ FLB_INPUT_RETURN(0);
+ }
+ }
+
+ ret = 0;
+
+reload_error:
+http_error:
+ flb_http_client_destroy(client);
+client_error:
+ flb_upstream_conn_release(u_conn);
+conn_error:
+ FLB_INPUT_RETURN(ret);
+}
+
+#ifdef FLB_SYSTEM_WINDOWS
+#define _mkdir(a, b) mkdir(a)
+#else
+#define _mkdir(a, b) mkdir(a, b)
+#endif
+
+/* recursively create directories, based on:
+ * https://stackoverflow.com/a/2336245
+ * who found it at:
+ * http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
+ */
+static int __mkdir(const char *dir, int perms) {
+ char tmp[255];
+ char *ptr = NULL;
+ size_t len;
+ int ret;
+
+ ret = snprintf(tmp, sizeof(tmp),"%s",dir);
+ if (ret > sizeof(tmp)) {
+ return -1;
+ }
+
+ len = strlen(tmp);
+ if (tmp[len - 1] == '/') {
+ tmp[len - 1] = 0;
+ }
+
+ for (ptr = tmp + 1; *ptr; ptr++) {
+ if (*ptr == '/') {
+ *ptr = 0;
+ if (access(tmp, F_OK) != 0) {
+ ret = _mkdir(tmp, perms);
+ if (ret != 0) {
+ return ret;
+ }
+ }
+ *ptr = '/';
+ }
+ }
+ return _mkdir(tmp, perms);
+}
+
+static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
+{
+ flb_sds_t myfleetdir;
+
+ if (access(ctx->config_dir, F_OK) != 0) {
+ if (__mkdir(ctx->config_dir, 0700) != 0) {
+ return -1;
+ }
+ }
+
+ myfleetdir = flb_sds_create_size(256);
+
+ if (ctx->fleet_name != NULL) {
+ flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s",
+ ctx->config_dir, ctx->machine_id, ctx->fleet_name);
+ }
+ else {
+ flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s",
+ ctx->config_dir, ctx->machine_id, ctx->fleet_id);
+ }
+
+ if (access(myfleetdir, F_OK) != 0) {
+ if (__mkdir(myfleetdir, 0700) !=0) {
+ return -1;
+ }
+ }
+
+ flb_sds_destroy(myfleetdir);
+ return 0;
+}
+
+static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
+{
+ flb_ctx_t *flb_ctx = flb_context_get();
+ char *fname;
+ char *ext;
+ long timestamp;
+ char realname[4096];
+ ssize_t len;
+
+ if (create_fleet_directory(ctx) != 0) {
+ return -1;
+ }
+
+ /* check if we are already using the fleet configuration file. */
+ if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
+ /* check which one and load it */
+ if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
+ return execute_reload(ctx, cur_fleet_config_filename(ctx));
+ }
+ else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
+ return execute_reload(ctx, new_fleet_config_filename(ctx));
+ }
+ }
+ else {
+ if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) {
+ len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname));
+
+ if (len > sizeof(realname)) {
+ return FLB_FALSE;
+ }
+
+ fname = basename(realname);
+ }
+ else {
+ fname = basename(flb_ctx->config->conf_path_file);
+ }
+
+ if (fname == NULL) {
+ return FLB_FALSE;
+ }
+
+ errno = 0;
+ timestamp = strtol(fname, &ext, 10);
+
+ if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) ||
+ (errno != 0 && timestamp == 0)) {
+ flb_errno();
+ return FLB_FALSE;
+ }
+
+ /* unable to parse the timstamp */
+ if (errno == ERANGE) {
+ return FLB_FALSE;
+ }
+
+ ctx->config_timestamp = timestamp;
+ }
+
+ return FLB_FALSE;
+}
+
+static int in_calyptia_fleet_init(struct flb_input_instance *in,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ int upstream_flags;
+ struct flb_in_calyptia_fleet_config *ctx;
+ (void) data;
+
+#ifdef _WIN32
+ char *tmpdir;
+#endif
+
+ flb_plg_info(in, "initializing calyptia fleet input.");
+
+ if (in->host.name == NULL) {
+ flb_plg_error(in, "no input 'Host' provided");
+ return -1;
+ }
+
+ /* Allocate space for the configuration */
+ ctx = flb_calloc(1, sizeof(struct flb_in_calyptia_fleet_config));
+
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = in;
+ ctx->collect_fd = -1;
+
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(in, (void *)ctx);
+
+ if (ret == -1) {
+ flb_free(ctx);
+ flb_plg_error(in, "unable to load configuration");
+ return -1;
+ }
+
+#ifdef _WIN32
+ if (ctx->config_dir == NULL) {
+ tmpdir = getenv("TEMP");
+
+ if (tmpdir == NULL) {
+ flb_plg_error(in, "unable to find temporary directory (%%TEMP%%).");
+ return -1;
+ }
+
+ ctx->config_dir = flb_sds_create_size(4096);
+
+ if (ctx->config_dir == NULL) {
+ flb_plg_error(in, "unable to allocate config-dir.");
+ return -1;
+ }
+ flb_sds_printf(&ctx->config_dir, "%s" PATH_SEPARATOR "%s", tmpdir, "calyptia-fleet");
+ }
+#endif
+
+ upstream_flags = FLB_IO_TCP;
+
+ if (in->use_tls) {
+ upstream_flags |= FLB_IO_TLS;
+ }
+
+ ctx->u = flb_upstream_create(config, in->host.name, in->host.port,
+ upstream_flags, in->tls);
+
+ if (!ctx->u) {
+ flb_plg_error(ctx->ins, "could not initialize upstream");
+ flb_free(ctx);
+ return -1;
+ }
+
+ if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) {
+ /* Illegal settings. Override them. */
+ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
+ ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
+ }
+
+ if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
+ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
+ }
+
+ /* Set the context */
+ flb_input_set_context(in, ctx);
+
+ /* if we load a new configuration then we will be reloaded anyways */
+ if (load_fleet_config(ctx) == FLB_TRUE) {
+ return 0;
+ }
+
+ /* Set our collector based on time */
+ ret = flb_input_set_collector_time(in,
+ in_calyptia_fleet_collect,
+ ctx->interval_sec,
+ ctx->interval_nsec,
+ config);
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
+ flb_free(ctx);
+ return -1;
+ }
+
+ ctx->collect_fd = ret;
+
+ return 0;
+}
+
+static void cb_in_calyptia_fleet_pause(void *data, struct flb_config *config)
+{
+ struct flb_in_calyptia_fleet_config *ctx = data;
+ flb_input_collector_pause(ctx->collect_fd, ctx->ins);
+}
+
+static void cb_in_calyptia_fleet_resume(void *data, struct flb_config *config)
+{
+ struct flb_in_calyptia_fleet_config *ctx = data;
+ flb_input_collector_resume(ctx->collect_fd, ctx->ins);
+}
+
+static int in_calyptia_fleet_exit(void *data, struct flb_config *config)
+{
+ (void) *config;
+ struct flb_in_calyptia_fleet_config *ctx = (struct flb_in_calyptia_fleet_config *)data;
+
+ flb_input_collector_delete(ctx->collect_fd, ctx->ins);
+ flb_upstream_destroy(ctx->u);
+ flb_free(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "api_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, api_key),
+ "Calyptia Cloud API Key."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "config_dir", DEFAULT_CONFIG_DIR,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, config_dir),
+ "Base path for the configuration directory."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "fleet_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, fleet_id),
+ "Calyptia Fleet ID."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "fleet_name", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, fleet_name),
+ "Calyptia Fleet Name (used to lookup the fleet ID via the cloud API)."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "machine_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, machine_id),
+ "Agent Machine ID."
+ },
+ {
+ FLB_CONFIG_MAP_INT, "event_fd", "-1",
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, event_fd),
+ "Used internally to set the event fd."
+ },
+ {
+ FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, interval_sec),
+ "Set the collector interval"
+ },
+ {
+ FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
+ 0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, interval_nsec),
+ "Set the collector interval (nanoseconds)"
+ },
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_input_plugin in_calyptia_fleet_plugin = {
+ .name = "calyptia_fleet",
+ .description = "Calyptia Fleet Input",
+ .cb_init = in_calyptia_fleet_init,
+ .cb_pre_run = NULL,
+ .cb_collect = in_calyptia_fleet_collect,
+ .cb_resume = cb_in_calyptia_fleet_resume,
+ .cb_pause = cb_in_calyptia_fleet_pause,
+ .cb_flush_buf = NULL,
+ .cb_exit = in_calyptia_fleet_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET|FLB_INPUT_CORO|FLB_IO_OPT_TLS|FLB_INPUT_PRIVATE
+};