diff options
Diffstat (limited to 'fluent-bit/plugins/out_chronicle')
-rw-r--r-- | fluent-bit/plugins/out_chronicle/CMakeLists.txt | 6 | ||||
-rw-r--r-- | fluent-bit/plugins/out_chronicle/chronicle.c | 962 | ||||
-rw-r--r-- | fluent-bit/plugins/out_chronicle/chronicle.h | 96 | ||||
-rw-r--r-- | fluent-bit/plugins/out_chronicle/chronicle_conf.c | 421 | ||||
-rw-r--r-- | fluent-bit/plugins/out_chronicle/chronicle_conf.h | 29 |
5 files changed, 1514 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_chronicle/CMakeLists.txt b/fluent-bit/plugins/out_chronicle/CMakeLists.txt new file mode 100644 index 000000000..ca9180305 --- /dev/null +++ b/fluent-bit/plugins/out_chronicle/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + chronicle_conf.c + chronicle.c + ) + +FLB_PLUGIN(out_chronicle "${src}" "") diff --git a/fluent-bit/plugins/out_chronicle/chronicle.c b/fluent-bit/plugins/out_chronicle/chronicle.c new file mode 100644 index 000000000..479dd8035 --- /dev/null +++ b/fluent-bit/plugins/out_chronicle/chronicle.c @@ -0,0 +1,962 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_oauth2.h> +#include <fluent-bit/flb_base64.h> +#include <fluent-bit/flb_hash.h> +#include <fluent-bit/flb_crypto.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_log_event_encoder.h> +#include <fluent-bit/flb_log_event_decoder.h> + +#include <msgpack.h> + +#include "chronicle.h" +#include "chronicle_conf.h" + +// TODO: The following code is copied from the Stackdriver plugin and should be +// factored into common library functions. + +/* + * Base64 Encoding in JWT must: + * + * - remove any trailing padding '=' character + * - replace '+' with '-' + * - replace '/' with '_' + * + * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C + */ +int chronicle_jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, + unsigned char *in_buf, size_t in_size, + size_t *olen) + +{ + int i; + size_t len; + int result; + + /* do normal base64 encoding */ + result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, + &len, in_buf, in_size); + if (result != 0) { + return -1; + } + + /* Replace '+' and '/' characters */ + for (i = 0; i < len && out_buf[i] != '='; i++) { + if (out_buf[i] == '+') { + out_buf[i] = '-'; + } + else if (out_buf[i] == '/') { + out_buf[i] = '_'; + } + } + + /* Now 'i' becomes the new length */ + *olen = i; + return 0; +} + +static int chronicle_jwt_encode(struct flb_chronicle *ctx, + char *payload, char *secret, + char **out_signature, size_t *out_size) +{ + int ret; + int len; + int buf_size; + size_t olen; + char *buf; + char *sigd; + char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; + unsigned char sha256_buf[32] = {0}; + flb_sds_t out; + unsigned char sig[256] = {0}; + size_t sig_len; + + buf_size = (strlen(payload) + strlen(secret)) * 2; + buf = flb_malloc(buf_size); + if (!buf) { + flb_errno(); + return -1; + } + + /* Encode header */ + len = strlen(headers); + ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, + &olen, (unsigned char *) headers, len); + if (ret != 0) { + flb_free(buf); + + return ret; + } + + /* Create buffer to store JWT */ + out = flb_sds_create_size(2048); + if (!out) { + flb_errno(); + flb_free(buf); + return -1; + } + + /* Append header */ + flb_sds_cat(out, buf, olen); + flb_sds_cat(out, ".", 1); + + /* Encode Payload */ + len = strlen(payload); + chronicle_jwt_base64_url_encode((unsigned char *) buf, buf_size, + (unsigned char *) payload, len, &olen); + + /* Append Payload */ + flb_sds_cat(out, buf, olen); + + /* do sha256() of base64(header).base64(payload) */ + ret = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) out, flb_sds_len(out), + sha256_buf, sizeof(sha256_buf)); + + if (ret != FLB_CRYPTO_SUCCESS) { + flb_plg_error(ctx->ins, "error hashing token"); + flb_free(buf); + flb_sds_destroy(out); + return -1; + } + + len = strlen(secret); + sig_len = sizeof(sig); + + ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, + FLB_CRYPTO_PADDING_PKCS1, + FLB_HASH_SHA256, + (unsigned char *) secret, len, + sha256_buf, sizeof(sha256_buf), + sig, &sig_len); + + if (ret != FLB_CRYPTO_SUCCESS) { + flb_plg_error(ctx->ins, "error creating RSA context"); + flb_free(buf); + flb_sds_destroy(out); + return -1; + } + + sigd = flb_malloc(2048); + if (!sigd) { + flb_errno(); + flb_free(buf); + flb_sds_destroy(out); + return -1; + } + + chronicle_jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); + + flb_sds_cat(out, ".", 1); + flb_sds_cat(out, sigd, olen); + + *out_signature = out; + *out_size = flb_sds_len(out); + + flb_free(buf); + flb_free(sigd); + + return 0; +} + +/* Create a new oauth2 context and get a oauth2 token */ +static int chronicle_get_oauth2_token(struct flb_chronicle *ctx) +{ + int ret; + char *token; + char *sig_data; + size_t sig_size; + time_t issued; + time_t expires; + char payload[1024]; + + /* Clear any previous oauth2 payload content */ + flb_oauth2_payload_clear(ctx->o); + + /* JWT encode for oauth2 */ + issued = time(NULL); + expires = issued + FLB_CHRONICLE_TOKEN_REFRESH; + + snprintf(payload, sizeof(payload) - 1, + "{\"iss\": \"%s\", \"scope\": \"%s\", " + "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", + ctx->oauth_credentials->client_email, FLB_CHRONICLE_SCOPE, + FLB_CHRONICLE_AUTH_URL, + expires, issued); + + /* Compose JWT signature */ + ret = chronicle_jwt_encode(ctx, payload, ctx->oauth_credentials->private_key, + &sig_data, &sig_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "JWT signature generation failed"); + return -1; + } + + flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); + + ret = flb_oauth2_payload_append(ctx->o, + "grant_type", -1, + "urn%3Aietf%3Aparams%3Aoauth%3A" + "grant-type%3Ajwt-bearer", -1); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + flb_sds_destroy(sig_data); + return -1; + } + + ret = flb_oauth2_payload_append(ctx->o, + "assertion", -1, + sig_data, sig_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + flb_sds_destroy(sig_data); + return -1; + } + flb_sds_destroy(sig_data); + + /* Retrieve access token */ + token = flb_oauth2_token_get(ctx->o); + if (!token) { + flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + return -1; + } + + return 0; +} + +static flb_sds_t get_google_token(struct flb_chronicle *ctx) +{ + int ret = 0; + flb_sds_t output = NULL; + + if (pthread_mutex_lock(&ctx->token_mutex)){ + flb_plg_error(ctx->ins, "error locking mutex"); + return NULL; + } + + if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { + ret = chronicle_get_oauth2_token(ctx); + } + + /* Copy string to prevent race conditions (get_oauth2 can free the string) */ + if (ret == 0) { + output = flb_sds_create(ctx->o->token_type); + flb_sds_printf(&output, " %s", ctx->o->access_token); + } + + if (pthread_mutex_unlock(&ctx->token_mutex)){ + flb_plg_error(ctx->ins, "error unlocking mutex"); + if (output) { + flb_sds_destroy(output); + } + return NULL; + } + + return output; +} + +static int validate_log_type(struct flb_chronicle *ctx, struct flb_config *config, + const char *body, size_t len) +{ + int ret = -1; + int root_type; + char *msgpack_buf = NULL; + size_t msgpack_size; + size_t off = 0; + msgpack_unpacked result; + int i, j, k; + msgpack_object key; + msgpack_object val; + msgpack_object root; + msgpack_object *array; + msgpack_object *supported_type; + int root_map_size; + int array_size = 0; + + + ret = flb_pack_json(body, len, + &msgpack_buf, &msgpack_size, + &root_type, NULL); + + if (ret != 0 || root_type != JSMN_OBJECT) { + flb_plg_error(ctx->ins, "json to msgpack conversion error"); + } + + ret = -1; + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, msgpack_buf, msgpack_size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "Invalid log_type payload"); + ret = -2; + + goto cleanup; + } + + root = result.data; + root_map_size = root.via.map.size; + + for (i = 0; i < root_map_size; i++) { + key = root.via.map.ptr[i].key; + val = root.via.map.ptr[i].val; + + if (val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "Invalid inner array type of log_type payload"); + ret = -2; + + goto cleanup; + } + + array = val.via.array.ptr; + array_size = val.via.array.size; + + for (j = 0; j < array_size; j++) { + supported_type = &array[j]; + + if (supported_type->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "Invalid inner maps of log_type payload"); + ret = -2; + + continue; + } + + for (k = 0; k < supported_type->via.map.size; k++) { + key = supported_type->via.map.ptr[k].key; + val = supported_type->via.map.ptr[k].val; + + if (strncmp("logType", key.via.str.ptr, key.via.str.size) == 0) { + if (strncmp(ctx->log_type, val.via.bin.ptr, val.via.str.size) == 0) { + ret = 0; + goto cleanup; + } + } + } + } + } + } + +cleanup: + msgpack_unpacked_destroy(&result); + + /* release 'out_buf' if it was allocated */ + if (msgpack_buf) { + flb_free(msgpack_buf); + } + + return ret; +} + +static int check_chronicle_log_type(struct flb_chronicle *ctx, struct flb_config *config) +{ + int ret; + size_t b_sent; + flb_sds_t token; + struct flb_connection *u_conn; + struct flb_http_client *c; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + return -1; + } + + /* Get or renew Token */ + token = get_google_token(ctx); + + if (!token) { + flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_GET, FLB_CHRONICLE_LOG_TYPE_ENDPOINT, + NULL, 0, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(token); + + return -1; + } + + /* Chronicle supported types are growing. Not to specify the read limit. */ + flb_http_buffer_size(c, 0); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + + /* Compose and append Authorization header */ + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + + /* validate response */ + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + goto cleanup; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); + if (c->resp.status == 200) { + ret = validate_log_type(ctx, config, c->resp.payload, c->resp.payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, "Validate log_type is failed"); + goto cleanup; + } + } + else { + if (c->resp.payload && c->resp.payload_size > 0) { + /* we got an error */ + flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); + } + + goto cleanup; + } + } + +cleanup: + + /* Cleanup */ + flb_sds_destroy(token); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + return ret; +} + +static int cb_chronicle_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + char *token; + int io_flags = FLB_IO_TLS; + struct flb_chronicle *ctx; + int ret; + + /* Create config context */ + ctx = flb_chronicle_conf_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "configuration failed"); + return -1; + } + + flb_output_set_context(ins, ctx); + + /* Network mode IPv6 */ + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ + pthread_mutex_init(&ctx->token_mutex, NULL); + + /* + * Create upstream context for Chronicle Streaming Inserts + * (no oauth2 service) + */ + ctx->u = flb_upstream_create_url(config, ctx->uri, + io_flags, ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "upstream creation failed"); + return -1; + } + + /* Create oauth2 context */ + ctx->o = flb_oauth2_create(ctx->config, FLB_CHRONICLE_AUTH_URL, 3000); + if (!ctx->o) { + flb_plg_error(ctx->ins, "cannot create oauth2 context"); + return -1; + } + flb_output_upstream_set(ctx->u, ins); + + /* Get or renew the OAuth2 token */ + token = get_google_token(ctx); + + if (!token) { + flb_plg_warn(ctx->ins, "token retrieval failed"); + } + else { + flb_sds_destroy(token); + } + + ret = check_chronicle_log_type(ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "Validate log_type failed. '%s' is not supported. ret = %d", + ctx->log_type, ret); + return -1; + } + + return 0; +} + +static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t bytes, struct flb_log_event log_event) +{ + int i; + int map_size; + int check = FLB_FALSE; + int found = FLB_FALSE; + int log_key_missing = 0; + int ret; + struct flb_chronicle *ctx = out_context; + char *val_buf; + char *key_str = NULL; + size_t key_str_size = 0; + size_t msgpack_size = bytes + bytes / 4; + size_t val_offset = 0; + flb_sds_t out_buf; + msgpack_object map; + msgpack_object key; + msgpack_object val; + + /* Allocate buffer to store log_key contents */ + val_buf = flb_calloc(1, msgpack_size); + if (val_buf == NULL) { + flb_plg_error(ctx->ins, "Could not allocate enough " + "memory to read record"); + flb_errno(); + return NULL; + } + + /* Get the record/map */ + map = *log_event.body; + + if (map.type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + map_size = map.via.map.size; + + /* Reset variables for found log_key and correct type */ + found = FLB_FALSE; + check = FLB_FALSE; + + /* Extract log_key from record and append to output buffer */ + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { + found = FLB_TRUE; + + /* + * Copy contents of value into buffer. Necessary to copy + * strings because flb_msgpack_to_json does not handle nested + * JSON gracefully and double escapes them. + */ + if (val.type == MSGPACK_OBJECT_BIN) { + memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size); + val_offset += val.via.bin.size; + val_buf[val_offset] = '\0'; + val_offset++; + } + else if (val.type == MSGPACK_OBJECT_STR) { + memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size); + val_offset += val.via.str.size; + val_buf[val_offset] = '\0'; + val_offset++; + } + else { + ret = flb_msgpack_to_json(val_buf + val_offset, + msgpack_size - val_offset, &val); + if (ret < 0) { + break; + } + val_offset += ret; + val_buf[val_offset] = '\0'; + val_offset++; + } + /* Exit early once log_key has been found for current record */ + break; + } + } + + /* If log_key was not found in the current record, mark log key as missing */ + if (found == FLB_FALSE) { + log_key_missing++; + } + } + + if (log_key_missing > 0) { + flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records", + ctx->log_key, log_key_missing); + } + + /* If nothing was read, destroy buffer */ + if (val_offset == 0) { + flb_free(val_buf); + return NULL; + } + val_buf[val_offset] = '\0'; + + /* Create output buffer to store contents */ + out_buf = flb_sds_create(val_buf); + if (out_buf == NULL) { + flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents."); + flb_errno(); + } + flb_free(val_buf); + + return out_buf; +} + +static int chronicle_format(const void *data, size_t bytes, + const char *tag, size_t tag_len, + char **out_data, size_t *out_size, + struct flb_chronicle *ctx) +{ + int len; + int ret; + int array_size = 0; + size_t off = 0; + size_t last_off = 0; + size_t alloc_size = 0; + size_t s; + char time_formatted[255]; + /* Parameters for Timestamp */ + struct tm tm; + flb_sds_t out_buf; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + flb_sds_t log_text = NULL; + int log_text_size; + + /* Count number of records */ + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + return -1; + } + + array_size = flb_mp_count(data, bytes); + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* + * Pack root map (unstructured log): + * see: https://cloud.google.com/chronicle/docs/reference/ingestion-api#request_body_2 + * { + * "customer_id": "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c", + * "log_type": "BIND_DNS", + * "entries": [ + * { + * "log_text": "26-Feb-2019 13:35:02.187 client 10.120.20.32#4238: query: altostrat.com IN A + (203.0.113.102)", + * "ts_epoch_microseconds": 1551188102187000 + * }, + * { + * "log_text": "26-Feb-2019 13:37:04.523 client 10.50.100.33#1116: query: examplepetstore.com IN A + (203.0.113.102)", + * "ts_rfc3339": "2019-26-02T13:37:04.523-08:00" + * }, + * { + * "log_text": "26-Feb-2019 13:39:01.115 client 10.1.2.3#3333: query: www.example.com IN A + (203.0.113.102)" + * }, + * ] + * } + */ + msgpack_pack_map(&mp_pck, 3); + + msgpack_pack_str(&mp_pck, 11); + msgpack_pack_str_body(&mp_pck, "customer_id", 11); + + msgpack_pack_str(&mp_pck, strlen(ctx->customer_id)); + msgpack_pack_str_body(&mp_pck, ctx->customer_id, strlen(ctx->customer_id)); + + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "log_type", 8); + + msgpack_pack_str(&mp_pck, strlen(ctx->log_type)); + msgpack_pack_str_body(&mp_pck, ctx->log_type, strlen(ctx->log_type)); + + msgpack_pack_str(&mp_pck, 7); + msgpack_pack_str_body(&mp_pck, "entries", 7); + + /* Append entries */ + msgpack_pack_array(&mp_pck, array_size); + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + off = log_decoder.offset; + alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ + last_off = off; + + /* + * Pack entries + * + * { + * "log_text": {...}, + * "ts_rfc3339": "..." + * } + * + */ + msgpack_pack_map(&mp_pck, 2); + + /* log_text */ + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "log_text", 8); + if (ctx->log_key != NULL) { + log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event); + log_text_size = flb_sds_len(log_text); + } + else { + log_text = flb_msgpack_to_json_str(alloc_size, log_event.body); + log_text_size = strlen(log_text); + } + + if (log_text == NULL) { + flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); + return -1; + } + msgpack_pack_str(&mp_pck, log_text_size); + msgpack_pack_str_body(&mp_pck, log_text, log_text_size); + + if (ctx->log_key != NULL) { + flb_sds_destroy(log_text); + } + else { + flb_free(log_text); + } + /* timestamp */ + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "ts_rfc3339", 10); + + gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); + s = strftime(time_formatted, sizeof(time_formatted) - 1, + FLB_STD_TIME_FMT, &tm); + len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, + ".%03" PRIu64 "Z", + (uint64_t) log_event.timestamp.tm.tv_nsec); + s += len; + + msgpack_pack_str(&mp_pck, s); + msgpack_pack_str_body(&mp_pck, time_formatted, s); + } + + /* Convert from msgpack to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + + flb_log_event_decoder_destroy(&log_decoder); + msgpack_sbuffer_destroy(&mp_sbuf); + + if (!out_buf) { + flb_plg_error(ctx->ins, "error formatting JSON payload"); + return -1; + } + + *out_data = out_buf; + *out_size = flb_sds_len(out_buf); + + return 0; +} + +static void cb_chronicle_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + (void) i_ins; + (void) config; + int ret; + int ret_code = FLB_RETRY; + size_t b_sent; + flb_sds_t token; + flb_sds_t payload_buf; + size_t payload_size; + struct flb_chronicle *ctx = out_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + + flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get or renew Token */ + token = get_google_token(ctx); + + if (!token) { + flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Reformat msgpack to chronicle JSON payload */ + ret = chronicle_format(event_chunk->data, event_chunk->size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + &payload_buf, &payload_size, ctx); + if (ret != 0) { + flb_upstream_conn_release(u_conn); + flb_sds_destroy(token); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint, + payload_buf, payload_size, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(token); + flb_sds_destroy(payload_buf); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + flb_http_buffer_size(c, 4192); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + + /* Compose and append Authorization header */ + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + + /* validate response */ + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + ret_code = FLB_RETRY; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); + if (c->resp.status == 200) { + ret_code = FLB_OK; + } + else { + if (c->resp.payload && c->resp.payload_size > 0) { + /* we got an error */ + flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); + } + ret_code = FLB_RETRY; + } + } + + /* Cleanup */ + flb_sds_destroy(payload_buf); + flb_sds_destroy(token); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + /* Done */ + FLB_OUTPUT_RETURN(ret_code); +} + +static int cb_chronicle_exit(void *data, struct flb_config *config) +{ + struct flb_chronicle *ctx = data; + + if (!ctx) { + return -1; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + flb_chronicle_conf_destroy(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, credentials_file), + "Set the path for the google service credentials file" + }, + // set in flb_chronicle_oauth_credentials + { + FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the service account email" + }, + // set in flb_chronicle_oauth_credentials + { + FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, + 0, FLB_FALSE, 0, + "Set the service account secret" + }, + { + FLB_CONFIG_MAP_STR, "project_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, project_id), + "Set the project id" + }, + { + FLB_CONFIG_MAP_STR, "customer_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, customer_id), + "Set the customer id" + }, + { + FLB_CONFIG_MAP_STR, "log_type", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, log_type), + "Set the log type" + }, + { + FLB_CONFIG_MAP_STR, "region", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, region), + "Set the region" + }, + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_chronicle, log_key), + "Set the log key" + }, + /* EOF */ + {0} +}; + +struct flb_output_plugin out_chronicle_plugin = { + .name = "chronicle", + .description = "Send logs to Google Chronicle as unstructured log", + .cb_init = cb_chronicle_init, + .cb_flush = cb_chronicle_flush, + .cb_exit = cb_chronicle_exit, + .config_map = config_map, + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_TLS, +}; diff --git a/fluent-bit/plugins/out_chronicle/chronicle.h b/fluent-bit/plugins/out_chronicle/chronicle.h new file mode 100644 index 000000000..0243223f0 --- /dev/null +++ b/fluent-bit/plugins/out_chronicle/chronicle.h @@ -0,0 +1,96 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_CHRONICLE +#define FLB_OUT_CHRONICLE + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_oauth2.h> +#include <fluent-bit/flb_sds.h> + +/* refresh token every 50 minutes */ +#define FLB_CHRONICLE_TOKEN_REFRESH 3000 + +/* Timestamp format */ +#define FLB_STD_TIME_FMT "%Y-%m-%dT%H:%M:%S" + +/* Chronicle unstructureed logs oauth scope */ +#define FLB_CHRONICLE_SCOPE "https://www.googleapis.com/auth/malachite-ingestion" + +/* Chronicle authorization URL */ +#define FLB_CHRONICLE_AUTH_URL "https://oauth2.googleapis.com/token" + +#define FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT "/v2/unstructuredlogentries:batchCreate" +#define FLB_CHRONICLE_LOG_TYPE_ENDPOINT "/v2/logtypes" +#define FLB_CHRONICLE_URL_BASE "https://malachiteingestion-pa.googleapis.com" +#define FLB_CHRONICLE_URL_BASE_EU "https://europe-malachiteingestion-pa.googleapis.com" +#define FLB_CHRONICLE_URL_BASE_UK "https://europe-west2-malachiteingestion-pa.googleapis.com" +#define FLB_CHRONICLE_URL_BASE_ASIA "https://asia-southeast1-malachiteingestion-pa.googleapis.com" + +struct flb_chronicle_oauth_credentials { + /* parsed credentials file */ + flb_sds_t type; + flb_sds_t project_id; + flb_sds_t private_key_id; + flb_sds_t private_key; + flb_sds_t client_email; + flb_sds_t client_id; + flb_sds_t auth_uri; + flb_sds_t token_uri; +}; + +struct flb_chronicle { + /* credentials */ + flb_sds_t credentials_file; + + struct flb_chronicle_oauth_credentials *oauth_credentials; + + /* chronicle configuration */ + flb_sds_t project_id; + flb_sds_t customer_id; + flb_sds_t log_type; + + flb_sds_t uri; + flb_sds_t health_uri; + flb_sds_t endpoint; + flb_sds_t region; + flb_sds_t log_key; + + int json_date_format; + flb_sds_t json_date_key; + flb_sds_t date_key; + + /* oauth2 context */ + struct flb_oauth2 *o; + + /* mutex for acquiring oauth tokens */ + pthread_mutex_t token_mutex; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Fluent Bit context */ + struct flb_config *config; + + /* Plugin output instance reference */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/out_chronicle/chronicle_conf.c b/fluent-bit/plugins/out_chronicle/chronicle_conf.c new file mode 100644 index 000000000..5d6cdf9b2 --- /dev/null +++ b/fluent-bit/plugins/out_chronicle/chronicle_conf.c @@ -0,0 +1,421 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_unescape.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_aws_credentials.h> + +#include <sys/types.h> +#include <sys/stat.h> + +#include "chronicle.h" +#include "chronicle_conf.h" + + +static inline int key_cmp(char *str, int len, char *cmp) { + + if (strlen(cmp) != len) { + return -1; + } + + return strncasecmp(str, cmp, len); +} + +static int flb_chronicle_read_credentials_file(struct flb_chronicle *ctx, + char *creds, + struct flb_chronicle_oauth_credentials *ctx_creds) +{ + int i; + int ret; + int len; + int key_len; + int val_len; + int tok_size = 32; + char *buf; + char *key; + char *val; + flb_sds_t tmp; + struct stat st; + jsmn_parser parser; + jsmntok_t *t; + jsmntok_t *tokens; + + /* Validate credentials path */ + ret = stat(creds, &st); + if (ret == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot open credentials file: %s", + creds); + return -1; + } + + if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) { + flb_plg_error(ctx->ins, "credentials file " + "is not a valid file: %s", creds); + return -1; + } + + /* Read file content */ + buf = mk_file_to_buffer(creds); + if (!buf) { + flb_plg_error(ctx->ins, "error reading credentials file: %s", + creds); + return -1; + } + + /* Parse content */ + jsmn_init(&parser); + tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + if (!tokens) { + flb_errno(); + flb_free(buf); + return -1; + } + + ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size); + if (ret <= 0) { + flb_plg_error(ctx->ins, "invalid JSON credentials file: %s", + creds); + flb_free(buf); + flb_free(tokens); + return -1; + } + + t = &tokens[0]; + if (t->type != JSMN_OBJECT) { + flb_plg_error(ctx->ins, "invalid JSON map on file: %s", + creds); + flb_free(buf); + flb_free(tokens); + return -1; + } + + /* Parse JSON tokens */ + for (i = 1; i < ret; i++) { + t = &tokens[i]; + if (t->type != JSMN_STRING) { + continue; + } + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){ + break; + } + + /* Key */ + key = buf + t->start; + key_len = (t->end - t->start); + + /* Value */ + i++; + t = &tokens[i]; + val = buf + t->start; + val_len = (t->end - t->start); + + if (key_cmp(key, key_len, "type") == 0) { + ctx_creds->type = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "project_id") == 0) { + ctx_creds->project_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "private_key_id") == 0) { + ctx_creds->private_key_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "private_key") == 0) { + tmp = flb_sds_create_len(val, val_len); + if (tmp) { + /* Unescape private key */ + len = flb_sds_len(tmp); + ctx_creds->private_key = flb_sds_create_size(len); + flb_unescape_string(tmp, len, + &ctx_creds->private_key); + flb_sds_destroy(tmp); + } + } + else if (key_cmp(key, key_len, "client_email") == 0) { + ctx_creds->client_email = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "client_id") == 0) { + ctx_creds->client_id = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "auth_uri") == 0) { + ctx_creds->auth_uri = flb_sds_create_len(val, val_len); + } + else if (key_cmp(key, key_len, "token_uri") == 0) { + ctx_creds->token_uri = flb_sds_create_len(val, val_len); + } + } + + flb_free(buf); + flb_free(tokens); + + return 0; +} + + +struct flb_chronicle *flb_chronicle_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + struct flb_chronicle *ctx; + struct flb_chronicle_oauth_credentials *creds; + + /* Allocate config context */ + ctx = flb_calloc(1, sizeof(struct flb_chronicle)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + ctx->config = config; + + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration"); + flb_free(ctx); + return NULL; + } + + /* Lookup credentials file */ + creds = flb_calloc(1, sizeof(struct flb_chronicle_oauth_credentials)); + if (!creds) { + flb_errno(); + flb_free(ctx); + return NULL; + } + ctx->oauth_credentials = creds; + + if (ctx->credentials_file == NULL) { + tmp = getenv("GOOGLE_SERVICE_CREDENTIALS"); + if (tmp) { + ctx->credentials_file = flb_sds_create(tmp); + } + } + + if (ctx->credentials_file) { + ret = flb_chronicle_read_credentials_file(ctx, + ctx->credentials_file, + ctx->oauth_credentials); + if (ret != 0) { + flb_chronicle_conf_destroy(ctx); + return NULL; + } + } + else if (!ctx->credentials_file) { + /* + * If no credentials file has been defined, do manual lookup of the + * client email and the private key. + */ + + /* Service Account Email */ + tmp = flb_output_get_property("service_account_email", ins); + if (tmp) { + creds->client_email = flb_sds_create(tmp); + } + else { + tmp = getenv("SERVICE_ACCOUNT_EMAIL"); + if (tmp) { + creds->client_email = flb_sds_create(tmp); + } + } + + /* Service Account Secret */ + tmp = flb_output_get_property("service_account_secret", ins); + if (tmp) { + creds->private_key = flb_sds_create(tmp); + } + else { + tmp = getenv("SERVICE_ACCOUNT_SECRET"); + if (tmp) { + creds->private_key = flb_sds_create(tmp); + } + } + + if (!creds->client_email) { + flb_plg_error(ctx->ins, "service_account_email/client_email is not defined"); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + + if (!creds->private_key) { + flb_plg_error(ctx->ins, "service_account_secret/private_key is not defined"); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + } + + /* config: 'project_id' */ + if (ctx->project_id == NULL) { + if (creds->project_id) { + /* flb_config_map_destroy uses the pointer within the config_map struct to + * free the value so if we assign it here it is safe to free later with the + * creds struct. If we do not we will leak here. + */ + ctx->project_id = creds->project_id; + if (!ctx->project_id) { + flb_plg_error(ctx->ins, + "failed extracting 'project_id' from credentials."); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + } + else { + flb_plg_error(ctx->ins, + "no 'project_id' configured or present in credentials."); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + } + + /* config: 'customer_id' */ + if (ctx->customer_id == NULL) { + flb_plg_error(ctx->ins, "property 'customer_id' is not defined"); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + + /* config: 'log_type' */ + if (ctx->log_type == NULL) { + flb_plg_error(ctx->ins, "property 'log_type' is not defined"); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + + /* Date key */ + ctx->date_key = ctx->json_date_key; + tmp = flb_output_get_property("json_date_key", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->date_key = NULL; + } + } + + /* Date format for JSON output */ + ctx->json_date_format = FLB_PACK_JSON_DATE_ISO8601; + tmp = flb_output_get_property("json_date_format", ins); + if (tmp) { + ret = flb_pack_to_json_date_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "invalid json_date_format '%s'. ", tmp); + return NULL; + } + else { + ctx->json_date_format = ret; + } + } + + /* Create the target endpoint URI */ + ctx->endpoint = flb_sds_create_size(sizeof(FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT)); + if (!ctx->endpoint) { + flb_errno(); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + ctx->endpoint = flb_sds_printf(&ctx->endpoint, FLB_CHRONICLE_UNSTRUCTURED_ENDPOINT); + + /* Create the base URI */ + if (ctx->region == NULL || strncasecmp(ctx->region, "US", 2) == 0) { + ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE)); + if (!ctx->uri) { + flb_errno(); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE); + } + else if (strncasecmp(ctx->region, "EU", 2) == 0){ + ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_EU)); + if (!ctx->uri) { + flb_errno(); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_EU); + } + else if (strncasecmp(ctx->region, "UK", 2) == 0) { + ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_UK)); + if (!ctx->uri) { + flb_errno(); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_UK); + } + else if (strncasecmp(ctx->region, "ASIA", 4) == 0) { + ctx->uri = flb_sds_create_size(sizeof(FLB_CHRONICLE_URL_BASE_ASIA)); + if (!ctx->uri) { + flb_errno(); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + ctx->uri = flb_sds_printf(&ctx->uri, FLB_CHRONICLE_URL_BASE_ASIA); + } + else { + flb_plg_error(ctx->ins, "unsupported region"); + flb_chronicle_conf_destroy(ctx); + return NULL; + } + flb_plg_info(ctx->ins, "project='%s' custumer_id='%s' region='%s'", + ctx->project_id, ctx->customer_id, ctx->region); + + return ctx; +} + + +int flb_chronicle_oauth_credentials_destroy(struct flb_chronicle_oauth_credentials *creds) +{ + if (!creds) { + return -1; + } + flb_sds_destroy(creds->type); + flb_sds_destroy(creds->project_id); + flb_sds_destroy(creds->private_key_id); + flb_sds_destroy(creds->private_key); + flb_sds_destroy(creds->client_email); + flb_sds_destroy(creds->client_id); + flb_sds_destroy(creds->auth_uri); + flb_sds_destroy(creds->token_uri); + + flb_free(creds); + + return 0; +} + +int flb_chronicle_conf_destroy(struct flb_chronicle *ctx) +{ + if (!ctx) { + return -1; + } + + flb_chronicle_oauth_credentials_destroy(ctx->oauth_credentials); + + flb_sds_destroy(ctx->endpoint); + flb_sds_destroy(ctx->uri); + + if (ctx->o) { + flb_oauth2_destroy(ctx->o); + } + + flb_free(ctx); + return 0; +} diff --git a/fluent-bit/plugins/out_chronicle/chronicle_conf.h b/fluent-bit/plugins/out_chronicle/chronicle_conf.h new file mode 100644 index 000000000..76dcfb3d2 --- /dev/null +++ b/fluent-bit/plugins/out_chronicle/chronicle_conf.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_CHRONICLE_CONF_H +#define FLB_OUT_CHRONICLE_CONF_H + +#include "chronicle.h" + +struct flb_chronicle *flb_chronicle_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_chronicle_conf_destroy(struct flb_chronicle *ctx); + +#endif |