From b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.46.3. Signed-off-by: Daniel Baumann --- fluent-bit/plugins/out_chronicle/CMakeLists.txt | 6 - fluent-bit/plugins/out_chronicle/chronicle.c | 962 ---------------------- fluent-bit/plugins/out_chronicle/chronicle.h | 96 --- fluent-bit/plugins/out_chronicle/chronicle_conf.c | 421 ---------- fluent-bit/plugins/out_chronicle/chronicle_conf.h | 29 - 5 files changed, 1514 deletions(-) delete mode 100644 fluent-bit/plugins/out_chronicle/CMakeLists.txt delete mode 100644 fluent-bit/plugins/out_chronicle/chronicle.c delete mode 100644 fluent-bit/plugins/out_chronicle/chronicle.h delete mode 100644 fluent-bit/plugins/out_chronicle/chronicle_conf.c delete mode 100644 fluent-bit/plugins/out_chronicle/chronicle_conf.h (limited to 'fluent-bit/plugins/out_chronicle') diff --git a/fluent-bit/plugins/out_chronicle/CMakeLists.txt b/fluent-bit/plugins/out_chronicle/CMakeLists.txt deleted file mode 100644 index ca9180305..000000000 --- a/fluent-bit/plugins/out_chronicle/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(src - chronicle_conf.c - chronicle.c - ) - -FLB_PLUGIN(out_chronicle "${src}" "") diff --git a/fluent-bit/plugins/out_chronicle/chronicle.c b/fluent-bit/plugins/out_chronicle/chronicle.c deleted file mode 100644 index 479dd8035..000000000 --- a/fluent-bit/plugins/out_chronicle/chronicle.c +++ /dev/null @@ -1,962 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "chronicle.h" -#include "chronicle_conf.h" - -// TODO: The following code is copied from the Stackdriver plugin and should be -// factored into common library functions. - -/* - * Base64 Encoding in JWT must: - * - * - remove any trailing padding '=' character - * - replace '+' with '-' - * - replace '/' with '_' - * - * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C - */ -int chronicle_jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, - unsigned char *in_buf, size_t in_size, - size_t *olen) - -{ - int i; - size_t len; - int result; - - /* do normal base64 encoding */ - result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, - &len, in_buf, in_size); - if (result != 0) { - return -1; - } - - /* Replace '+' and '/' characters */ - for (i = 0; i < len && out_buf[i] != '='; i++) { - if (out_buf[i] == '+') { - out_buf[i] = '-'; - } - else if (out_buf[i] == '/') { - out_buf[i] = '_'; - } - } - - /* Now 'i' becomes the new length */ - *olen = i; - return 0; -} - -static int chronicle_jwt_encode(struct flb_chronicle *ctx, - char *payload, char *secret, - char **out_signature, size_t *out_size) -{ - int ret; - int len; - int buf_size; - size_t olen; - char *buf; - char *sigd; - char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; - unsigned char sha256_buf[32] = {0}; - flb_sds_t out; - unsigned char sig[256] = {0}; - size_t sig_len; - - buf_size = (strlen(payload) + strlen(secret)) * 2; - buf = flb_malloc(buf_size); - if (!buf) { - flb_errno(); - return -1; - } - - /* Encode header */ - len = strlen(headers); - ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, - &olen, (unsigned char *) headers, len); - if (ret != 0) { - flb_free(buf); - - return ret; - } - - /* Create buffer to store JWT */ - out = flb_sds_create_size(2048); - if (!out) { - flb_errno(); - flb_free(buf); - return -1; - } - - /* Append header */ - flb_sds_cat(out, buf, olen); - flb_sds_cat(out, ".", 1); - - /* Encode Payload */ - len = strlen(payload); - chronicle_jwt_base64_url_encode((unsigned char *) buf, buf_size, - (unsigned char *) payload, len, &olen); - - /* Append Payload */ - flb_sds_cat(out, buf, olen); - - /* do sha256() of base64(header).base64(payload) */ - ret = flb_hash_simple(FLB_HASH_SHA256, - (unsigned char *) out, flb_sds_len(out), - sha256_buf, sizeof(sha256_buf)); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error hashing token"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - len = strlen(secret); - sig_len = sizeof(sig); - - ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, - FLB_CRYPTO_PADDING_PKCS1, - FLB_HASH_SHA256, - (unsigned char *) secret, len, - sha256_buf, sizeof(sha256_buf), - sig, &sig_len); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error creating RSA context"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - sigd = flb_malloc(2048); - if (!sigd) { - flb_errno(); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - chronicle_jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); - - flb_sds_cat(out, ".", 1); - flb_sds_cat(out, sigd, olen); - - *out_signature = out; - *out_size = flb_sds_len(out); - - flb_free(buf); - flb_free(sigd); - - return 0; -} - -/* Create a new oauth2 context and get a oauth2 token */ -static int chronicle_get_oauth2_token(struct flb_chronicle *ctx) -{ - int ret; - char *token; - char *sig_data; - size_t sig_size; - time_t issued; - time_t expires; - char payload[1024]; - - /* Clear any previous oauth2 payload content */ - flb_oauth2_payload_clear(ctx->o); - - /* JWT encode for oauth2 */ - issued = time(NULL); - expires = issued + FLB_CHRONICLE_TOKEN_REFRESH; - - snprintf(payload, sizeof(payload) - 1, - "{\"iss\": \"%s\", \"scope\": \"%s\", " - "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", - ctx->oauth_credentials->client_email, FLB_CHRONICLE_SCOPE, - FLB_CHRONICLE_AUTH_URL, - expires, issued); - - /* Compose JWT signature */ - ret = chronicle_jwt_encode(ctx, payload, ctx->oauth_credentials->private_key, - &sig_data, &sig_size); - if (ret != 0) { - flb_plg_error(ctx->ins, "JWT signature generation failed"); - return -1; - } - - flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); - - ret = flb_oauth2_payload_append(ctx->o, - "grant_type", -1, - "urn%3Aietf%3Aparams%3Aoauth%3A" - "grant-type%3Ajwt-bearer", -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, - "assertion", -1, - sig_data, sig_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - flb_sds_destroy(sig_data); - - /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); - if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); - return -1; - } - - return 0; -} - -static flb_sds_t get_google_token(struct flb_chronicle *ctx) -{ - int ret = 0; - flb_sds_t output = NULL; - - if (pthread_mutex_lock(&ctx->token_mutex)){ - flb_plg_error(ctx->ins, "error locking mutex"); - return NULL; - } - - if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - ret = chronicle_get_oauth2_token(ctx); - } - - /* Copy string to prevent race conditions (get_oauth2 can free the string) */ - if (ret == 0) { - output = flb_sds_create(ctx->o->token_type); - flb_sds_printf(&output, " %s", ctx->o->access_token); - } - - if (pthread_mutex_unlock(&ctx->token_mutex)){ - flb_plg_error(ctx->ins, "error unlocking mutex"); - if (output) { - flb_sds_destroy(output); - } - return NULL; - } - - return output; -} - -static int validate_log_type(struct flb_chronicle *ctx, struct flb_config *config, - const char *body, size_t len) -{ - int ret = -1; - int root_type; - char *msgpack_buf = NULL; - size_t msgpack_size; - size_t off = 0; - msgpack_unpacked result; - int i, j, k; - msgpack_object key; - msgpack_object val; - msgpack_object root; - msgpack_object *array; - msgpack_object *supported_type; - int root_map_size; - int array_size = 0; - - - ret = flb_pack_json(body, len, - &msgpack_buf, &msgpack_size, - &root_type, NULL); - - if (ret != 0 || root_type != JSMN_OBJECT) { - flb_plg_error(ctx->ins, "json to msgpack conversion error"); - } - - ret = -1; - msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, msgpack_buf, msgpack_size, &off) == MSGPACK_UNPACK_SUCCESS) { - if (result.data.type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "Invalid log_type payload"); - ret = -2; - - goto cleanup; - } - - root = result.data; - root_map_size = root.via.map.size; - - for (i = 0; i < root_map_size; i++) { - key = root.via.map.ptr[i].key; - val = root.via.map.ptr[i].val; - - if (val.type != MSGPACK_OBJECT_ARRAY) { - flb_plg_error(ctx->ins, "Invalid inner array type of log_type payload"); - ret = -2; - - goto cleanup; - } - - array = val.via.array.ptr; - array_size = val.via.array.size; - - for (j = 0; j < array_size; j++) { - supported_type = &array[j]; - - if (supported_type->type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "Invalid inner maps of log_type payload"); - ret = -2; - - continue; - } - - for (k = 0; k < supported_type->via.map.size; k++) { - key = supported_type->via.map.ptr[k].key; - val = supported_type->via.map.ptr[k].val; - - if (strncmp("logType", key.via.str.ptr, key.via.str.size) == 0) { - if (strncmp(ctx->log_type, val.via.bin.ptr, val.via.str.size) == 0) { - ret = 0; - goto cleanup; - } - } - } - } - } - } - -cleanup: - msgpack_unpacked_destroy(&result); - - /* release 'out_buf' if it was allocated */ - if (msgpack_buf) { - flb_free(msgpack_buf); - } - - return ret; -} - -static int check_chronicle_log_type(struct flb_chronicle *ctx, struct flb_config *config) -{ - int ret; - size_t b_sent; - flb_sds_t token; - struct flb_connection *u_conn; - struct flb_http_client *c; - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - return -1; - } - - /* Get or renew Token */ - token = get_google_token(ctx); - - if (!token) { - flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); - flb_upstream_conn_release(u_conn); - return -1; - } - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_GET, FLB_CHRONICLE_LOG_TYPE_ENDPOINT, - NULL, 0, NULL, 0, NULL, 0); - if (!c) { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - flb_upstream_conn_release(u_conn); - flb_sds_destroy(token); - - return -1; - } - - /* Chronicle supported types are growing. Not to specify the read limit. */ - flb_http_buffer_size(c, 0); - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - - /* Compose and append Authorization header */ - flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* validate response */ - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - goto cleanup; - } - else { - /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); - if (c->resp.status == 200) { - ret = validate_log_type(ctx, config, c->resp.payload, c->resp.payload_size); - if (ret != 0) { - flb_plg_error(ctx->ins, "Validate log_type is failed"); - goto cleanup; - } - } - else { - if (c->resp.payload && c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); - } - - goto cleanup; - } - } - -cleanup: - - /* Cleanup */ - flb_sds_destroy(token); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - - return ret; -} - -static int cb_chronicle_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - char *token; - int io_flags = FLB_IO_TLS; - struct flb_chronicle *ctx; - int ret; - - /* Create config context */ - ctx = flb_chronicle_conf_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "configuration failed"); - return -1; - } - - flb_output_set_context(ins, ctx); - - /* Network mode IPv6 */ - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; - } - - /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ - pthread_mutex_init(&ctx->token_mutex, NULL); - - /* - * Create upstream context for Chronicle Streaming Inserts - * (no oauth2 service) - */ - ctx->u = flb_upstream_create_url(config, ctx->uri, - io_flags, ins->tls); - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } - - /* Create oauth2 context */ - ctx->o = flb_oauth2_create(ctx->config, FLB_CHRONICLE_AUTH_URL, 3000); - if (!ctx->o) { - flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; - } - flb_output_upstream_set(ctx->u, ins); - - /* Get or renew the OAuth2 token */ - token = get_google_token(ctx); - - if (!token) { - flb_plg_warn(ctx->ins, "token retrieval failed"); - } - else { - flb_sds_destroy(token); - } - - ret = check_chronicle_log_type(ctx, config); - if (ret != 0) { - flb_plg_error(ctx->ins, "Validate log_type failed. '%s' is not supported. ret = %d", - ctx->log_type, ret); - return -1; - } - - return 0; -} - -static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t bytes, struct flb_log_event log_event) -{ - int i; - int map_size; - int check = FLB_FALSE; - int found = FLB_FALSE; - int log_key_missing = 0; - int ret; - struct flb_chronicle *ctx = out_context; - char *val_buf; - char *key_str = NULL; - size_t key_str_size = 0; - size_t msgpack_size = bytes + bytes / 4; - size_t val_offset = 0; - flb_sds_t out_buf; - msgpack_object map; - msgpack_object key; - msgpack_object val; - - /* Allocate buffer to store log_key contents */ - val_buf = flb_calloc(1, msgpack_size); - if (val_buf == NULL) { - flb_plg_error(ctx->ins, "Could not allocate enough " - "memory to read record"); - flb_errno(); - return NULL; - } - - /* Get the record/map */ - map = *log_event.body; - - if (map.type != MSGPACK_OBJECT_MAP) { - return NULL; - } - - map_size = map.via.map.size; - - /* Reset variables for found log_key and correct type */ - found = FLB_FALSE; - check = FLB_FALSE; - - /* Extract log_key from record and append to output buffer */ - for (i = 0; i < map_size; i++) { - key = map.via.map.ptr[i].key; - val = map.via.map.ptr[i].val; - - if (key.type == MSGPACK_OBJECT_BIN) { - key_str = (char *) key.via.bin.ptr; - key_str_size = key.via.bin.size; - check = FLB_TRUE; - } - if (key.type == MSGPACK_OBJECT_STR) { - key_str = (char *) key.via.str.ptr; - key_str_size = key.via.str.size; - check = FLB_TRUE; - } - - if (check == FLB_TRUE) { - if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { - found = FLB_TRUE; - - /* - * Copy contents of value into buffer. Necessary to copy - * strings because flb_msgpack_to_json does not handle nested - * JSON gracefully and double escapes them. - */ - if (val.type == MSGPACK_OBJECT_BIN) { - memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size); - val_offset += val.via.bin.size; - val_buf[val_offset] = '\0'; - val_offset++; - } - else if (val.type == MSGPACK_OBJECT_STR) { - memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size); - val_offset += val.via.str.size; - val_buf[val_offset] = '\0'; - val_offset++; - } - else { - ret = flb_msgpack_to_json(val_buf + val_offset, - msgpack_size - val_offset, &val); - if (ret < 0) { - break; - } - val_offset += ret; - val_buf[val_offset] = '\0'; - val_offset++; - } - /* Exit early once log_key has been found for current record */ - break; - } - } - - /* If log_key was not found in the current record, mark log key as missing */ - if (found == FLB_FALSE) { - log_key_missing++; - } - } - - if (log_key_missing > 0) { - flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records", - ctx->log_key, log_key_missing); - } - - /* If nothing was read, destroy buffer */ - if (val_offset == 0) { - flb_free(val_buf); - return NULL; - } - val_buf[val_offset] = '\0'; - - /* Create output buffer to store contents */ - out_buf = flb_sds_create(val_buf); - if (out_buf == NULL) { - flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents."); - flb_errno(); - } - flb_free(val_buf); - - return out_buf; -} - -static int chronicle_format(const void *data, size_t bytes, - const char *tag, size_t tag_len, - char **out_data, size_t *out_size, - struct flb_chronicle *ctx) -{ - int len; - int ret; - int array_size = 0; - size_t off = 0; - size_t last_off = 0; - size_t alloc_size = 0; - size_t s; - char time_formatted[255]; - /* Parameters for Timestamp */ - struct tm tm; - flb_sds_t out_buf; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - flb_sds_t log_text = NULL; - int log_text_size; - - /* Count number of records */ - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - array_size = flb_mp_count(data, bytes); - - /* Create temporary msgpack buffer */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* - * Pack root map (unstructured log): - * see: https://cloud.google.com/chronicle/docs/reference/ingestion-api#request_body_2 - * { - * "customer_id": "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c", - * "log_type": "BIND_DNS", - * "entries": [ - * { - * "log_text": "26-Feb-2019 13:35:02.187 client 10.120.20.32#4238: query: altostrat.com IN A + (203.0.113.102)", - * "ts_epoch_microseconds": 1551188102187000 - * }, - * { - * "log_text": "26-Feb-2019 13:37:04.523 client 10.50.100.33#1116: query: examplepetstore.com IN A + (203.0.113.102)", - * "ts_rfc3339": "2019-26-02T13:37:04.523-08:00" - * }, - * { - * "log_text": "26-Feb-2019 13:39:01.115 client 10.1.2.3#3333: query: www.example.com IN A + (203.0.113.102)" - * }, - * ] - * } - */ - msgpack_pack_map(&mp_pck, 3); - - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "customer_id", 11); - - msgpack_pack_str(&mp_pck, strlen(ctx->customer_id)); - msgpack_pack_str_body(&mp_pck, ctx->customer_id, strlen(ctx->customer_id)); - - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "log_type", 8); - - msgpack_pack_str(&mp_pck, strlen(ctx->log_type)); - msgpack_pack_str_body(&mp_pck, ctx->log_type, strlen(ctx->log_type)); - - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "entries", 7); - - /* Append entries */ - msgpack_pack_array(&mp_pck, array_size); - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - off = log_decoder.offset; - alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ - last_off = off; - - /* - * Pack entries - * - * { - * "log_text": {...}, - * "ts_rfc3339": "..." - * } - * - */ - msgpack_pack_map(&mp_pck, 2); - - /* log_text */ - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "log_text", 8); - if (ctx->log_key != NULL) { - log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event); - log_text_size = flb_sds_len(log_text); - } - else { - log_text = flb_msgpack_to_json_str(alloc_size, log_event.body); - log_text_size = strlen(log_text); - } - - if (log_text == NULL) { - flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); - return -1; - } - msgpack_pack_str(&mp_pck, log_text_size); - msgpack_pack_str_body(&mp_pck, log_text, log_text_size); - - if (ctx->log_key != NULL) { - flb_sds_destroy(log_text); - } - else { - flb_free(log_text); - } - /* timestamp */ - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "ts_rfc3339", 10); - - gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); - s = strftime(time_formatted, sizeof(time_formatted) - 1, - FLB_STD_TIME_FMT, &tm); - len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, - ".%03" PRIu64 "Z", - (uint64_t) log_event.timestamp.tm.tv_nsec); - s += len; - - msgpack_pack_str(&mp_pck, s); - msgpack_pack_str_body(&mp_pck, time_formatted, s); - } - - /* Convert from msgpack to JSON */ - out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - - flb_log_event_decoder_destroy(&log_decoder); - msgpack_sbuffer_destroy(&mp_sbuf); - - if (!out_buf) { - flb_plg_error(ctx->ins, "error formatting JSON payload"); - return -1; - } - - *out_data = out_buf; - *out_size = flb_sds_len(out_buf); - - return 0; -} - -static void cb_chronicle_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - (void) i_ins; - (void) config; - int ret; - int ret_code = FLB_RETRY; - size_t b_sent; - flb_sds_t token; - flb_sds_t payload_buf; - size_t payload_size; - struct flb_chronicle *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - - flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Get or renew Token */ - token = get_google_token(ctx); - - if (!token) { - flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Reformat msgpack to chronicle JSON payload */ - ret = chronicle_format(event_chunk->data, event_chunk->size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - &payload_buf, &payload_size, ctx); - if (ret != 0) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(token); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint, - payload_buf, payload_size, NULL, 0, NULL, 0); - if (!c) { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - flb_upstream_conn_release(u_conn); - flb_sds_destroy(token); - flb_sds_destroy(payload_buf); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - flb_http_buffer_size(c, 4192); - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - - /* Compose and append Authorization header */ - flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* validate response */ - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - ret_code = FLB_RETRY; - } - else { - /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); - if (c->resp.status == 200) { - ret_code = FLB_OK; - } - else { - if (c->resp.payload && c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); - } - ret_code = FLB_RETRY; - } - } - - /* Cleanup */ - flb_sds_destroy(payload_buf); - flb_sds_destroy(token); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - - /* Done */ - FLB_OUTPUT_RETURN(ret_code); -} - -static int cb_chronicle_exit(void *data, struct flb_config *config) -{ - struct flb_chronicle *ctx = data; - - if (!ctx) { - return -1; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - flb_chronicle_conf_destroy(ctx); - return 0; -} - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, credentials_file), - "Set the path for the google service credentials file" - }, - // set in flb_chronicle_oauth_credentials - { - FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, - 0, FLB_FALSE, 0, - "Set the service account email" - }, - // set in flb_chronicle_oauth_credentials - { - FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, - 0, FLB_FALSE, 0, - "Set the service account secret" - }, - { - FLB_CONFIG_MAP_STR, "project_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, project_id), - "Set the project id" - }, - { - FLB_CONFIG_MAP_STR, "customer_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, customer_id), - "Set the customer id" - }, - { - FLB_CONFIG_MAP_STR, "log_type", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, log_type), - "Set the log type" - }, - { - FLB_CONFIG_MAP_STR, "region", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, region), - "Set the region" - }, - { - FLB_CONFIG_MAP_STR, "log_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_chronicle, log_key), - "Set the log key" - }, - /* EOF */ - {0} -}; - -struct flb_output_plugin out_chronicle_plugin = { - .name = "chronicle", - .description = "Send logs to Google Chronicle as unstructured log", - .cb_init = cb_chronicle_init, - .cb_flush = cb_chronicle_flush, - .cb_exit = cb_chronicle_exit, - .config_map = config_map, - /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; diff --git a/fluent-bit/plugins/out_chronicle/chronicle.h b/fluent-bit/plugins/out_chronicle/chronicle.h deleted file mode 100644 index 0243223f0..000000000 --- a/fluent-bit/plugins/out_chronicle/chronicle.h +++ /dev/null @@ -1,96 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_CHRONICLE -#define FLB_OUT_CHRONICLE - -#include -#include -#include -#include - -/* refresh token every 50 minutes */ -#define FLB_CHRONICLE_TOKEN_REFRESH 3000 - -/* Timestamp format */ -#define FLB_STD_TIME_FMT "%Y-%m-%dT%H:%M:%S" - -/* Chronicle unstructureed logs oauth scope */ -#define FLB_CHRONICLE_SCOPE "https://www.googleapis.com/auth/malachite-ingestion" - -/* Chronicle authorization URL */ -#define FLB_CHRONICLE_AUTH_URL "https://oauth2.googleapis.com/token" - -#define FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT "/v2/unstructuredlogentries:batchCreate" -#define FLB_CHRONICLE_LOG_TYPE_ENDPOINT "/v2/logtypes" -#define FLB_CHRONICLE_URL_BASE "https://malachiteingestion-pa.googleapis.com" -#define FLB_CHRONICLE_URL_BASE_EU "https://europe-malachiteingestion-pa.googleapis.com" -#define FLB_CHRONICLE_URL_BASE_UK "https://europe-west2-malachiteingestion-pa.googleapis.com" -#define FLB_CHRONICLE_URL_BASE_ASIA "https://asia-southeast1-malachiteingestion-pa.googleapis.com" - -struct flb_chronicle_oauth_credentials { - /* parsed credentials file */ - flb_sds_t type; - flb_sds_t project_id; - flb_sds_t private_key_id; - flb_sds_t private_key; - flb_sds_t client_email; - flb_sds_t client_id; - flb_sds_t auth_uri; - flb_sds_t token_uri; -}; - -struct flb_chronicle { - /* credentials */ - flb_sds_t credentials_file; - - struct flb_chronicle_oauth_credentials *oauth_credentials; - - /* chronicle configuration */ - flb_sds_t project_id; - flb_sds_t customer_id; - flb_sds_t log_type; - - flb_sds_t uri; - flb_sds_t health_uri; - flb_sds_t endpoint; - flb_sds_t region; - flb_sds_t log_key; - - int json_date_format; - flb_sds_t json_date_key; - flb_sds_t date_key; - - /* oauth2 context */ - struct flb_oauth2 *o; - - /* mutex for acquiring oauth tokens */ - pthread_mutex_t token_mutex; - - /* Upstream connection to the backend server */ - struct flb_upstream *u; - - /* Fluent Bit context */ - struct flb_config *config; - - /* Plugin output instance reference */ - struct flb_output_instance *ins; -}; - -#endif diff --git a/fluent-bit/plugins/out_chronicle/chronicle_conf.c b/fluent-bit/plugins/out_chronicle/chronicle_conf.c deleted file mode 100644 index 5d6cdf9b2..000000000 --- a/fluent-bit/plugins/out_chronicle/chronicle_conf.c +++ /dev/null @@ -1,421 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "chronicle.h" -#include "chronicle_conf.h" - - -static inline int key_cmp(char *str, int len, char *cmp) { - - if (strlen(cmp) != len) { - return -1; - } - - return strncasecmp(str, cmp, len); -} - -static int flb_chronicle_read_credentials_file(struct flb_chronicle *ctx, - char *creds, - struct flb_chronicle_oauth_credentials *ctx_creds) -{ - int i; - int ret; - int len; - int key_len; - int val_len; - int tok_size = 32; - char *buf; - char *key; - char *val; - flb_sds_t tmp; - struct stat st; - jsmn_parser parser; - jsmntok_t *t; - jsmntok_t *tokens; - - /* Validate credentials path */ - ret = stat(creds, &st); - if (ret == -1) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot open credentials file: %s", - creds); - return -1; - } - - if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) { - flb_plg_error(ctx->ins, "credentials file " - "is not a valid file: %s", creds); - return -1; - } - - /* Read file content */ - buf = mk_file_to_buffer(creds); - if (!buf) { - flb_plg_error(ctx->ins, "error reading credentials file: %s", - creds); - return -1; - } - - /* Parse content */ - jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); - if (!tokens) { - flb_errno(); - flb_free(buf); - return -1; - } - - ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size); - if (ret <= 0) { - flb_plg_error(ctx->ins, "invalid JSON credentials file: %s", - creds); - flb_free(buf); - flb_free(tokens); - return -1; - } - - t = &tokens[0]; - if (t->type != JSMN_OBJECT) { - flb_plg_error(ctx->ins, "invalid JSON map on file: %s", - creds); - flb_free(buf); - flb_free(tokens); - return -1; - } - - /* Parse JSON tokens */ - for (i = 1; i < ret; i++) { - t = &tokens[i]; - if (t->type != JSMN_STRING) { - continue; - } - - if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){ - break; - } - - /* Key */ - key = buf + t->start; - key_len = (t->end - t->start); - - /* Value */ - i++; - t = &tokens[i]; - val = buf + t->start; - val_len = (t->end - t->start); - - if (key_cmp(key, key_len, "type") == 0) { - ctx_creds->type = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "project_id") == 0) { - ctx_creds->project_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "private_key_id") == 0) { - ctx_creds->private_key_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "private_key") == 0) { - tmp = flb_sds_create_len(val, val_len); - if (tmp) { - /* Unescape private key */ - len = flb_sds_len(tmp); - ctx_creds->private_key = flb_sds_create_size(len); - flb_unescape_string(tmp, len, - &ctx_creds->private_key); - flb_sds_destroy(tmp); - } - } - else if (key_cmp(key, key_len, "client_email") == 0) { - ctx_creds->client_email = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "client_id") == 0) { - ctx_creds->client_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "auth_uri") == 0) { - ctx_creds->auth_uri = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "token_uri") == 0) { - ctx_creds->token_uri = flb_sds_create_len(val, val_len); - } - } - - flb_free(buf); - flb_free(tokens); - - return 0; -} - - -struct flb_chronicle *flb_chronicle_conf_create(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - const char *tmp; - struct flb_chronicle *ctx; - struct flb_chronicle_oauth_credentials *creds; - - /* Allocate config context */ - ctx = flb_calloc(1, sizeof(struct flb_chronicle)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - ctx->config = config; - - ret = flb_output_config_map_set(ins, (void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to load configuration"); - flb_free(ctx); - return NULL; - } - - /* Lookup credentials file */ - creds = flb_calloc(1, sizeof(struct flb_chronicle_oauth_credentials)); - if (!creds) { - flb_errno(); - flb_free(ctx); - return NULL; - } - ctx->oauth_credentials = creds; - - if (ctx->credentials_file == NULL) { - tmp = getenv("GOOGLE_SERVICE_CREDENTIALS"); - if (tmp) { - ctx->credentials_file = flb_sds_create(tmp); - } - } - - if (ctx->credentials_file) { - ret = flb_chronicle_read_credentials_file(ctx, - ctx->credentials_file, - ctx->oauth_credentials); - if (ret != 0) { - flb_chronicle_conf_destroy(ctx); - return NULL; - } - } - else if (!ctx->credentials_file) { - /* - * If no credentials file has been defined, do manual lookup of the - * client email and the private key. - */ - - /* Service Account Email */ - tmp = flb_output_get_property("service_account_email", ins); - if (tmp) { - creds->client_email = flb_sds_create(tmp); - } - else { - tmp = getenv("SERVICE_ACCOUNT_EMAIL"); - if (tmp) { - creds->client_email = flb_sds_create(tmp); - } - } - - /* Service Account Secret */ - tmp = flb_output_get_property("service_account_secret", ins); - if (tmp) { - creds->private_key = flb_sds_create(tmp); - } - else { - tmp = getenv("SERVICE_ACCOUNT_SECRET"); - if (tmp) { - creds->private_key = flb_sds_create(tmp); - } - } - - if (!creds->client_email) { - flb_plg_error(ctx->ins, "service_account_email/client_email is not defined"); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - - if (!creds->private_key) { - flb_plg_error(ctx->ins, "service_account_secret/private_key is not defined"); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - } - - /* config: 'project_id' */ - if (ctx->project_id == NULL) { - if (creds->project_id) { - /* flb_config_map_destroy uses the pointer within the config_map struct to - * free the value so if we assign it here it is safe to free later with the - * creds struct. If we do not we will leak here. - */ - ctx->project_id = creds->project_id; - if (!ctx->project_id) { - flb_plg_error(ctx->ins, - "failed extracting 'project_id' from credentials."); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - } - else { - flb_plg_error(ctx->ins, - "no 'project_id' configured or present in credentials."); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - } - - /* config: 'customer_id' */ - if (ctx->customer_id == NULL) { - flb_plg_error(ctx->ins, "property 'customer_id' is not defined"); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - - /* config: 'log_type' */ - if (ctx->log_type == NULL) { - flb_plg_error(ctx->ins, "property 'log_type' is not defined"); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - - /* Date key */ - ctx->date_key = ctx->json_date_key; - tmp = flb_output_get_property("json_date_key", ins); - if (tmp) { - /* Just check if we have to disable it */ - if (flb_utils_bool(tmp) == FLB_FALSE) { - ctx->date_key = NULL; - } - } - - /* Date format for JSON output */ - ctx->json_date_format = FLB_PACK_JSON_DATE_ISO8601; - tmp = flb_output_get_property("json_date_format", ins); - if (tmp) { - ret = flb_pack_to_json_date_type(tmp); - if (ret == -1) { - flb_plg_error(ctx->ins, "invalid json_date_format '%s'. ", tmp); - return NULL; - } - else { - ctx->json_date_format = ret; - } - } - - /* Create the target endpoint URI */ - ctx->endpoint = flb_sds_create_size(sizeof(FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT)); - if (!ctx->endpoint) { - flb_errno(); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - ctx->endpoint = flb_sds_printf(&ctx->endpoint, FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT); - - /* Create the base URI */ - if (ctx->region == NULL || strncasecmp(ctx->region, "US", 2) == 0) { - ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE)); - if (!ctx->uri) { - flb_errno(); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE); - } - else if (strncasecmp(ctx->region, "EU", 2) == 0){ - ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_EU)); - if (!ctx->uri) { - flb_errno(); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_EU); - } - else if (strncasecmp(ctx->region, "UK", 2) == 0) { - ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_UK)); - if (!ctx->uri) { - flb_errno(); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_UK); - } - else if (strncasecmp(ctx->region, "ASIA", 4) == 0) { - ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_ASIA)); - if (!ctx->uri) { - flb_errno(); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_ASIA); - } - else { - flb_plg_error(ctx->ins, "unsupported region"); - flb_chronicle_conf_destroy(ctx); - return NULL; - } - flb_plg_info(ctx->ins, "project='%s' custumer_id='%s' region='%s'", - ctx->project_id, ctx->customer_id, ctx->region); - - return ctx; -} - - -int flb_chronicle_oauth_credentials_destroy(struct flb_chronicle_oauth_credentials *creds) -{ - if (!creds) { - return -1; - } - flb_sds_destroy(creds->type); - flb_sds_destroy(creds->project_id); - flb_sds_destroy(creds->private_key_id); - flb_sds_destroy(creds->private_key); - flb_sds_destroy(creds->client_email); - flb_sds_destroy(creds->client_id); - flb_sds_destroy(creds->auth_uri); - flb_sds_destroy(creds->token_uri); - - flb_free(creds); - - return 0; -} - -int flb_chronicle_conf_destroy(struct flb_chronicle *ctx) -{ - if (!ctx) { - return -1; - } - - flb_chronicle_oauth_credentials_destroy(ctx->oauth_credentials); - - flb_sds_destroy(ctx->endpoint); - flb_sds_destroy(ctx->uri); - - if (ctx->o) { - flb_oauth2_destroy(ctx->o); - } - - flb_free(ctx); - return 0; -} diff --git a/fluent-bit/plugins/out_chronicle/chronicle_conf.h b/fluent-bit/plugins/out_chronicle/chronicle_conf.h deleted file mode 100644 index 76dcfb3d2..000000000 --- a/fluent-bit/plugins/out_chronicle/chronicle_conf.h +++ /dev/null @@ -1,29 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_CHRONICLE_CONF_H -#define FLB_OUT_CHRONICLE_CONF_H - -#include "chronicle.h" - -struct flb_chronicle *flb_chronicle_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_chronicle_conf_destroy(struct flb_chronicle *ctx); - -#endif -- cgit v1.2.3