diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/out_stackdriver | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_stackdriver')
19 files changed, 0 insertions, 5573 deletions
diff --git a/fluent-bit/plugins/out_stackdriver/CMakeLists.txt b/fluent-bit/plugins/out_stackdriver/CMakeLists.txt deleted file mode 100644 index 2d7fa71bb..000000000 --- a/fluent-bit/plugins/out_stackdriver/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -set(src - gce_metadata.c - stackdriver_conf.c - stackdriver.c - stackdriver_operation.c - stackdriver_source_location.c - stackdriver_http_request.c - stackdriver_timestamp.c - stackdriver_helper.c - stackdriver_resource_types.c - ) - -FLB_PLUGIN(out_stackdriver "${src}" "") diff --git a/fluent-bit/plugins/out_stackdriver/gce_metadata.c b/fluent-bit/plugins/out_stackdriver/gce_metadata.c deleted file mode 100644 index fb942213b..000000000 --- a/fluent-bit/plugins/out_stackdriver/gce_metadata.c +++ /dev/null @@ -1,222 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_oauth2.h> - -#include <msgpack.h> - -#include "gce_metadata.h" -#include "stackdriver.h" -#include "stackdriver_conf.h" - - -static int fetch_metadata(struct flb_stackdriver *ctx, - struct flb_upstream *upstream, char *uri, - char *payload) -{ - int ret; - int ret_code; - size_t b_sent; - struct flb_connection *metadata_conn; - struct flb_http_client *c; - - /* If runtime test mode is enabled, add test data */ - if (ctx->ins->test_mode == FLB_TRUE) { - if (strcmp(uri, FLB_STD_METADATA_PROJECT_ID_URI) == 0) { - flb_sds_cat(payload, "fluent-bit-test", 15); - return 0; - } - else if (strcmp(uri, FLB_STD_METADATA_ZONE_URI) == 0) { - flb_sds_cat(payload, "projects/0123456789/zones/fluent", 32); - return 0; - } - else if (strcmp(uri, FLB_STD_METADATA_INSTANCE_ID_URI) == 0) { - flb_sds_cat(payload, "333222111", 9); - return 0; - } - return -1; - } - - /* Get metadata connection */ - metadata_conn = flb_upstream_conn_get(upstream); - if (!metadata_conn) { - flb_plg_error(ctx->ins, "failed to create metadata connection"); - return -1; - } - - /* Compose HTTP Client request */ - c = flb_http_client(metadata_conn, FLB_HTTP_GET, uri, - "", 0, NULL, 0, NULL, 0); - - flb_http_buffer_size(c, FLB_STD_METADATA_TOKEN_SIZE_MAX); - - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/text", 16); - flb_http_add_header(c, "Metadata-Flavor", 15, "Google", 6); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* validate response */ - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - ret_code = -1; - } - else { - /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); - if (c->resp.status == 200) { - ret_code = 0; - flb_sds_copy(payload, c->resp.payload, c->resp.payload_size); - } - else { - if (c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "error\n%s", c->resp.payload); - } - else { - flb_plg_debug(ctx->ins, "response\n%s", c->resp.payload); - } - ret_code = -1; - } - } - - /* Cleanup */ - flb_http_client_destroy(c); - flb_upstream_conn_release(metadata_conn); - - return ret_code; -} - -int gce_metadata_read_token(struct flb_stackdriver *ctx) -{ - int ret; - flb_sds_t uri = flb_sds_create(FLB_STD_METADATA_SERVICE_ACCOUNT_URI); - flb_sds_t payload = flb_sds_create_size(FLB_STD_METADATA_TOKEN_SIZE_MAX); - - uri = flb_sds_cat(uri, ctx->client_email, flb_sds_len(ctx->client_email)); - uri = flb_sds_cat(uri, "/token", 6); - ret = fetch_metadata(ctx, ctx->metadata_u, uri, payload); - if (ret != 0) { - flb_plg_error(ctx->ins, "can't fetch token from the metadata server"); - flb_sds_destroy(payload); - flb_sds_destroy(uri); - return -1; - } - - ret = flb_oauth2_parse_json_response(payload, flb_sds_len(payload), ctx->o); - flb_sds_destroy(payload); - flb_sds_destroy(uri); - - if (ret != 0) { - flb_plg_error(ctx->ins, "unable to parse token body"); - return -1; - } - ctx->o->expires = time(NULL) + ctx->o->expires_in; - return 0; -} - -int gce_metadata_read_zone(struct flb_stackdriver *ctx) -{ - int ret; - int i; - int j; - int part = 0; - flb_sds_t payload = flb_sds_create_size(4096); - flb_sds_t zone = NULL; - - ret = fetch_metadata(ctx, ctx->metadata_u, FLB_STD_METADATA_ZONE_URI, - payload); - if (ret != 0) { - flb_plg_error(ctx->ins, "can't fetch zone from the metadata server"); - flb_sds_destroy(payload); - return -1; - } - - /* Data returned in the format projects/{project-id}/zones/{name} */ - for (i = 0; i < flb_sds_len(payload); ++i) { - if (payload[i] == '/') { - part++; - } - if (part == 3) { - i++; - break; - } - } - - if (part != 3) { - flb_plg_error(ctx->ins, "wrong format of zone response"); - flb_sds_destroy(payload); - return -1; - } - - zone = flb_sds_create_size(flb_sds_len(payload) - i); - - j = 0; - while (i != flb_sds_len(payload)) { - zone[j] = payload[i]; - i++; - j++; - } - zone[j] = '\0'; - ctx->zone = flb_sds_create(zone); - flb_sds_destroy(zone); - flb_sds_destroy(payload); - - return 0; -} - -int gce_metadata_read_project_id(struct flb_stackdriver *ctx) -{ - int ret; - flb_sds_t payload = flb_sds_create_size(4096); - - ret = fetch_metadata(ctx, ctx->metadata_u, - FLB_STD_METADATA_PROJECT_ID_URI, payload); - if (ret != 0) { - flb_plg_error(ctx->ins, "can't fetch project id from the metadata server"); - flb_sds_destroy(payload); - return -1; - } - ctx->project_id = flb_sds_create(payload); - flb_sds_destroy(payload); - return 0; -} - -int gce_metadata_read_instance_id(struct flb_stackdriver *ctx) -{ - int ret; - flb_sds_t payload = flb_sds_create_size(4096); - - ret = fetch_metadata(ctx, ctx->metadata_u, - FLB_STD_METADATA_INSTANCE_ID_URI, payload); - if (ret != 0) { - flb_plg_error(ctx->ins, "can't fetch instance id from the metadata server"); - flb_sds_destroy(payload); - return -1; - } - ctx->instance_id = flb_sds_create(payload); - flb_sds_destroy(payload); - return 0; -} diff --git a/fluent-bit/plugins/out_stackdriver/gce_metadata.h b/fluent-bit/plugins/out_stackdriver/gce_metadata.h deleted file mode 100644 index 65009588d..000000000 --- a/fluent-bit/plugins/out_stackdriver/gce_metadata.h +++ /dev/null @@ -1,48 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLUENT_BIT_GCE_METADATA_H -#define FLUENT_BIT_GCE_METADATA_H - -#include "stackdriver.h" - -/* Metadata server URL */ -#define FLB_STD_METADATA_SERVER "http://metadata.google.internal" - -/* Project ID metadata URI */ -#define FLB_STD_METADATA_PROJECT_ID_URI "/computeMetadata/v1/project/project-id" - -/* Zone metadata URI */ -#define FLB_STD_METADATA_ZONE_URI "/computeMetadata/v1/instance/zone" - -/* Instance ID metadata URI */ -#define FLB_STD_METADATA_INSTANCE_ID_URI "/computeMetadata/v1/instance/id" - -/* Service account metadata URI */ -#define FLB_STD_METADATA_SERVICE_ACCOUNT_URI "/computeMetadata/v1/instance/service-accounts/" - -/* Max size of token response from metadata server */ -#define FLB_STD_METADATA_TOKEN_SIZE_MAX 14336 - -int gce_metadata_read_token(struct flb_stackdriver *ctx); -int gce_metadata_read_zone(struct flb_stackdriver *ctx); -int gce_metadata_read_project_id(struct flb_stackdriver *ctx); -int gce_metadata_read_instance_id(struct flb_stackdriver *ctx); - -#endif //FLUENT_BIT_GCE_METADATA_H diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver.c b/fluent-bit/plugins/out_stackdriver/stackdriver.c deleted file mode 100644 index 5c48a338b..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver.c +++ /dev/null @@ -1,2867 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_pthread.h> -#include <fluent-bit/flb_crypto.h> -#include <fluent-bit/flb_hash.h> -#include <fluent-bit/flb_base64.h> -#include <fluent-bit/flb_kv.h> -#include <fluent-bit/flb_ra_key.h> -#include <fluent-bit/flb_record_accessor.h> -#include <fluent-bit/flb_log_event_decoder.h> -#include <fluent-bit/flb_gzip.h> - -#include <msgpack.h> - -#include "gce_metadata.h" -#include "stackdriver.h" -#include "stackdriver_conf.h" -#include "stackdriver_operation.h" -#include "stackdriver_source_location.h" -#include "stackdriver_http_request.h" -#include "stackdriver_timestamp.h" -#include "stackdriver_helper.h" -#include "stackdriver_resource_types.h" - -pthread_key_t oauth2_type; -pthread_key_t oauth2_token; -pthread_key_t oauth2_token_expires; - -static void oauth2_cache_exit(void *ptr) -{ - if (ptr) { - flb_sds_destroy(ptr); - } -} - -static void oauth2_cache_free_expiration(void *ptr) -{ - if (ptr) { - flb_free(ptr); - } -} - -static void oauth2_cache_init() -{ - /* oauth2 pthread key */ - pthread_key_create(&oauth2_type, oauth2_cache_exit); - pthread_key_create(&oauth2_token, oauth2_cache_exit); - pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration); -} - -/* Set oauth2 type and token in pthread keys */ -static void oauth2_cache_set(char *type, char *token, time_t expires) -{ - flb_sds_t tmp; - time_t *tmp_expires; - - /* oauth2 type */ - tmp = pthread_getspecific(oauth2_type); - if (tmp) { - flb_sds_destroy(tmp); - } - tmp = flb_sds_create(type); - pthread_setspecific(oauth2_type, tmp); - - /* oauth2 access token */ - tmp = pthread_getspecific(oauth2_token); - if (tmp) { - flb_sds_destroy(tmp); - } - tmp = flb_sds_create(token); - pthread_setspecific(oauth2_token, tmp); - - /* oauth2 access token expiration */ - tmp_expires = pthread_getspecific(oauth2_token_expires); - if (tmp_expires) { - flb_free(tmp_expires); - } - tmp_expires = flb_calloc(1, sizeof(time_t)); - if (!tmp_expires) { - flb_errno(); - return; - } - *tmp_expires = expires; - pthread_setspecific(oauth2_token_expires, tmp_expires); -} - -/* By using pthread keys cached values, compose the authorizatoin token */ -static time_t oauth2_cache_get_expiration() -{ - time_t *expires = pthread_getspecific(oauth2_token_expires); - if (expires) { - return *expires; - } - return 0; -} - -/* By using pthread keys cached values, compose the authorizatoin token */ -static flb_sds_t oauth2_cache_to_token() -{ - flb_sds_t type; - flb_sds_t token; - flb_sds_t output; - - type = pthread_getspecific(oauth2_type); - if (!type) { - return NULL; - } - - output = flb_sds_create(type); - if (!output) { - return NULL; - } - - token = pthread_getspecific(oauth2_token); - flb_sds_printf(&output, " %s", token); - return output; -} - -/* - * Base64 Encoding in JWT must: - * - * - remove any trailing padding '=' character - * - replace '+' with '-' - * - replace '/' with '_' - * - * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C - */ -int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, - unsigned char *in_buf, size_t in_size, - size_t *olen) - -{ - int i; - size_t len; - int result; - - - /* do normal base64 encoding */ - result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, - &len, in_buf, in_size); - if (result != 0) { - return -1; - } - - /* Replace '+' and '/' characters */ - for (i = 0; i < len && out_buf[i] != '='; i++) { - if (out_buf[i] == '+') { - out_buf[i] = '-'; - } - else if (out_buf[i] == '/') { - out_buf[i] = '_'; - } - } - - /* Now 'i' becomes the new length */ - *olen = i; - return 0; -} - -static int jwt_encode(char *payload, char *secret, - char **out_signature, size_t *out_size, - struct flb_stackdriver *ctx) -{ - int ret; - int len; - int buf_size; - size_t olen; - char *buf; - char *sigd; - char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; - unsigned char sha256_buf[32] = {0}; - flb_sds_t out; - unsigned char sig[256] = {0}; - size_t sig_len; - - buf_size = (strlen(payload) + strlen(secret)) * 2; - buf = flb_malloc(buf_size); - if (!buf) { - flb_errno(); - return -1; - } - - /* Encode header */ - len = strlen(headers); - ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, - &olen, (unsigned char *) headers, len); - if (ret != 0) { - flb_free(buf); - - return ret; - } - - /* Create buffer to store JWT */ - out = flb_sds_create_size(2048); - if (!out) { - flb_errno(); - flb_free(buf); - return -1; - } - - /* Append header */ - flb_sds_cat(out, buf, olen); - flb_sds_cat(out, ".", 1); - - /* Encode Payload */ - len = strlen(payload); - jwt_base64_url_encode((unsigned char *) buf, buf_size, - (unsigned char *) payload, len, &olen); - - /* Append Payload */ - flb_sds_cat(out, buf, olen); - - /* do sha256() of base64(header).base64(payload) */ - ret = flb_hash_simple(FLB_HASH_SHA256, - (unsigned char *) out, flb_sds_len(out), - sha256_buf, sizeof(sha256_buf)); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error hashing token"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - len = strlen(secret); - sig_len = sizeof(sig); - - ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, - FLB_CRYPTO_PADDING_PKCS1, - FLB_HASH_SHA256, - (unsigned char *) secret, len, - sha256_buf, sizeof(sha256_buf), - sig, &sig_len); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error creating RSA context"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - sigd = flb_malloc(2048); - if (!sigd) { - flb_errno(); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); - - flb_sds_cat(out, ".", 1); - flb_sds_cat(out, sigd, olen); - - *out_signature = out; - *out_size = flb_sds_len(out); - - flb_free(buf); - flb_free(sigd); - - return 0; -} - -/* Create a new oauth2 context and get a oauth2 token */ -static int get_oauth2_token(struct flb_stackdriver *ctx) -{ - int ret; - char *token; - char *sig_data; - size_t sig_size; - time_t issued; - time_t expires; - char payload[1024]; - - flb_oauth2_payload_clear(ctx->o); - - /* In case of using metadata server, fetch token from there */ - if (ctx->metadata_server_auth) { - return gce_metadata_read_token(ctx); - } - - /* JWT encode for oauth2 */ - issued = time(NULL); - expires = issued + FLB_STD_TOKEN_REFRESH; - - snprintf(payload, sizeof(payload) - 1, - "{\"iss\": \"%s\", \"scope\": \"%s\", " - "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", - ctx->client_email, FLB_STD_SCOPE, - FLB_STD_AUTH_URL, - expires, issued); - - /* Compose JWT signature */ - ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx); - if (ret != 0) { - flb_plg_error(ctx->ins, "JWT signature generation failed"); - return -1; - } - flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); - - ret = flb_oauth2_payload_append(ctx->o, - "grant_type", -1, - "urn%3Aietf%3Aparams%3Aoauth%3A" - "grant-type%3Ajwt-bearer", -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, - "assertion", -1, - sig_data, sig_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - flb_sds_destroy(sig_data); - - /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); - if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); - return -1; - } - - return 0; -} - -static flb_sds_t get_google_token(struct flb_stackdriver *ctx) -{ - int ret = 0; - flb_sds_t output = NULL; - time_t cached_expiration = 0; - - ret = pthread_mutex_trylock(&ctx->token_mutex); - if (ret == EBUSY) { - /* - * If the routine is locked we just use our pre-cached values and - * compose the expected authorization value. - * - * If the routine fails it will return NULL and the caller will just - * issue a FLB_RETRY. - */ - output = oauth2_cache_to_token(); - cached_expiration = oauth2_cache_get_expiration(); - if (time(NULL) >= cached_expiration) { - return output; - } else { - /* - * Cached token is expired. Wait on lock to use up-to-date token - * by either waiting for it to be refreshed or refresh it ourselves. - */ - flb_plg_info(ctx->ins, "Cached token is expired. Waiting on lock."); - ret = pthread_mutex_lock(&ctx->token_mutex); - } - } - - if (ret != 0) { - flb_plg_error(ctx->ins, "error locking mutex"); - return NULL; - } - - if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - ret = get_oauth2_token(ctx); - } - - /* Copy string to prevent race conditions (get_oauth2 can free the string) */ - if (ret == 0) { - /* Update pthread keys cached values */ - oauth2_cache_set(ctx->o->token_type, ctx->o->access_token, ctx->o->expires); - - /* Compose outgoing buffer using cached values */ - output = oauth2_cache_to_token(); - } - - if (pthread_mutex_unlock(&ctx->token_mutex)){ - flb_plg_error(ctx->ins, "error unlocking mutex"); - if (output) { - flb_sds_destroy(output); - } - return NULL; - } - - - return output; -} - -void replace_prefix_dot(flb_sds_t s, int tag_prefix_len) -{ - int i; - int str_len; - char c; - - if (!s) { - return; - } - - str_len = flb_sds_len(s); - if (tag_prefix_len > str_len) { - flb_error("[output] tag_prefix shouldn't be longer than local_resource_id"); - return; - } - - for (i = 0; i < tag_prefix_len; i++) { - c = s[i]; - - if (c == '.') { - s[i] = '_'; - } - } -} - -static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map, - const char *key, int key_size) -{ - int i; - msgpack_object k; - msgpack_object v; - flb_sds_t ptr = NULL; - - for (i = 0; i < map.size; i++) { - k = map.ptr[i].key; - v = map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (k.via.str.size == key_size && - strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) { - /* make sure to free it after use */ - ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size); - break; - } - } - - return ptr; -} - -/* parse_monitored_resource is to extract the monitoired resource labels - * from "logging.googleapis.com/monitored_resource" in log data - * and append to 'resource'/'labels' in log entry. - * Monitored resource type is already read from resource field in stackdriver - * output plugin configuration parameters. - * - * The structure of monitored_resource is: - * { - * "logging.googleapis.com/monitored_resource": { - * "labels": { - * "resource_label": <label_value>, - * } - * } - * } - * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types - * for required labels for each monitored resource. - */ - -static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck) -{ - int ret = -1; - msgpack_object *obj; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - obj = log_event.body; - - msgpack_object_kv *kv = obj->via.map.ptr; - msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; - for (; kv < kvend; ++kv) { - if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR - && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) { - msgpack_object subobj = kv->val; - msgpack_object_kv *p = subobj.via.map.ptr; - msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size; - for (; p < pend; ++p) { - if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) { - continue; - } - if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) { - msgpack_object labels = p->val; - msgpack_object_kv *q = labels.via.map.ptr; - msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size; - int fields = 0; - for (; q < qend; ++q) { - if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY); - } - ++fields; - } - if (fields > 0) { - msgpack_pack_map(mp_pck, fields); - q = labels.via.map.ptr; - for (; q < qend; ++q) { - if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) { - continue; - } - flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY); - msgpack_pack_str(mp_pck, q->key.via.str.size); - msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size); - msgpack_pack_str(mp_pck, q->val.via.str.size); - msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size); - } - - flb_log_event_decoder_destroy(&log_decoder); - - return 0; - } - } - } - } - } - } - - flb_log_event_decoder_destroy(&log_decoder); - - flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY); - - return ret; -} - -/* - * Given a local_resource_id, split the content using the proper separator generating - * a linked list to store the spliited string - */ -static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type) -{ - int ret = -1; - int max_split = -1; - int len_k8s_container; - int len_k8s_node; - int len_k8s_pod; - struct mk_list *list; - - len_k8s_container = sizeof(K8S_CONTAINER) - 1; - len_k8s_node = sizeof(K8S_NODE) - 1; - len_k8s_pod = sizeof(K8S_POD) - 1; - - /* Allocate list head */ - list = flb_malloc(sizeof(struct mk_list)); - if (!list) { - flb_errno(); - return NULL; - } - mk_list_init(list); - - /* Determinate the max split value based on type */ - if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { - /* including the prefix of tag */ - max_split = 4; - } - else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { - max_split = 2; - } - else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { - max_split = 3; - } - - /* The local_resource_id is splitted by '.' */ - ret = flb_slist_split_string(list, local_resource_id, '.', max_split); - - if (ret == -1 || mk_list_size(list) != max_split) { - flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type); - flb_slist_destroy(list); - flb_free(list); - return NULL; - } - - return list; -} - -/* - * extract_local_resource_id(): - * - extract the value from "logging.googleapis.com/local_resource_id" field - * - if local_resource_id is missing from the payLoad, use the tag of the log - */ -static int extract_local_resource_id(const void *data, size_t bytes, - struct flb_stackdriver *ctx, const char *tag) { - msgpack_object_map map; - flb_sds_t local_resource_id; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - if ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - map = log_event.body->via.map; - local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY, - LEN_LOCAL_RESOURCE_ID_KEY); - - if (local_resource_id == NULL) { - /* if local_resource_id is not found, use the tag of the log */ - flb_plg_debug(ctx->ins, "local_resource_id not found, " - "tag [%s] is assigned for local_resource_id", tag); - local_resource_id = flb_sds_create(tag); - } - - /* we need to create up the local_resource_id from previous log */ - if (ctx->local_resource_id) { - flb_sds_destroy(ctx->local_resource_id); - } - - ctx->local_resource_id = flb_sds_create(local_resource_id); - - flb_sds_destroy(local_resource_id); - - ret = 0; - } - else { - flb_plg_error(ctx->ins, "failed to unpack data"); - - ret = -1; - } - - flb_log_event_decoder_destroy(&log_decoder); - - return ret; -} - -/* - * set_monitored_resource_labels(): - * - use the extracted local_resource_id to assign the label keys for different - * resource types that are specified in the configuration of stackdriver_out plugin - */ -static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type) -{ - int ret = -1; - int first = FLB_TRUE; - int counter = 0; - int len_k8s_container; - int len_k8s_node; - int len_k8s_pod; - size_t prefix_len = 0; - struct local_resource_id_list *ptr; - struct mk_list *list = NULL; - struct mk_list *head; - flb_sds_t new_local_resource_id; - - if (!ctx->local_resource_id) { - flb_plg_error(ctx->ins, "local_resource_is is not assigned"); - return -1; - } - - len_k8s_container = sizeof(K8S_CONTAINER) - 1; - len_k8s_node = sizeof(K8S_NODE) - 1; - len_k8s_pod = sizeof(K8S_POD) - 1; - - prefix_len = flb_sds_len(ctx->tag_prefix); - if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) { - flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of" - " local_resource_id [%s]", ctx->tag_prefix, - ctx->local_resource_id); - return -1; - } - - new_local_resource_id = flb_sds_create_len(ctx->local_resource_id, - flb_sds_len(ctx->local_resource_id)); - replace_prefix_dot(new_local_resource_id, prefix_len - 1); - - if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { - list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER); - if (!list) { - goto error; - } - - /* iterate through the list */ - mk_list_foreach(head, list) { - ptr = mk_list_entry(head, struct local_resource_id_list, _head); - if (first) { - first = FLB_FALSE; - continue; - } - - /* Follow the order of fields in local_resource_id */ - if (counter == 0) { - if (ctx->namespace_name) { - flb_sds_destroy(ctx->namespace_name); - } - ctx->namespace_name = flb_sds_create(ptr->val); - } - else if (counter == 1) { - if (ctx->pod_name) { - flb_sds_destroy(ctx->pod_name); - } - ctx->pod_name = flb_sds_create(ptr->val); - } - else if (counter == 2) { - if (ctx->container_name) { - flb_sds_destroy(ctx->container_name); - } - ctx->container_name = flb_sds_create(ptr->val); - } - - counter++; - } - - if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) { - goto error; - } - } - else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { - list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE); - if (!list) { - goto error; - } - - mk_list_foreach(head, list) { - ptr = mk_list_entry(head, struct local_resource_id_list, _head); - if (first) { - first = FLB_FALSE; - continue; - } - - if (ptr != NULL) { - if (ctx->node_name) { - flb_sds_destroy(ctx->node_name); - } - ctx->node_name = flb_sds_create(ptr->val); - } - } - - if (!ctx->node_name) { - goto error; - } - } - else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { - list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD); - if (!list) { - goto error; - } - - mk_list_foreach(head, list) { - ptr = mk_list_entry(head, struct local_resource_id_list, _head); - if (first) { - first = FLB_FALSE; - continue; - } - - /* Follow the order of fields in local_resource_id */ - if (counter == 0) { - if (ctx->namespace_name) { - flb_sds_destroy(ctx->namespace_name); - } - ctx->namespace_name = flb_sds_create(ptr->val); - } - else if (counter == 1) { - if (ctx->pod_name) { - flb_sds_destroy(ctx->pod_name); - } - ctx->pod_name = flb_sds_create(ptr->val); - } - - counter++; - } - - if (!ctx->namespace_name || !ctx->pod_name) { - goto error; - } - } - - ret = 0; - - if (list) { - flb_slist_destroy(list); - flb_free(list); - } - flb_sds_destroy(new_local_resource_id); - - return ret; - - error: - if (list) { - flb_slist_destroy(list); - flb_free(list); - } - - if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { - if (ctx->namespace_name) { - flb_sds_destroy(ctx->namespace_name); - } - - if (ctx->pod_name) { - flb_sds_destroy(ctx->pod_name); - } - - if (ctx->container_name) { - flb_sds_destroy(ctx->container_name); - } - } - else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { - if (ctx->node_name) { - flb_sds_destroy(ctx->node_name); - } - } - else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { - if (ctx->namespace_name) { - flb_sds_destroy(ctx->namespace_name); - } - - if (ctx->pod_name) { - flb_sds_destroy(ctx->pod_name); - } - } - - flb_sds_destroy(new_local_resource_id); - return -1; -} - -static int is_tag_match_regex(struct flb_stackdriver *ctx, - const char *tag, int tag_len) -{ - int ret; - int tag_prefix_len; - int len_to_be_matched; - const char *tag_str_to_be_matcheds; - - tag_prefix_len = flb_sds_len(ctx->tag_prefix); - if (tag_len > tag_prefix_len && - flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) { - return 0; - } - - tag_str_to_be_matcheds = tag + tag_prefix_len; - len_to_be_matched = tag_len - tag_prefix_len; - ret = flb_regex_match(ctx->regex, - (unsigned char *) tag_str_to_be_matcheds, - len_to_be_matched); - - /* 1 -> match; 0 -> doesn't match; < 0 -> error */ - return ret; -} - -static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx) -{ - int ret; - int prefix_len; - int len_to_be_matched; - const char *str_to_be_matcheds; - - if (!ctx->local_resource_id) { - flb_plg_warn(ctx->ins, "local_resource_id not found in the payload"); - return -1; - } - - prefix_len = flb_sds_len(ctx->tag_prefix); - str_to_be_matcheds = ctx->local_resource_id + prefix_len; - len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len; - - ret = flb_regex_match(ctx->regex, - (unsigned char *) str_to_be_matcheds, - len_to_be_matched); - - /* 1 -> match; 0 -> doesn't match; < 0 -> error */ - return ret; -} - -static void cb_results(const char *name, const char *value, - size_t vlen, void *data); -/* - * extract_resource_labels_from_regex(4) will only be called if the - * tag or local_resource_id field matches the regex rule - */ -static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx, - const char *tag, int tag_len, - int from_tag) -{ - int ret = 1; - int prefix_len; - int len_to_be_matched; - int local_resource_id_len; - const char *str_to_be_matcheds; - struct flb_regex_search result; - - prefix_len = flb_sds_len(ctx->tag_prefix); - if (from_tag == FLB_TRUE) { - local_resource_id_len = tag_len; - str_to_be_matcheds = tag + prefix_len; - } - else { - // this will be called only if the payload contains local_resource_id - local_resource_id_len = flb_sds_len(ctx->local_resource_id); - str_to_be_matcheds = ctx->local_resource_id + prefix_len; - } - - len_to_be_matched = local_resource_id_len - prefix_len; - ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result); - if (ret <= 0) { - flb_plg_warn(ctx->ins, "invalid pattern for given value %s when" - " extracting resource labels", str_to_be_matcheds); - return -1; - } - - flb_regex_parse(ctx->regex, &result, cb_results, ctx); - - return ret; -} - -static int process_local_resource_id(struct flb_stackdriver *ctx, - const char *tag, int tag_len, char *type) -{ - int ret; - - // parsing local_resource_id from tag takes higher priority - if (is_tag_match_regex(ctx, tag, tag_len) > 0) { - ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE); - } - else if (is_local_resource_id_match_regex(ctx) > 0) { - ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE); - } - else { - ret = set_monitored_resource_labels(ctx, type); - } - - return ret; -} - -/* - * get_payload_labels - * - Iterate throught the original payload (obj) and find out the entry that matches - * the labels_key - * - Used to convert all labels under labels_key to root-level `labels` field - */ -static msgpack_object *get_payload_labels(struct flb_stackdriver *ctx, msgpack_object *obj) -{ - int i; - int len; - msgpack_object_kv *kv = NULL; - - if (!obj || obj->type != MSGPACK_OBJECT_MAP) { - return NULL; - } - - len = flb_sds_len(ctx->labels_key); - for (i = 0; i < obj->via.map.size; i++) { - kv = &obj->via.map.ptr[i]; - if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) { - /* only the first matching entry will be returned */ - return &kv->val; - } - } - - //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload", - // ctx->labels_key); - return NULL; -} - -/* - * pack_resource_labels(): - * - Looks through the resource_labels parameter and appends new key value - * pair to the log entry. - * - Supports field access, plaintext assignment and environment variables. - */ -static int pack_resource_labels(struct flb_stackdriver *ctx, - struct flb_mp_map_header *mh, - msgpack_packer *mp_pck, - const void *data, - size_t bytes) -{ - struct mk_list *head; - struct flb_kv *label_kv; - struct flb_record_accessor *ra; - struct flb_ra_value *rval; - int len; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - - if (ctx->should_skip_resource_labels_api == FLB_TRUE) { - return -1; - } - - len = mk_list_size(&ctx->resource_labels_kvs); - if (len == 0) { - return -1; - } - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - if ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - - flb_mp_map_header_init(mh, mp_pck); - mk_list_foreach(head, &ctx->resource_labels_kvs) { - label_kv = mk_list_entry(head, struct flb_kv, _head); - /* - * KVs have the form destination=original, so the original key is the value. - * If the value starts with '$', it will be processed using record accessor. - * Otherwise, it will be treated as a plaintext assignment. - */ - if (label_kv->val[0] == '$') { - ra = flb_ra_create(label_kv->val, FLB_TRUE); - rval = flb_ra_get_value_object(ra, *log_event.body); - - if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) { - flb_mp_map_header_append(mh); - msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, - flb_sds_len(label_kv->key)); - msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string)); - msgpack_pack_str_body(mp_pck, rval->val.string, - flb_sds_len(rval->val.string)); - flb_ra_key_value_destroy(rval); - } else { - flb_plg_warn(ctx->ins, "failed to find a corresponding entry for " - "resource label entry [%s=%s]", label_kv->key, label_kv->val); - } - flb_ra_destroy(ra); - } else { - flb_mp_map_header_append(mh); - msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); - msgpack_pack_str_body(mp_pck, label_kv->key, - flb_sds_len(label_kv->key)); - msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val)); - msgpack_pack_str_body(mp_pck, label_kv->val, - flb_sds_len(label_kv->val)); - } - } - } - else { - flb_plg_error(ctx->ins, "failed to unpack data"); - - flb_log_event_decoder_destroy(&log_decoder); - - return -1; - } - - /* project_id should always be packed from config parameter */ - flb_mp_map_header_append(mh); - msgpack_pack_str(mp_pck, 10); - msgpack_pack_str_body(mp_pck, "project_id", 10); - msgpack_pack_str(mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - - flb_log_event_decoder_destroy(&log_decoder); - flb_mp_map_header_end(mh); - - return 0; -} - -static void pack_labels(struct flb_stackdriver *ctx, - msgpack_packer *mp_pck, - msgpack_object *payload_labels_ptr) -{ - int i; - int labels_size = 0; - struct mk_list *head; - struct flb_kv *list_kv; - msgpack_object_kv *obj_kv = NULL; - - /* Determine size of labels map */ - labels_size = mk_list_size(&ctx->config_labels); - if (payload_labels_ptr != NULL && - payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { - labels_size += payload_labels_ptr->via.map.size; - } - - msgpack_pack_map(mp_pck, labels_size); - - /* pack labels from the payload */ - if (payload_labels_ptr != NULL && - payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { - - for (i = 0; i < payload_labels_ptr->via.map.size; i++) { - obj_kv = &payload_labels_ptr->via.map.ptr[i]; - msgpack_pack_object(mp_pck, obj_kv->key); - msgpack_pack_object(mp_pck, obj_kv->val); - } - } - - /* pack labels set in configuration */ - /* in msgpack duplicate keys are overriden by the last set */ - /* static label keys override payload labels */ - mk_list_foreach(head, &ctx->config_labels){ - list_kv = mk_list_entry(head, struct flb_kv, _head); - msgpack_pack_str(mp_pck, flb_sds_len(list_kv->key)); - msgpack_pack_str_body(mp_pck, list_kv->key, flb_sds_len(list_kv->key)); - msgpack_pack_str(mp_pck, flb_sds_len(list_kv->val)); - msgpack_pack_str_body(mp_pck, list_kv->val, flb_sds_len(list_kv->val)); - } -} - -static void cb_results(const char *name, const char *value, - size_t vlen, void *data) -{ - struct flb_stackdriver *ctx = data; - - if (vlen == 0) { - return; - } - - if (strcmp(name, "pod_name") == 0) { - if (ctx->pod_name != NULL) { - flb_sds_destroy(ctx->pod_name); - } - ctx->pod_name = flb_sds_create_len(value, vlen); - } - else if (strcmp(name, "namespace_name") == 0) { - if (ctx->namespace_name != NULL) { - flb_sds_destroy(ctx->namespace_name); - } - ctx->namespace_name = flb_sds_create_len(value, vlen); - } - else if (strcmp(name, "container_name") == 0) { - if (ctx->container_name != NULL) { - flb_sds_destroy(ctx->container_name); - } - ctx->container_name = flb_sds_create_len(value, vlen); - } - else if (strcmp(name, "node_name") == 0) { - if (ctx->node_name != NULL) { - flb_sds_destroy(ctx->node_name); - } - ctx->node_name = flb_sds_create_len(value, vlen); - } - - return; -} - -int flb_stackdriver_regex_init(struct flb_stackdriver *ctx) -{ - /* If a custom regex is not set, use the defaults */ - ctx->regex = flb_regex_create(ctx->custom_k8s_regex); - if (!ctx->regex) { - return -1; - } - - return 0; -} - -static int cb_stackdriver_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - int ret; - int io_flags = FLB_IO_TLS; - char *token; - struct flb_stackdriver *ctx; - - /* Create config context */ - ctx = flb_stackdriver_conf_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "configuration failed"); - return -1; - } - - /* Load config map */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - return -1; - } - - /* Set context */ - flb_output_set_context(ins, ctx); - - /* Network mode IPv6 */ - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; - } - - /* Initialize oauth2 cache pthread keys */ - oauth2_cache_init(); - - /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ - pthread_mutex_init(&ctx->token_mutex, NULL); - - /* Create Upstream context for Stackdriver Logging (no oauth2 service) */ - ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL, - io_flags, ins->tls); - ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server, - FLB_IO_TCP, NULL); - - /* Create oauth2 context */ - ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000); - - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } - if (!ctx->metadata_u) { - flb_plg_error(ctx->ins, "metadata upstream creation failed"); - return -1; - } - if (!ctx->o) { - flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; - } - flb_output_upstream_set(ctx->u, ins); - - /* Metadata Upstream Sync flags */ - flb_stream_disable_async_mode(&ctx->metadata_u->base); - - if (ins->test_mode == FLB_FALSE) { - /* Retrieve oauth2 token */ - token = get_google_token(ctx); - if (!token) { - flb_plg_warn(ctx->ins, "token retrieval failed"); - } - else { - flb_sds_destroy(token); - } - } - - if (ctx->metadata_server_auth) { - ret = gce_metadata_read_project_id(ctx); - if (ret == -1) { - return -1; - } - - if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE - && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { - ret = gce_metadata_read_zone(ctx); - if (ret == -1) { - return -1; - } - - ret = gce_metadata_read_instance_id(ctx); - if (ret == -1) { - return -1; - } - } - } - - /* Validate project_id */ - if (!ctx->project_id) { - flb_plg_error(ctx->ins, "property 'project_id' is not set"); - return -1; - } - - if (!ctx->export_to_project_id) { - ctx->export_to_project_id = ctx->project_id; - } - - ret = flb_stackdriver_regex_init(ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "failed to init stackdriver custom regex"); - return -1; - } - - return 0; -} - -static int validate_severity_level(severity_t * s, - const char * str, - const unsigned int str_size) -{ - int i = 0; - - const static struct { - severity_t s; - const unsigned int str_size; - const char * str; - } enum_mapping[] = { - {FLB_STD_EMERGENCY, 9, "EMERGENCY"}, - {FLB_STD_EMERGENCY, 5, "EMERG" }, - - {FLB_STD_ALERT , 1, "A" }, - {FLB_STD_ALERT , 5, "ALERT" }, - - {FLB_STD_CRITICAL , 1, "C" }, - {FLB_STD_CRITICAL , 1, "F" }, - {FLB_STD_CRITICAL , 4, "CRIT" }, - {FLB_STD_CRITICAL , 5, "FATAL" }, - {FLB_STD_CRITICAL , 8, "CRITICAL" }, - - {FLB_STD_ERROR , 1, "E" }, - {FLB_STD_ERROR , 3, "ERR" }, - {FLB_STD_ERROR , 5, "ERROR" }, - {FLB_STD_ERROR , 6, "SEVERE" }, - - {FLB_STD_WARNING , 1, "W" }, - {FLB_STD_WARNING , 4, "WARN" }, - {FLB_STD_WARNING , 7, "WARNING" }, - - {FLB_STD_NOTICE , 1, "N" }, - {FLB_STD_NOTICE , 6, "NOTICE" }, - - {FLB_STD_INFO , 1, "I" }, - {FLB_STD_INFO , 4, "INFO" }, - - {FLB_STD_DEBUG , 1, "D" }, - {FLB_STD_DEBUG , 5, "DEBUG" }, - {FLB_STD_DEBUG , 5, "TRACE" }, - {FLB_STD_DEBUG , 9, "TRACE_INT"}, - {FLB_STD_DEBUG , 4, "FINE" }, - {FLB_STD_DEBUG , 5, "FINER" }, - {FLB_STD_DEBUG , 6, "FINEST" }, - {FLB_STD_DEBUG , 6, "CONFIG" }, - - {FLB_STD_DEFAULT , 7, "DEFAULT" } - }; - - for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) { - if (enum_mapping[i].str_size != str_size) { - continue; - } - - if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) { - *s = enum_mapping[i].s; - return 0; - } - } - return -1; -} - -static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o, - const flb_sds_t key, const int key_size, - msgpack_object_type type) -{ - int i = 0; - msgpack_object_kv * p = NULL; - - if (o == NULL || subobj == NULL) { - return -1; - } - - for (i = 0; i < o->via.map.size; i++) { - p = &o->via.map.ptr[i]; - if (p->val.type != type) { - continue; - } - - if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) { - *subobj = p->val; - return 0; - } - } - return -1; -} - -static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key) -{ - msgpack_object tmp; - if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) { - *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size); - return 0; - } - - *s = 0; - return -1; -} - -static int get_severity_level(severity_t * s, const msgpack_object * o, - const flb_sds_t key) -{ - msgpack_object tmp; - if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0 - && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) { - return 0; - } - *s = 0; - return -1; -} - -static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * src_obj, - const flb_sds_t key) -{ - msgpack_object tmp; - int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN); - - if (ret == 0 && tmp.via.boolean == true) { - *trace_sampled_value = FLB_TRUE; - return 0; - } else if (ret == 0 && tmp.via.boolean == false) { - *trace_sampled_value = FLB_FALSE; - return 0; - } - - return -1; -} - -static insert_id_status validate_insert_id(msgpack_object * insert_id_value, - const msgpack_object * obj) -{ - int i = 0; - msgpack_object_kv * p = NULL; - insert_id_status ret = INSERTID_NOT_PRESENT; - - if (obj == NULL) { - return ret; - } - - for (i = 0; i < obj->via.map.size; i++) { - p = &obj->via.map.ptr[i]; - if (p->key.type != MSGPACK_OBJECT_STR) { - continue; - } - if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) { - if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) { - *insert_id_value = p->val; - ret = INSERTID_VALID; - } - else { - ret = INSERTID_INVALID; - } - break; - } - } - return ret; -} - -static int pack_json_payload(int insert_id_extracted, - int operation_extracted, int operation_extra_size, - int source_location_extracted, - int source_location_extra_size, - int http_request_extracted, - int http_request_extra_size, - timestamp_status tms_status, - msgpack_packer *mp_pck, msgpack_object *obj, - struct flb_stackdriver *ctx) -{ - /* Specified fields include local_resource_id, operation, sourceLocation ... */ - int i, j; - int to_remove = 0; - int ret; - int map_size; - int new_map_size; - int len; - int len_to_be_removed; - int key_not_found; - flb_sds_t removed; - flb_sds_t monitored_resource_key; - flb_sds_t local_resource_id_key; - flb_sds_t stream; - msgpack_object_kv *kv = obj->via.map.ptr; - msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; - - monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY); - local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY); - stream = flb_sds_create("stream"); - /* - * array of elements that need to be removed from payload, - * special field 'operation' will be processed individually - */ - flb_sds_t to_be_removed[] = - { - monitored_resource_key, - local_resource_id_key, - ctx->labels_key, - ctx->severity_key, - ctx->trace_key, - ctx->span_id_key, - ctx->trace_sampled_key, - ctx->log_name_key, - stream - /* more special fields are required to be added, but, if this grows with more - than a few records, it might need to be converted to flb_hash - */ - }; - - if (insert_id_extracted == FLB_TRUE) { - to_remove += 1; - } - if (operation_extracted == FLB_TRUE && operation_extra_size == 0) { - to_remove += 1; - } - if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) { - to_remove += 1; - } - if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) { - to_remove += 1; - } - if (tms_status == FORMAT_TIMESTAMP_OBJECT) { - to_remove += 1; - } - if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { - to_remove += 2; - } - - map_size = obj->via.map.size; - len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]); - for (i = 0; i < map_size; i++) { - kv = &obj->via.map.ptr[i]; - len = kv->key.via.str.size; - for (j = 0; j < len_to_be_removed; j++) { - removed = to_be_removed[j]; - /* - * check length of key to avoid partial matching - * e.g. labels key = labels && kv->key = labelss - */ - if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { - to_remove += 1; - break; - } - } - } - - new_map_size = map_size - to_remove; - - ret = msgpack_pack_map(mp_pck, new_map_size); - if (ret < 0) { - goto error; - } - - /* points back to the beginning of map */ - kv = obj->via.map.ptr; - for(; kv != kvend; ++kv ) { - key_not_found = 1; - - /* processing logging.googleapis.com/insertId */ - if (insert_id_extracted == FLB_TRUE - && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) { - continue; - } - - /* processing logging.googleapis.com/operation */ - if (validate_key(kv->key, OPERATION_FIELD_IN_JSON, - OPERATION_KEY_SIZE) - && kv->val.type == MSGPACK_OBJECT_MAP) { - if (operation_extra_size > 0) { - msgpack_pack_object(mp_pck, kv->key); - pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size); - } - continue; - } - - if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON, - SOURCE_LOCATION_SIZE) - && kv->val.type == MSGPACK_OBJECT_MAP) { - - if (source_location_extra_size > 0) { - msgpack_pack_object(mp_pck, kv->key); - pack_extra_source_location_subfields(mp_pck, &kv->val, - source_location_extra_size); - } - continue; - } - - if (validate_key(kv->key, ctx->http_request_key, - ctx->http_request_key_size) - && kv->val.type == MSGPACK_OBJECT_MAP) { - - if(http_request_extra_size > 0) { - msgpack_pack_object(mp_pck, kv->key); - pack_extra_http_request_subfields(mp_pck, &kv->val, - http_request_extra_size); - } - continue; - } - - if (validate_key(kv->key, "timestamp", 9) - && tms_status == FORMAT_TIMESTAMP_OBJECT) { - continue; - } - - if (validate_key(kv->key, "timestampSeconds", 16) - && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { - continue; - } - if (validate_key(kv->key, "timestampNanos", 14) - && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { - continue; - } - - len = kv->key.via.str.size; - for (j = 0; j < len_to_be_removed; j++) { - removed = to_be_removed[j]; - if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { - key_not_found = 0; - break; - } - } - - if (key_not_found) { - ret = msgpack_pack_object(mp_pck, kv->key); - if (ret < 0) { - goto error; - } - ret = msgpack_pack_object(mp_pck, kv->val); - if (ret < 0) { - goto error; - } - } - } - - flb_sds_destroy(monitored_resource_key); - flb_sds_destroy(local_resource_id_key); - flb_sds_destroy(stream); - return 0; - - error: - flb_sds_destroy(monitored_resource_key); - flb_sds_destroy(local_resource_id_key); - flb_sds_destroy(stream); - return ret; -} - -static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, - int total_records, - const char *tag, int tag_len, - const void *data, size_t bytes) -{ - int len; - int ret; - int array_size = 0; - /* The default value is 3: timestamp, jsonPayload, logName. */ - int entry_size = 3; - size_t s; - // size_t off = 0; - char path[PATH_MAX]; - char time_formatted[255]; - const char *newtag; - const char *new_log_name; - msgpack_object *obj; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - flb_sds_t out_buf; - struct flb_mp_map_header mh; - - /* Parameters for severity */ - int severity_extracted = FLB_FALSE; - severity_t severity; - - /* Parameters for trace */ - int trace_extracted = FLB_FALSE; - flb_sds_t trace; - char stackdriver_trace[PATH_MAX]; - const char *new_trace; - - /* Parameters for span id */ - int span_id_extracted = FLB_FALSE; - flb_sds_t span_id; - - /* Parameters for trace sampled */ - int trace_sampled_extracted = FLB_FALSE; - int trace_sampled = FLB_FALSE; - - /* Parameters for log name */ - int log_name_extracted = FLB_FALSE; - flb_sds_t log_name = NULL; - flb_sds_t stream = NULL; - flb_sds_t stream_key; - - /* Parameters for insertId */ - msgpack_object insert_id_obj; - insert_id_status in_status; - int insert_id_extracted; - - /* Parameters in Operation */ - flb_sds_t operation_id; - flb_sds_t operation_producer; - int operation_first = FLB_FALSE; - int operation_last = FLB_FALSE; - int operation_extracted = FLB_FALSE; - int operation_extra_size = 0; - - /* Parameters for sourceLocation */ - flb_sds_t source_location_file; - int64_t source_location_line = 0; - flb_sds_t source_location_function; - int source_location_extracted = FLB_FALSE; - int source_location_extra_size = 0; - - /* Parameters for httpRequest */ - struct http_request_field http_request; - int http_request_extracted = FLB_FALSE; - int http_request_extra_size = 0; - - /* Parameters for Timestamp */ - struct tm tm; - // struct flb_time tms; - timestamp_status tms_status; - /* Count number of records */ - array_size = total_records; - - /* Parameters for labels */ - msgpack_object *payload_labels_ptr; - int labels_size = 0; - - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return NULL; - } - - /* - * Search each entry and validate insertId. - * Reject the entry if insertId is invalid. - * If all the entries are rejected, stop formatting. - * - */ - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - /* Extract insertId */ - in_status = validate_insert_id(&insert_id_obj, log_event.body); - - if (in_status == INSERTID_INVALID) { - flb_plg_error(ctx->ins, - "Incorrect insertId received. InsertId should be non-empty string."); - array_size -= 1; - } - } - - flb_log_event_decoder_destroy(&log_decoder); - - /* Sounds like this should compare to -1 instead of zero */ - if (array_size == 0) { - return NULL; - } - - /* Create temporal msgpack buffer */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* - * Pack root map (resource & entries): - * - * {"resource": {"type": "...", "labels": {...}, - * "entries": [] - */ - msgpack_pack_map(&mp_pck, 2); - - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "resource", 8); - - /* type & labels */ - msgpack_pack_map(&mp_pck, 2); - - /* type */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "type", 4); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource)); - msgpack_pack_str_body(&mp_pck, ctx->resource, - flb_sds_len(ctx->resource)); - - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "labels", 6); - - ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes); - if (ret != 0) { - if (ctx->resource_type == RESOURCE_TYPE_K8S) { - ret = extract_local_resource_id(data, bytes, ctx, tag); - if (ret != 0) { - flb_plg_error(ctx->ins, "fail to construct local_resource_id"); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - } - ret = parse_monitored_resource(ctx, data, bytes, &mp_pck); - if (ret != 0) { - if (strcmp(ctx->resource, "global") == 0) { - /* global resource has field project_id */ - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE - || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) { - flb_mp_map_header_init(&mh, &mp_pck); - - if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE && ctx->node_id) { - /* generic_node has fields project_id, location, namespace, node_id */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "node_id", 7); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id)); - msgpack_pack_str_body(&mp_pck, - ctx->node_id, flb_sds_len(ctx->node_id)); - } - else { - /* generic_task has fields project_id, location, namespace, job, task_id */ - if (ctx->job) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 3); - msgpack_pack_str_body(&mp_pck, "job", 3); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job)); - msgpack_pack_str_body(&mp_pck, - ctx->job, flb_sds_len(ctx->job)); - } - - if (ctx->task_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "task_id", 7); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id)); - msgpack_pack_str_body(&mp_pck, - ctx->task_id, flb_sds_len(ctx->task_id)); - } - } - - if (ctx->project_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - - if (ctx->location) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "location", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location)); - msgpack_pack_str_body(&mp_pck, - ctx->location, flb_sds_len(ctx->location)); - } - - if (ctx->namespace_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "namespace", 9); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id)); - msgpack_pack_str_body(&mp_pck, - ctx->namespace_id, flb_sds_len(ctx->namespace_id)); - } - - flb_mp_map_header_end(&mh); - } - else if (strcmp(ctx->resource, "gce_instance") == 0) { - /* gce_instance resource has fields project_id, zone, instance_id */ - flb_mp_map_header_init(&mh, &mp_pck); - - if (ctx->project_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - - if (ctx->zone) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "zone", 4); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone)); - msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone)); - } - - if (ctx->instance_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "instance_id", 11); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id)); - msgpack_pack_str_body(&mp_pck, - ctx->instance_id, flb_sds_len(ctx->instance_id)); - } - flb_mp_map_header_end(&mh); - } - else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) { - /* k8s_container resource has fields project_id, location, cluster_name, - * namespace_name, pod_name, container_name - * - * The local_resource_id for k8s_container is in format: - * k8s_container.<namespace_name>.<pod_name>.<container_name> - */ - - ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER); - if (ret == -1) { - flb_plg_error(ctx->ins, "fail to extract resource labels " - "for k8s_container resource type"); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - - flb_mp_map_header_init(&mh, &mp_pck); - - if (ctx->project_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - - if (ctx->cluster_location) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "location", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_location, - flb_sds_len(ctx->cluster_location)); - } - - if (ctx->cluster_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 12); - msgpack_pack_str_body(&mp_pck, "cluster_name", 12); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_name, flb_sds_len(ctx->cluster_name)); - } - - if (ctx->namespace_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 14); - msgpack_pack_str_body(&mp_pck, "namespace_name", 14); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); - msgpack_pack_str_body(&mp_pck, - ctx->namespace_name, - flb_sds_len(ctx->namespace_name)); - } - - if (ctx->pod_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "pod_name", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); - msgpack_pack_str_body(&mp_pck, - ctx->pod_name, flb_sds_len(ctx->pod_name)); - } - - if (ctx->container_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 14); - msgpack_pack_str_body(&mp_pck, "container_name", 14); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name)); - msgpack_pack_str_body(&mp_pck, - ctx->container_name, - flb_sds_len(ctx->container_name)); - } - - flb_mp_map_header_end(&mh); - } - else if (strcmp(ctx->resource, K8S_NODE) == 0) { - /* k8s_node resource has fields project_id, location, cluster_name, node_name - * - * The local_resource_id for k8s_node is in format: - * k8s_node.<node_name> - */ - - ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE); - if (ret == -1) { - flb_plg_error(ctx->ins, "fail to process local_resource_id from " - "log entry for k8s_node"); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - - flb_mp_map_header_init(&mh, &mp_pck); - - if (ctx->project_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - - if (ctx->cluster_location) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "location", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_location, - flb_sds_len(ctx->cluster_location)); - } - - if (ctx->cluster_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 12); - msgpack_pack_str_body(&mp_pck, "cluster_name", 12); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_name, flb_sds_len(ctx->cluster_name)); - } - - if (ctx->node_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "node_name", 9); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name)); - msgpack_pack_str_body(&mp_pck, - ctx->node_name, flb_sds_len(ctx->node_name)); - } - - flb_mp_map_header_end(&mh); - } - else if (strcmp(ctx->resource, K8S_POD) == 0) { - /* k8s_pod resource has fields project_id, location, cluster_name, - * namespace_name, pod_name. - * - * The local_resource_id for k8s_pod is in format: - * k8s_pod.<namespace_name>.<pod_name> - */ - - ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD); - if (ret != 0) { - flb_plg_error(ctx->ins, "fail to process local_resource_id from " - "log entry for k8s_pod"); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - - flb_mp_map_header_init(&mh, &mp_pck); - - if (ctx->project_id) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "project_id", 10); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); - msgpack_pack_str_body(&mp_pck, - ctx->project_id, flb_sds_len(ctx->project_id)); - } - - if (ctx->cluster_location) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "location", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_location, - flb_sds_len(ctx->cluster_location)); - } - - if (ctx->cluster_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 12); - msgpack_pack_str_body(&mp_pck, "cluster_name", 12); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); - msgpack_pack_str_body(&mp_pck, - ctx->cluster_name, flb_sds_len(ctx->cluster_name)); - } - - if (ctx->namespace_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 14); - msgpack_pack_str_body(&mp_pck, "namespace_name", 14); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); - msgpack_pack_str_body(&mp_pck, - ctx->namespace_name, - flb_sds_len(ctx->namespace_name)); - } - - if (ctx->pod_name) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "pod_name", 8); - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); - msgpack_pack_str_body(&mp_pck, - ctx->pod_name, flb_sds_len(ctx->pod_name)); - } - - flb_mp_map_header_end(&mh); - } - else { - flb_plg_error(ctx->ins, "unsupported resource type '%s'", - ctx->resource); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - } - } - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "entries", 7); - - /* Append entries */ - msgpack_pack_array(&mp_pck, array_size); - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - msgpack_sbuffer_destroy(&mp_sbuf); - - return NULL; - } - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - obj = log_event.body; - tms_status = extract_timestamp(obj, &log_event.timestamp); - - /* - * Pack entry - * - * { - * "severity": "...", - * "labels": "...", - * "logName": "...", - * "jsonPayload": {...}, - * "timestamp": "...", - * "spanId": "...", - * "traceSampled": <true or false>, - * "trace": "..." - * } - */ - entry_size = 3; - - /* Extract severity */ - severity_extracted = FLB_FALSE; - if (ctx->severity_key - && get_severity_level(&severity, obj, ctx->severity_key) == 0) { - severity_extracted = FLB_TRUE; - entry_size += 1; - } - - /* Extract trace */ - trace_extracted = FLB_FALSE; - if (ctx->trace_key - && get_string(&trace, obj, ctx->trace_key) == 0) { - trace_extracted = FLB_TRUE; - entry_size += 1; - } - - /* Extract span id */ - span_id_extracted = FLB_FALSE; - if (ctx->span_id_key - && get_string(&span_id, obj, ctx->span_id_key) == 0) { - span_id_extracted = FLB_TRUE; - entry_size += 1; - } - - /* Extract trace sampled */ - trace_sampled_extracted = FLB_FALSE; - if (ctx->trace_sampled_key - && get_trace_sampled(&trace_sampled, obj, ctx->trace_sampled_key) == 0) { - trace_sampled_extracted = FLB_TRUE; - entry_size += 1; - } - - /* Extract log name */ - log_name_extracted = FLB_FALSE; - if (ctx->log_name_key - && get_string(&log_name, obj, ctx->log_name_key) == 0) { - log_name_extracted = FLB_TRUE; - } - - /* Extract insertId */ - in_status = validate_insert_id(&insert_id_obj, obj); - if (in_status == INSERTID_VALID) { - insert_id_extracted = FLB_TRUE; - entry_size += 1; - } - else if (in_status == INSERTID_NOT_PRESENT) { - insert_id_extracted = FLB_FALSE; - } - else { - if (log_name_extracted == FLB_TRUE) { - flb_sds_destroy(log_name); - } - continue; - } - - /* Extract operation */ - operation_id = flb_sds_create(""); - operation_producer = flb_sds_create(""); - operation_first = FLB_FALSE; - operation_last = FLB_FALSE; - operation_extra_size = 0; - operation_extracted = extract_operation(&operation_id, &operation_producer, - &operation_first, &operation_last, obj, - &operation_extra_size); - - if (operation_extracted == FLB_TRUE) { - entry_size += 1; - } - - /* Extract sourceLocation */ - source_location_file = flb_sds_create(""); - source_location_line = 0; - source_location_function = flb_sds_create(""); - source_location_extra_size = 0; - source_location_extracted = extract_source_location(&source_location_file, - &source_location_line, - &source_location_function, - obj, - &source_location_extra_size); - - if (source_location_extracted == FLB_TRUE) { - entry_size += 1; - } - - /* Extract httpRequest */ - init_http_request(&http_request); - http_request_extra_size = 0; - http_request_extracted = extract_http_request(&http_request, - ctx->http_request_key, - ctx->http_request_key_size, - obj, &http_request_extra_size); - if (http_request_extracted == FLB_TRUE) { - entry_size += 1; - } - - /* Extract payload labels */ - payload_labels_ptr = get_payload_labels(ctx, obj); - if (payload_labels_ptr != NULL && - payload_labels_ptr->type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "the type of payload labels should be map"); - flb_sds_destroy(operation_id); - flb_sds_destroy(operation_producer); - flb_log_event_decoder_destroy(&log_decoder); - msgpack_sbuffer_destroy(&mp_sbuf); - return NULL; - } - - /* Number of parsed labels */ - labels_size = mk_list_size(&ctx->config_labels); - if (payload_labels_ptr != NULL && - payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { - labels_size += payload_labels_ptr->via.map.size; - } - - if (labels_size > 0) { - entry_size += 1; - } - - msgpack_pack_map(&mp_pck, entry_size); - - /* Add severity into the log entry */ - if (severity_extracted == FLB_TRUE) { - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "severity", 8); - msgpack_pack_int(&mp_pck, severity); - } - - /* Add trace into the log entry */ - if (trace_extracted == FLB_TRUE) { - msgpack_pack_str(&mp_pck, 5); - msgpack_pack_str_body(&mp_pck, "trace", 5); - - if (ctx->autoformat_stackdriver_trace) { - len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1, - "projects/%s/traces/%s", ctx->project_id, trace); - new_trace = stackdriver_trace; - } - else { - len = flb_sds_len(trace); - new_trace = trace; - } - - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, new_trace, len); - flb_sds_destroy(trace); - } - - /* Add spanId field into the log entry */ - if (span_id_extracted == FLB_TRUE) { - msgpack_pack_str_with_body(&mp_pck, "spanId", 6); - len = flb_sds_len(span_id); - msgpack_pack_str_with_body(&mp_pck, span_id, len); - flb_sds_destroy(span_id); - } - - /* Add traceSampled field into the log entry */ - if (trace_sampled_extracted == FLB_TRUE) { - msgpack_pack_str_with_body(&mp_pck, "traceSampled", 12); - - if (trace_sampled == FLB_TRUE) { - msgpack_pack_true(&mp_pck); - } else { - msgpack_pack_false(&mp_pck); - } - - } - - /* Add insertId field into the log entry */ - if (insert_id_extracted == FLB_TRUE) { - msgpack_pack_str(&mp_pck, 8); - msgpack_pack_str_body(&mp_pck, "insertId", 8); - msgpack_pack_object(&mp_pck, insert_id_obj); - } - - /* Add operation field into the log entry */ - if (operation_extracted == FLB_TRUE) { - add_operation_field(&operation_id, &operation_producer, - &operation_first, &operation_last, &mp_pck); - } - - /* Add sourceLocation field into the log entry */ - if (source_location_extracted == FLB_TRUE) { - add_source_location_field(&source_location_file, source_location_line, - &source_location_function, &mp_pck); - } - - /* Add httpRequest field into the log entry */ - if (http_request_extracted == FLB_TRUE) { - add_http_request_field(&http_request, &mp_pck); - } - - /* labels */ - if (labels_size > 0) { - msgpack_pack_str(&mp_pck, 6); - msgpack_pack_str_body(&mp_pck, "labels", 6); - pack_labels(ctx, &mp_pck, payload_labels_ptr); - } - - /* Clean up id and producer if operation extracted */ - flb_sds_destroy(operation_id); - flb_sds_destroy(operation_producer); - flb_sds_destroy(source_location_file); - flb_sds_destroy(source_location_function); - destroy_http_request(&http_request); - - /* jsonPayload */ - msgpack_pack_str(&mp_pck, 11); - msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); - pack_json_payload(insert_id_extracted, - operation_extracted, operation_extra_size, - source_location_extracted, - source_location_extra_size, - http_request_extracted, - http_request_extra_size, - tms_status, - &mp_pck, obj, ctx); - - /* avoid modifying the original tag */ - newtag = tag; - stream_key = flb_sds_create("stream"); - if (ctx->resource_type == RESOURCE_TYPE_K8S - && get_string(&stream, obj, stream_key) == 0) { - if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) { - newtag = "stdout"; - } - else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) { - newtag = "stderr"; - } - } - - if (log_name_extracted == FLB_FALSE) { - new_log_name = newtag; - } - else { - new_log_name = log_name; - } - - /* logName */ - len = snprintf(path, sizeof(path) - 1, - "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name); - - if (log_name_extracted == FLB_TRUE) { - flb_sds_destroy(log_name); - } - - msgpack_pack_str(&mp_pck, 7); - msgpack_pack_str_body(&mp_pck, "logName", 7); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, path, len); - flb_sds_destroy(stream_key); - flb_sds_destroy(stream); - - /* timestamp */ - msgpack_pack_str(&mp_pck, 9); - msgpack_pack_str_body(&mp_pck, "timestamp", 9); - - /* Format the time */ - /* - * If format is timestamp_object or timestamp_duo_fields, - * tms has been updated. - * - * If timestamp is not presen, - * use the default tms(current time). - */ - - gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); - s = strftime(time_formatted, sizeof(time_formatted) - 1, - FLB_STD_TIME_FMT, &tm); - len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, - ".%09" PRIu64 "Z", - (uint64_t) log_event.timestamp.tm.tv_nsec); - s += len; - - msgpack_pack_str(&mp_pck, s); - msgpack_pack_str_body(&mp_pck, time_formatted, s); - } - - flb_log_event_decoder_destroy(&log_decoder); - - /* Convert from msgpack to JSON */ - out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - msgpack_sbuffer_destroy(&mp_sbuf); - - if (!out_buf) { - flb_plg_error(ctx->ins, "error formatting JSON payload"); - return NULL; - } - - return out_buf; -} - -static int stackdriver_format_test(struct flb_config *config, - struct flb_input_instance *ins, - void *plugin_context, - void *flush_ctx, - int event_type, - const char *tag, int tag_len, - const void *data, size_t bytes, - void **out_data, size_t *out_size) -{ - int total_records; - flb_sds_t payload = NULL; - struct flb_stackdriver *ctx = plugin_context; - - /* Count number of records */ - total_records = flb_mp_count(data, bytes); - - payload = stackdriver_format(ctx, total_records, - (char *) tag, tag_len, data, bytes); - if (payload == NULL) { - return -1; - } - - *out_data = payload; - *out_size = flb_sds_len(payload); - - return 0; - -} - -#ifdef FLB_HAVE_METRICS -static void update_http_metrics(struct flb_stackdriver *ctx, - struct flb_event_chunk *event_chunk, - uint64_t ts, - int http_status) -{ - char tmp[32]; - - /* convert status to string format */ - snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); - char *name = (char *) flb_output_name(ctx->ins); - - /* processed records total */ - cmt_counter_add(ctx->cmt_proc_records_total, ts, event_chunk->total_events, - 2, (char *[]) {tmp, name}); - - /* HTTP status */ - if (http_status != STACKDRIVER_NET_ERROR) { - cmt_counter_inc(ctx->cmt_requests_total, ts, 2, (char *[]) {tmp, name}); - } -} - -static void update_retry_metric(struct flb_stackdriver *ctx, - struct flb_event_chunk *event_chunk, - uint64_t ts, - int http_status, int ret_code) -{ - char tmp[32]; - char *name = (char *) flb_output_name(ctx->ins); - - if (ret_code != FLB_RETRY) { - return; - } - - /* convert status to string format */ - snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); - cmt_counter_add(ctx->cmt_retried_records_total, - ts, event_chunk->total_events, 2, (char *[]) {tmp, name}); - -} -#endif - -static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - (void) i_ins; - (void) config; - int ret; - int ret_code = FLB_RETRY; - size_t b_sent; - flb_sds_t token; - flb_sds_t payload_buf; - void *compressed_payload_buffer = NULL; - size_t compressed_payload_size; - struct flb_stackdriver *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - int compressed = FLB_FALSE; -#ifdef FLB_HAVE_METRICS - char *name = (char *) flb_output_name(ctx->ins); - uint64_t ts = cfl_time_now(); -#endif - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { -#ifdef FLB_HAVE_METRICS - cmt_counter_inc(ctx->cmt_failed_requests, - ts, 1, (char *[]) {name}); - - /* OLD api */ - flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); - - update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); - update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR, FLB_RETRY); -#endif - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Reformat msgpack to stackdriver JSON payload */ - payload_buf = stackdriver_format(ctx, - event_chunk->total_events, - event_chunk->tag, flb_sds_len(event_chunk->tag), - event_chunk->data, event_chunk->size); - if (!payload_buf) { -#ifdef FLB_HAVE_METRICS - cmt_counter_inc(ctx->cmt_failed_requests, - ts, 1, (char *[]) {name}); - - /* OLD api */ - flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); -#endif - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Get or renew Token */ - token = get_google_token(ctx); - if (!token) { - flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); - flb_upstream_conn_release(u_conn); - flb_sds_destroy(payload_buf); -#ifdef FLB_HAVE_METRICS - cmt_counter_inc(ctx->cmt_failed_requests, - ts, 1, (char *[]) {name}); - - /* OLD api */ - flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); -#endif - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - compressed_payload_buffer = payload_buf; - compressed_payload_size = flb_sds_len(payload_buf); - if (ctx->compress_gzip == FLB_TRUE) { - ret = flb_gzip_compress((void *) payload_buf, flb_sds_len(payload_buf), - &compressed_payload_buffer, &compressed_payload_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); - } else { - compressed = FLB_TRUE; - flb_sds_destroy(payload_buf); - } - } - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI, - compressed_payload_buffer, compressed_payload_size, NULL, 0, NULL, 0); - - flb_http_buffer_size(c, 4192); - - if (ctx->stackdriver_agent) { - flb_http_add_header(c, "User-Agent", 10, - ctx->stackdriver_agent, - flb_sds_len(ctx->stackdriver_agent)); - } - else { - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - } - - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - /* Content Encoding: gzip */ - if (compressed == FLB_TRUE) { - flb_http_set_content_encoding_gzip(c); - } - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* validate response */ - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - ret_code = FLB_RETRY; -#ifdef FLB_HAVE_METRICS - update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); -#endif - } - else { - /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); - if (c->resp.status == 200) { - ret_code = FLB_OK; - } - else if (c->resp.status >= 400 && c->resp.status < 500) { - ret_code = FLB_ERROR; - flb_plg_warn(ctx->ins, "error\n%s", - c->resp.payload); - } - else { - if (c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "error\n%s", - c->resp.payload); - } - else { - flb_plg_debug(ctx->ins, "response\n%s", - c->resp.payload); - } - ret_code = FLB_RETRY; - } - } - - /* Update specific stackdriver metrics */ -#ifdef FLB_HAVE_METRICS - if (ret_code == FLB_OK) { - cmt_counter_inc(ctx->cmt_successful_requests, - ts, 1, (char *[]) {name}); - - /* OLD api */ - flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics); - } - else { - cmt_counter_inc(ctx->cmt_failed_requests, - ts, 1, (char *[]) {name}); - - /* OLD api */ - flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); - } - - /* Update metrics counter by using labels/http status code */ - if (ret == 0) { - update_http_metrics(ctx, event_chunk, ts, c->resp.status); - } - - /* Update retry count if necessary */ - update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code); -#endif - - - /* Cleanup */ - if (compressed == FLB_TRUE) { - flb_free(compressed_payload_buffer); - } - else { - flb_sds_destroy(payload_buf); - } - flb_sds_destroy(token); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - - /* Done */ - FLB_OUTPUT_RETURN(ret_code); -} - -static int cb_stackdriver_exit(void *data, struct flb_config *config) -{ - struct flb_stackdriver *ctx = data; - - if (!ctx) { - return -1; - } - - flb_stackdriver_conf_destroy(ctx); - return 0; -} - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, credentials_file), - "Set the path for the google service credentials file" - }, - { - FLB_CONFIG_MAP_STR, "metadata_server", (char *)NULL, - 0, FLB_FALSE, 0, - "Set the metadata server" - }, - { - FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, client_email), - "Set the service account email" - }, - // set in flb_bigquery_oauth_credentials - { - FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, private_key), - "Set the service account secret" - }, - { - FLB_CONFIG_MAP_STR, "export_to_project_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, export_to_project_id), - "Export to project id" - }, - { - FLB_CONFIG_MAP_STR, "resource", FLB_SDS_RESOURCE_TYPE, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource), - "Set the resource" - }, - { - FLB_CONFIG_MAP_STR, "severity_key", DEFAULT_SEVERITY_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, severity_key), - "Set the severity key" - }, - { - FLB_CONFIG_MAP_BOOL, "autoformat_stackdriver_trace", "false", - 0, FLB_TRUE, offsetof(struct flb_stackdriver, autoformat_stackdriver_trace), - "Autoformat the stackdriver trace" - }, - { - FLB_CONFIG_MAP_STR, "trace_key", DEFAULT_TRACE_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_key), - "Set the trace key" - }, - { - FLB_CONFIG_MAP_STR, "span_id_key", DEFAULT_SPAN_ID_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, span_id_key), - "Set the span id key" - }, - { - FLB_CONFIG_MAP_STR, "trace_sampled_key", DEFAULT_TRACE_SAMPLED_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_sampled_key), - "Set the trace sampled key" - }, - { - FLB_CONFIG_MAP_STR, "log_name_key", DEFAULT_LOG_NAME_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, log_name_key), - "Set the logname key" - }, - { - FLB_CONFIG_MAP_STR, "http_request_key", HTTPREQUEST_FIELD_IN_JSON, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, http_request_key), - "Set the http request key" - }, - { - FLB_CONFIG_MAP_STR, "k8s_cluster_name", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_name), - "Set the kubernetes cluster name" - }, - { - FLB_CONFIG_MAP_STR, "k8s_cluster_location", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_location), - "Set the kubernetes cluster location" - }, - { - FLB_CONFIG_MAP_STR, "location", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, location), - "Set the resource location" - }, - { - FLB_CONFIG_MAP_STR, "namespace", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, namespace_id), - "Set the resource namespace" - }, - { - FLB_CONFIG_MAP_STR, "node_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, node_id), - "Set the resource node id" - }, - { - FLB_CONFIG_MAP_STR, "job", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, job), - "Set the resource job" - }, - { - FLB_CONFIG_MAP_STR, "task_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, task_id), - "Set the resource task id" - }, - { - FLB_CONFIG_MAP_STR, "compress", NULL, - 0, FLB_FALSE, 0, - "Set log payload compression method. Option available is 'gzip'" - }, - { - FLB_CONFIG_MAP_CLIST, "labels", NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels), - "Set the labels" - }, - { - FLB_CONFIG_MAP_STR, "labels_key", DEFAULT_LABELS_KEY, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels_key), - "Set the labels key" - }, - { - FLB_CONFIG_MAP_STR, "tag_prefix", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, tag_prefix), - "Set the tag prefix" - }, - { - FLB_CONFIG_MAP_STR, "stackdriver_agent", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, stackdriver_agent), - "Set the stackdriver agent" - }, - /* Custom Regex */ - { - FLB_CONFIG_MAP_STR, "custom_k8s_regex", DEFAULT_TAG_REGEX, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, custom_k8s_regex), - "Set a custom kubernetes regex filter" - }, - { - FLB_CONFIG_MAP_CLIST, "resource_labels", NULL, - 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels), - "Set the resource labels" - }, - /* EOF */ - {0} -}; - -struct flb_output_plugin out_stackdriver_plugin = { - .name = "stackdriver", - .description = "Send events to Google Stackdriver Logging", - .cb_init = cb_stackdriver_init, - .cb_flush = cb_stackdriver_flush, - .cb_exit = cb_stackdriver_exit, - .workers = 1, - .config_map = config_map, - - /* Test */ - .test_formatter.callback = stackdriver_format_test, - - /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver.h b/fluent-bit/plugins/out_stackdriver/stackdriver.h deleted file mode 100644 index 239a3ee31..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver.h +++ /dev/null @@ -1,241 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_STACKDRIVER_H -#define FLB_OUT_STACKDRIVER_H - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_pthread.h> -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_metrics.h> - -/* refresh token every 50 minutes */ -#define FLB_STD_TOKEN_REFRESH 3000 - -/* Stackdriver Logging write scope */ -#define FLB_STD_SCOPE "https://www.googleapis.com/auth/logging.write" - -/* Stackdriver authorization URL */ -#define FLB_STD_AUTH_URL "https://oauth2.googleapis.com/token" - -/* Stackdriver Logging 'write' end-point */ -#define FLB_STD_WRITE_URI "/v2/entries:write" -#define FLB_STD_WRITE_URL "https://logging.googleapis.com" FLB_STD_WRITE_URI - -/* Timestamp format */ -#define FLB_STD_TIME_FMT "%Y-%m-%dT%H:%M:%S" - -/* Default Resource type */ -#define FLB_SDS_RESOURCE_TYPE "global" -#define OPERATION_FIELD_IN_JSON "logging.googleapis.com/operation" -#define MONITORED_RESOURCE_KEY "logging.googleapis.com/monitored_resource" -#define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id" -#define DEFAULT_LABELS_KEY "logging.googleapis.com/labels" -#define DEFAULT_SEVERITY_KEY "logging.googleapis.com/severity" -#define DEFAULT_TRACE_KEY "logging.googleapis.com/trace" -#define DEFAULT_SPAN_ID_KEY "logging.googleapis.com/spanId" -#define DEFAULT_TRACE_SAMPLED_KEY "logging.googleapis.com/traceSampled" -#define DEFAULT_LOG_NAME_KEY "logging.googleapis.com/logName" -#define DEFAULT_INSERT_ID_KEY "logging.googleapis.com/insertId" -#define SOURCELOCATION_FIELD_IN_JSON "logging.googleapis.com/sourceLocation" -#define HTTPREQUEST_FIELD_IN_JSON "logging.googleapis.com/http_request" -#define INSERT_ID_SIZE 31 -#define LEN_LOCAL_RESOURCE_ID_KEY 40 -#define OPERATION_KEY_SIZE 32 -#define SOURCE_LOCATION_SIZE 37 -#define HTTP_REQUEST_KEY_SIZE 35 - -/* - * Stackdriver implements a specific HTTP status code that is used internally in clients to keep - * track of networking errors that could happen before a successful HTTP request/response. For - * metrics counting purposes, every failed networking connection will use a 502 HTTP status code - * that can be used later to query the metrics by using labels with that value. - */ -#define STACKDRIVER_NET_ERROR 502 - -#define K8S_CONTAINER "k8s_container" -#define K8S_NODE "k8s_node" -#define K8S_POD "k8s_pod" - -#define STDOUT "stdout" -#define STDERR "stderr" - -#define DEFAULT_TAG_REGEX "(?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\\.log$" - -/* Metrics */ -#ifdef FLB_HAVE_METRICS -#define FLB_STACKDRIVER_SUCCESSFUL_REQUESTS 1000 /* successful requests */ -#define FLB_STACKDRIVER_FAILED_REQUESTS 1001 /* failed requests */ -#endif - -struct flb_stackdriver_oauth_credentials { - /* parsed credentials file */ - flb_sds_t type; - flb_sds_t private_key_id; - flb_sds_t private_key; - flb_sds_t client_email; - flb_sds_t client_id; - flb_sds_t auth_uri; - flb_sds_t token_uri; -}; - -struct flb_stackdriver_env { - flb_sds_t creds_file; - flb_sds_t metadata_server; -}; - -struct flb_stackdriver { - /* credentials */ - flb_sds_t credentials_file; - - /* parsed credentials file */ - flb_sds_t type; - flb_sds_t project_id; - flb_sds_t private_key_id; - flb_sds_t private_key; - flb_sds_t client_email; - flb_sds_t client_id; - flb_sds_t auth_uri; - flb_sds_t token_uri; - bool metadata_server_auth; - - /* metadata server (GCP specific, WIP) */ - flb_sds_t metadata_server; - flb_sds_t zone; - flb_sds_t instance_id; - flb_sds_t instance_name; - - /* kubernetes specific */ - flb_sds_t cluster_name; - flb_sds_t cluster_location; - flb_sds_t namespace_name; - flb_sds_t pod_name; - flb_sds_t container_name; - flb_sds_t node_name; - - flb_sds_t local_resource_id; - flb_sds_t tag_prefix; - /* shadow tag_prefix for safe deallocation */ - flb_sds_t tag_prefix_k8s; - - /* labels */ - flb_sds_t labels_key; - struct mk_list *labels; - struct mk_list config_labels; - - /* resource type flag */ - int resource_type; - - /* resource labels api */ - struct mk_list *resource_labels; - struct mk_list resource_labels_kvs; - int should_skip_resource_labels_api; - - /* generic resources */ - flb_sds_t location; - flb_sds_t namespace_id; - - /* generic_node specific */ - flb_sds_t node_id; - - /* generic_task specific */ - flb_sds_t job; - flb_sds_t task_id; - - /* Internal variable to reduce string comparisons */ - int compress_gzip; - - /* other */ - flb_sds_t export_to_project_id; - flb_sds_t resource; - flb_sds_t severity_key; - flb_sds_t trace_key; - flb_sds_t span_id_key; - flb_sds_t trace_sampled_key; - flb_sds_t log_name_key; - flb_sds_t http_request_key; - int http_request_key_size; - bool autoformat_stackdriver_trace; - - flb_sds_t stackdriver_agent; - - /* Regex context to parse tags */ - flb_sds_t custom_k8s_regex; - struct flb_regex *regex; - - /* oauth2 context */ - struct flb_oauth2 *o; - - /* parsed oauth2 credentials */ - struct flb_stackdriver_oauth_credentials *creds; - - /* environment variable settings */ - struct flb_stackdriver_env *env; - - /* mutex for acquiring oauth tokens */ - pthread_mutex_t token_mutex; - - /* upstream context for stackdriver write end-point */ - struct flb_upstream *u; - - /* upstream context for metadata end-point */ - struct flb_upstream *metadata_u; - -#ifdef FLB_HAVE_METRICS - /* metrics */ - struct cmt_counter *cmt_successful_requests; - struct cmt_counter *cmt_failed_requests; - struct cmt_counter *cmt_requests_total; - struct cmt_counter *cmt_proc_records_total; - struct cmt_counter *cmt_retried_records_total; -#endif - - /* plugin instance */ - struct flb_output_instance *ins; - - /* Fluent Bit context */ - struct flb_config *config; -}; - -typedef enum { - FLB_STD_EMERGENCY = 800, - FLB_STD_ALERT = 700, - FLB_STD_CRITICAL = 600, - FLB_STD_ERROR = 500, - FLB_STD_WARNING = 400, - FLB_STD_NOTICE = 300, - FLB_STD_INFO = 200, - FLB_STD_DEBUG = 100, - FLB_STD_DEFAULT = 0 -} severity_t; - -struct local_resource_id_list { - flb_sds_t val; - struct mk_list _head; -}; - -typedef enum { - INSERTID_VALID = 0, - INSERTID_INVALID = 1, - INSERTID_NOT_PRESENT = 2 -} insert_id_status; - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c deleted file mode 100644 index 9f3f28a35..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c +++ /dev/null @@ -1,667 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_unescape.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_jsmn.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_kv.h> - -#include <sys/types.h> -#include <sys/stat.h> - -#include "gce_metadata.h" -#include "stackdriver.h" -#include "stackdriver_conf.h" -#include "stackdriver_resource_types.h" - -static inline int key_cmp(const char *str, int len, const char *cmp) { - - if (strlen(cmp) != len) { - return -1; - } - - return strncasecmp(str, cmp, len); -} - -static int read_credentials_file(const char *cred_file, struct flb_stackdriver *ctx) -{ - int i; - int ret; - int key_len; - int val_len; - int tok_size = 32; - char *buf; - char *key; - char *val; - flb_sds_t tmp; - struct stat st; - jsmn_parser parser; - jsmntok_t *t; - jsmntok_t *tokens; - - /* Validate credentials path */ - ret = stat(cred_file, &st); - if (ret == -1) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot open credentials file: %s", - cred_file); - return -1; - } - - if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) { - flb_plg_error(ctx->ins, "credentials file " - "is not a valid file: %s", cred_file); - return -1; - } - - /* Read file content */ - buf = mk_file_to_buffer(cred_file); - if (!buf) { - flb_plg_error(ctx->ins, "error reading credentials file: %s", - cred_file); - return -1; - } - - /* Parse content */ - jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); - if (!tokens) { - flb_errno(); - flb_free(buf); - return -1; - } - - ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size); - if (ret <= 0) { - flb_plg_error(ctx->ins, "invalid JSON credentials file: %s", - cred_file); - flb_free(buf); - flb_free(tokens); - return -1; - } - - t = &tokens[0]; - if (t->type != JSMN_OBJECT) { - flb_plg_error(ctx->ins, "invalid JSON map on file: %s", - cred_file); - flb_free(buf); - flb_free(tokens); - return -1; - } - - /* Parse JSON tokens */ - for (i = 1; i < ret; i++) { - t = &tokens[i]; - if (t->type != JSMN_STRING) { - continue; - } - - if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){ - break; - } - - /* Key */ - key = buf + t->start; - key_len = (t->end - t->start); - - /* Value */ - i++; - t = &tokens[i]; - val = buf + t->start; - val_len = (t->end - t->start); - - if (key_cmp(key, key_len, "type") == 0) { - ctx->creds->type = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "project_id") == 0) { - ctx->project_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "private_key_id") == 0) { - ctx->creds->private_key_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "private_key") == 0) { - tmp = flb_sds_create_len(val, val_len); - if (tmp) { - /* Unescape private key */ - ctx->creds->private_key = flb_sds_create_size(val_len); - flb_unescape_string(tmp, flb_sds_len(tmp), - &ctx->creds->private_key); - flb_sds_destroy(tmp); - } - } - else if (key_cmp(key, key_len, "client_email") == 0) { - ctx->creds->client_email = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "client_id") == 0) { - ctx->creds->client_id = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "auth_uri") == 0) { - ctx->creds->auth_uri = flb_sds_create_len(val, val_len); - } - else if (key_cmp(key, key_len, "token_uri") == 0) { - ctx->creds->token_uri = flb_sds_create_len(val, val_len); - } - } - - flb_free(buf); - flb_free(tokens); - - return 0; -} - -/* - * parse_key_value_list(): - * - Parses an origin list of comma seperated string specifying key=value. - * - Appends the parsed key value pairs into the destination list. - * - Returns the length of the destination list. - */ -static int parse_key_value_list(struct flb_stackdriver *ctx, - struct mk_list *origin, - struct mk_list *dest, - int shouldTrim) -{ - char *p; - flb_sds_t key; - flb_sds_t val; - struct flb_kv *kv; - struct mk_list *head; - struct flb_slist_entry *entry; - - if (origin) { - mk_list_foreach(head, origin) { - entry = mk_list_entry(head, struct flb_slist_entry, _head); - - p = strchr(entry->str, '='); - if (!p) { - flb_plg_error(ctx->ins, "invalid key value pair on '%s'", - entry->str); - return -1; - } - - key = flb_sds_create_size((p - entry->str) + 1); - flb_sds_cat(key, entry->str, p - entry->str); - val = flb_sds_create(p + 1); - if (shouldTrim) { - flb_sds_trim(key); - flb_sds_trim(val); - } - if (!key || flb_sds_len(key) == 0) { - flb_plg_error(ctx->ins, - "invalid key value pair on '%s'", - entry->str); - return -1; - } - if (!val || flb_sds_len(val) == 0) { - flb_plg_error(ctx->ins, - "invalid key value pair on '%s'", - entry->str); - flb_sds_destroy(key); - return -1; - } - - kv = flb_kv_item_create(dest, key, val); - flb_sds_destroy(key); - flb_sds_destroy(val); - - if (!kv) { - return -1; - } - } - } - - return mk_list_size(dest); -} - -/* - * parse_configuration_labels - * - Parse labels set in configuration - * - Returns the number of configuration labels - */ -static int parse_configuration_labels(struct flb_stackdriver *ctx) -{ - return parse_key_value_list(ctx, ctx->labels, - &ctx->config_labels, FLB_FALSE); -} - -/* - * parse_resource_labels(): - * - Parses resource labels set in configuration. - * - Returns the number of resource label mappings. - */ -static int parse_resource_labels(struct flb_stackdriver *ctx) -{ - return parse_key_value_list(ctx, ctx->resource_labels, - &ctx->resource_labels_kvs, FLB_TRUE); -} - -struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - const char *tmp; - const char *backwards_compatible_env_var; - struct flb_stackdriver *ctx; - size_t http_request_key_size; - - /* Allocate config context */ - ctx = flb_calloc(1, sizeof(struct flb_stackdriver)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - ctx->config = config; - - ret = flb_output_config_map_set(ins, (void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to load configuration"); - flb_free(ctx); - return NULL; - } - - /* Compress (gzip) */ - tmp = flb_output_get_property("compress", ins); - ctx->compress_gzip = FLB_FALSE; - if (tmp && strcasecmp(tmp, "gzip") == 0) { - ctx->compress_gzip = FLB_TRUE; - } - - /* labels */ - flb_kv_init(&ctx->config_labels); - ret = parse_configuration_labels((void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to parse configuration labels"); - flb_kv_release(&ctx->config_labels); - flb_free(ctx); - return NULL; - } - - /* resource labels */ - flb_kv_init(&ctx->resource_labels_kvs); - ret = parse_resource_labels((void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to parse resource label list"); - flb_kv_release(&ctx->resource_labels_kvs); - flb_free(ctx); - return NULL; - } - - /* Lookup metadata server URL */ - ctx->metadata_server = NULL; - tmp = flb_output_get_property("metadata_server", ins); - if (tmp == NULL) { - tmp = getenv("METADATA_SERVER"); - if(tmp) { - if (ctx->env == NULL) { - ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env)); - if (ctx->env == NULL) { - flb_plg_error(ins, "unable to allocate env variables"); - flb_free(ctx); - return NULL; - } - } - ctx->env->metadata_server = flb_sds_create(tmp); - ctx->metadata_server = ctx->env->metadata_server; - } - else { - ctx->metadata_server = flb_sds_create(FLB_STD_METADATA_SERVER); - } - } - else { - ctx->metadata_server = flb_sds_create(tmp); - } - flb_plg_info(ctx->ins, "metadata_server set to %s", ctx->metadata_server); - - /* Lookup credentials file */ - if (ctx->credentials_file == NULL) { - /* - * Use GOOGLE_APPLICATION_CREDENTIALS to fetch the credentials. - * GOOGLE_SERVICE_CREDENTIALS is checked for backwards compatibility. - */ - tmp = getenv("GOOGLE_APPLICATION_CREDENTIALS"); - backwards_compatible_env_var = getenv("GOOGLE_SERVICE_CREDENTIALS"); - if (tmp && backwards_compatible_env_var) { - flb_plg_warn(ctx->ins, "GOOGLE_APPLICATION_CREDENTIALS and " - "GOOGLE_SERVICE_CREDENTIALS are both defined. " - "Defaulting to GOOGLE_APPLICATION_CREDENTIALS"); - } - if ((tmp || backwards_compatible_env_var) && (ctx->env == NULL)) { - ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env)); - if (ctx->env == NULL) { - flb_plg_error(ins, "unable to allocate env variables"); - flb_free(ctx); - return NULL; - } - } - if (tmp) { - ctx->env->creds_file = flb_sds_create(tmp); - ctx->credentials_file = ctx->env->creds_file; - } - else if (backwards_compatible_env_var) { - ctx->env->creds_file = flb_sds_create(backwards_compatible_env_var); - ctx->credentials_file = ctx->env->creds_file; - } - } - - if (ctx->credentials_file) { - ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); - if (ctx->creds == NULL) { - flb_plg_error(ctx->ins, "unable to allocate credentials"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - ret = read_credentials_file(ctx->credentials_file, ctx); - if (ret != 0) { - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - ctx->type = ctx->creds->type; - ctx->private_key_id = ctx->creds->private_key_id; - ctx->private_key = ctx->creds->private_key; - ctx->client_email = ctx->creds->client_email; - ctx->client_id = ctx->creds->client_id; - ctx->auth_uri = ctx->creds->auth_uri; - ctx->token_uri = ctx->creds->token_uri; - } - else { - /* - * If no credentials file has been defined, do manual lookup of the - * client email and the private key - */ - ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); - if (ctx->creds == NULL) { - flb_plg_error(ctx->ins, "unable to allocate credentials"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - - /* Service Account Email */ - if (ctx->client_email == NULL) { - tmp = getenv("SERVICE_ACCOUNT_EMAIL"); - if (tmp) { - ctx->creds->client_email = flb_sds_create(tmp); - } - } - - /* Service Account Secret */ - if (ctx->private_key == NULL) { - tmp = getenv("SERVICE_ACCOUNT_SECRET"); - if (tmp) { - ctx->creds->private_key = flb_sds_create(tmp); - } - } - - ctx->private_key = ctx->creds->private_key; - ctx->client_email = ctx->creds->client_email; - } - - /* - * If only client email has been provided, fetch token from - * the GCE metadata server. - * - * If no credentials have been provided, fetch token from the GCE - * metadata server for default account. - */ - if (!ctx->client_email && ctx->private_key) { - flb_plg_error(ctx->ins, "client_email is not defined"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - - if (!ctx->client_email) { - flb_plg_warn(ctx->ins, "client_email is not defined, using " - "a default one"); - if (ctx->creds == NULL) { - ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials)); - if (ctx->creds == NULL) { - flb_plg_error(ctx->ins, "unable to allocate credentials"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - } - ctx->creds->client_email = flb_sds_create("default"); - ctx->client_email = ctx->creds->client_email; - } - if (!ctx->private_key) { - flb_plg_warn(ctx->ins, "private_key is not defined, fetching " - "it from metadata server"); - ctx->metadata_server_auth = true; - } - - if (ctx->http_request_key) { - http_request_key_size = flb_sds_len(ctx->http_request_key); - if (http_request_key_size >= INT_MAX) { - flb_plg_error(ctx->ins, "http_request_key is too long"); - flb_sds_destroy(ctx->http_request_key); - ctx->http_request_key = NULL; - ctx->http_request_key_size = 0; - } else { - ctx->http_request_key_size = http_request_key_size; - } - } - - set_resource_type(ctx); - - if (resource_api_has_required_labels(ctx) == FLB_FALSE) { - - if (ctx->resource_type == RESOURCE_TYPE_K8S) { - if (!ctx->cluster_name || !ctx->cluster_location) { - flb_plg_error(ctx->ins, "missing k8s_cluster_name " - "or k8s_cluster_location in configuration"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - } - - else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE - || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) { - - if (ctx->location == NULL) { - flb_plg_error(ctx->ins, "missing generic resource's location"); - } - - if (ctx->namespace_id == NULL) { - flb_plg_error(ctx->ins, "missing generic resource's namespace"); - } - - if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE) { - if (ctx->node_id == NULL) { - flb_plg_error(ctx->ins, "missing generic_node's node_id"); - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - } - else { - if (ctx->job == NULL) { - flb_plg_error(ctx->ins, "missing generic_task's job"); - } - - if (ctx->task_id == NULL) { - flb_plg_error(ctx->ins, "missing generic_task's task_id"); - } - - if (!ctx->job || !ctx->task_id) { - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - } - - if (!ctx->location || !ctx->namespace_id) { - flb_stackdriver_conf_destroy(ctx); - return NULL; - } - } - } - - - if (ctx->tag_prefix == NULL && ctx->resource_type == RESOURCE_TYPE_K8S) { - /* allocate the flb_sds_t to tag_prefix_k8s so we can safely deallocate it */ - ctx->tag_prefix_k8s = flb_sds_create(ctx->resource); - ctx->tag_prefix_k8s = flb_sds_cat(ctx->tag_prefix_k8s, ".", 1); - ctx->tag_prefix = ctx->tag_prefix_k8s; - } - - /* Register metrics */ -#ifdef FLB_HAVE_METRICS - ctx->cmt_successful_requests = cmt_counter_create(ins->cmt, - "fluentbit", - "stackdriver", - "successful_requests", - "Total number of successful " - "requests.", - 1, (char *[]) {"name"}); - - ctx->cmt_failed_requests = cmt_counter_create(ins->cmt, - "fluentbit", - "stackdriver", - "failed_requests", - "Total number of failed " - "requests.", - 1, (char *[]) {"name"}); - - ctx->cmt_requests_total = cmt_counter_create(ins->cmt, - "fluentbit", - "stackdriver", - "requests_total", - "Total number of requests.", - 2, (char *[]) {"status", "name"}); - - ctx->cmt_proc_records_total = cmt_counter_create(ins->cmt, - "fluentbit", - "stackdriver", - "proc_records_total", - "Total number of processed records.", - 2, (char *[]) {"status", "name"}); - - ctx->cmt_retried_records_total = cmt_counter_create(ins->cmt, - "fluentbit", - "stackdriver", - "retried_records_total", - "Total number of retried records.", - 2, (char *[]) {"status", "name"}); - - /* OLD api */ - flb_metrics_add(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, - "stackdriver_successful_requests", ctx->ins->metrics); - flb_metrics_add(FLB_STACKDRIVER_FAILED_REQUESTS, - "stackdriver_failed_requests", ctx->ins->metrics); -#endif - - return ctx; -} - -int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx) -{ - if (!ctx) { - return -1; - } - - if (ctx->creds) { - if (ctx->creds->type) { - flb_sds_destroy(ctx->creds->type); - } - if (ctx->creds->private_key_id) { - flb_sds_destroy(ctx->creds->private_key_id); - } - if (ctx->creds->private_key) { - flb_sds_destroy(ctx->creds->private_key); - } - if (ctx->creds->client_email) { - flb_sds_destroy(ctx->creds->client_email); - } - if (ctx->creds->client_id) { - flb_sds_destroy(ctx->creds->client_id); - } - if (ctx->creds->auth_uri) { - flb_sds_destroy(ctx->creds->auth_uri); - } - if (ctx->creds->token_uri) { - flb_sds_destroy(ctx->creds->token_uri); - } - flb_free(ctx->creds); - } - - if (ctx->env) { - if (ctx->env->creds_file) { - flb_sds_destroy(ctx->env->creds_file); - } - if (ctx->env->metadata_server) { - flb_sds_destroy(ctx->env->metadata_server); - /* - * If ctx->env is not NULL, - * ctx->metadata_server points ctx->env->metadata_server. - * - * We set ctx->metadata_server to NULL to prevent double free. - */ - ctx->metadata_server = NULL; - } - flb_free(ctx->env); - } - - if (ctx->metadata_server) { - flb_sds_destroy(ctx->metadata_server); - } - - if (ctx->resource_type == RESOURCE_TYPE_K8S){ - flb_sds_destroy(ctx->namespace_name); - flb_sds_destroy(ctx->pod_name); - flb_sds_destroy(ctx->container_name); - flb_sds_destroy(ctx->node_name); - flb_sds_destroy(ctx->local_resource_id); - } - - if (ctx->metadata_server_auth) { - flb_sds_destroy(ctx->zone); - flb_sds_destroy(ctx->instance_id); - } - - if (ctx->metadata_u) { - flb_upstream_destroy(ctx->metadata_u); - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->o) { - flb_oauth2_destroy(ctx->o); - } - - if (ctx->regex) { - flb_regex_destroy(ctx->regex); - } - - if (ctx->project_id) { - flb_sds_destroy(ctx->project_id); - } - - if (ctx->tag_prefix_k8s) { - flb_sds_destroy(ctx->tag_prefix_k8s); - } - - flb_kv_release(&ctx->config_labels); - flb_kv_release(&ctx->resource_labels_kvs); - flb_free(ctx); - - return 0; -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h deleted file mode 100644 index 6244e6851..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h +++ /dev/null @@ -1,29 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_STACKDRIVER_CONF_H -#define FLB_OUT_STACKDRIVER_CONF_H - -#include "stackdriver.h" - -struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx); - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c b/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c deleted file mode 100644 index e5b94c481..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c +++ /dev/null @@ -1,63 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include "stackdriver.h" - -int equal_obj_str(msgpack_object obj, const char *str, const int size) { - if (obj.type != MSGPACK_OBJECT_STR) { - return FLB_FALSE; - } - if (size != obj.via.str.size - || strncmp(str, obj.via.str.ptr, obj.via.str.size) != 0) { - return FLB_FALSE; - } - return FLB_TRUE; -} - -int validate_key(msgpack_object obj, const char *str, const int size) { - return equal_obj_str(obj, str, size); -} - -void try_assign_subfield_str(msgpack_object obj, flb_sds_t *subfield) { - if (obj.type == MSGPACK_OBJECT_STR) { - *subfield = flb_sds_copy(*subfield, obj.via.str.ptr, - obj.via.str.size); - } -} - -void try_assign_subfield_bool(msgpack_object obj, int *subfield) { - if (obj.type == MSGPACK_OBJECT_BOOLEAN) { - if (obj.via.boolean) { - *subfield = FLB_TRUE; - } - else { - *subfield = FLB_FALSE; - } - } -} - -void try_assign_subfield_int(msgpack_object obj, int64_t *subfield) { - if (obj.type == MSGPACK_OBJECT_STR) { - *subfield = atoll(obj.via.str.ptr); - } - else if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - *subfield = obj.via.i64; - } -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h b/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h deleted file mode 100644 index ab8ac30a8..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef FLB_STD_HELPER_H -#define FLB_STD_HELPER_H - -#include "stackdriver.h" - -/* - * Compare obj->via.str and str. - * Return FLB_TRUE if they are equal. - * Return FLB_FALSE if obj->type is not string or they are not equal - */ -int equal_obj_str(msgpack_object obj, const char *str, const int size); - -int validate_key(msgpack_object obj, const char *str, const int size); - -/* - * if obj->type is string, assign obj->val to subfield - * Otherwise leave the subfield untouched - */ -void try_assign_subfield_str(msgpack_object obj, flb_sds_t *subfield); - -/* - * if obj->type is boolean, assign obj->val to subfield - * Otherwise leave the subfield untouched - */ -void try_assign_subfield_bool(msgpack_object obj, int *subfield); - -/* - * if obj->type is valid, assign obj->val to subfield - * Otherwise leave the subfield untouched - */ -void try_assign_subfield_int(msgpack_object obj, int64_t *subfield); - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c b/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c deleted file mode 100644 index 9a1c814c0..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c +++ /dev/null @@ -1,393 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <fluent-bit/flb_regex.h> -#include "stackdriver.h" -#include "stackdriver_helper.h" -#include "stackdriver_http_request.h" - -#include <ctype.h> - -typedef enum { - NO_HTTPREQUEST = 1, - HTTPREQUEST_EXISTS = 2 -} http_request_status; - -void init_http_request(struct http_request_field *http_request) -{ - http_request->latency = flb_sds_create(""); - http_request->protocol = flb_sds_create(""); - http_request->referer = flb_sds_create(""); - http_request->remoteIp = flb_sds_create(""); - http_request->requestMethod = flb_sds_create(""); - http_request->requestUrl = flb_sds_create(""); - http_request->serverIp = flb_sds_create(""); - http_request->userAgent = flb_sds_create(""); - - http_request->cacheFillBytes = 0; - http_request->requestSize = 0; - http_request->responseSize = 0; - http_request->status = 0; - - http_request->cacheHit = FLB_FALSE; - http_request->cacheLookup = FLB_FALSE; - http_request->cacheValidatedWithOriginServer = FLB_FALSE; -} - -void destroy_http_request(struct http_request_field *http_request) -{ - flb_sds_destroy(http_request->latency); - flb_sds_destroy(http_request->protocol); - flb_sds_destroy(http_request->referer); - flb_sds_destroy(http_request->remoteIp); - flb_sds_destroy(http_request->requestMethod); - flb_sds_destroy(http_request->requestUrl); - flb_sds_destroy(http_request->serverIp); - flb_sds_destroy(http_request->userAgent); -} - -void add_http_request_field(struct http_request_field *http_request, - msgpack_packer *mp_pck) -{ - msgpack_pack_str(mp_pck, 11); - msgpack_pack_str_body(mp_pck, "httpRequest", 11); - - if (flb_sds_is_empty(http_request->latency) == FLB_TRUE) { - msgpack_pack_map(mp_pck, 14); - } - else { - msgpack_pack_map(mp_pck, 15); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_LATENCY_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_LATENCY, - HTTP_REQUEST_LATENCY_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->latency)); - msgpack_pack_str_body(mp_pck, http_request->latency, - flb_sds_len(http_request->latency)); - } - - /* String sub-fields */ - msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUEST_METHOD_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUEST_METHOD, - HTTP_REQUEST_REQUEST_METHOD_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->requestMethod)); - msgpack_pack_str_body(mp_pck, http_request->requestMethod, - flb_sds_len(http_request->requestMethod)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUEST_URL_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUEST_URL, - HTTP_REQUEST_REQUEST_URL_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->requestUrl)); - msgpack_pack_str_body(mp_pck, http_request->requestUrl, - flb_sds_len(http_request->requestUrl)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_USER_AGENT_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_USER_AGENT, - HTTP_REQUEST_USER_AGENT_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->userAgent)); - msgpack_pack_str_body(mp_pck, http_request->userAgent, - flb_sds_len(http_request->userAgent)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_REMOTE_IP_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REMOTE_IP, - HTTP_REQUEST_REMOTE_IP_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->remoteIp)); - msgpack_pack_str_body(mp_pck, http_request->remoteIp, - flb_sds_len(http_request->remoteIp)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_SERVER_IP_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_SERVER_IP, - HTTP_REQUEST_SERVER_IP_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->serverIp)); - msgpack_pack_str_body(mp_pck, http_request->serverIp, - flb_sds_len(http_request->serverIp)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_REFERER_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REFERER, - HTTP_REQUEST_REFERER_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->referer)); - msgpack_pack_str_body(mp_pck, http_request->referer, - flb_sds_len(http_request->referer)); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_PROTOCOL_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_PROTOCOL, - HTTP_REQUEST_PROTOCOL_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(http_request->protocol)); - msgpack_pack_str_body(mp_pck, http_request->protocol, - flb_sds_len(http_request->protocol)); - - /* Integer sub-fields */ - msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUESTSIZE_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUESTSIZE, - HTTP_REQUEST_REQUESTSIZE_SIZE); - msgpack_pack_int64(mp_pck, http_request->requestSize); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_RESPONSESIZE_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_RESPONSESIZE, - HTTP_REQUEST_RESPONSESIZE_SIZE); - msgpack_pack_int64(mp_pck, http_request->responseSize); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_STATUS_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_STATUS, HTTP_REQUEST_STATUS_SIZE); - msgpack_pack_int64(mp_pck, http_request->status); - - msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_FILL_BYTES_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_FILL_BYTES, - HTTP_REQUEST_CACHE_FILL_BYTES_SIZE); - msgpack_pack_int64(mp_pck, http_request->cacheFillBytes); - - /* Boolean sub-fields */ - msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_LOOKUP_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_LOOKUP, - HTTP_REQUEST_CACHE_LOOKUP_SIZE); - if (http_request->cacheLookup == FLB_TRUE) { - msgpack_pack_true(mp_pck); - } - else { - msgpack_pack_false(mp_pck); - } - - msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_HIT_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_HIT, - HTTP_REQUEST_CACHE_HIT_SIZE); - if (http_request->cacheLookup == FLB_TRUE) { - msgpack_pack_true(mp_pck); - } - else { - msgpack_pack_false(mp_pck); - } - - msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE); - msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER, - HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE); - if (http_request->cacheValidatedWithOriginServer == FLB_TRUE) { - msgpack_pack_true(mp_pck); - } - else { - msgpack_pack_false(mp_pck); - } -} - -/* latency should be in the format: - * whitespace (opt.) + integer + point & decimal (opt.) - * + whitespace (opt.) + "s" + whitespace (opt.) - * - * latency is Duration, so the maximum value is "315576000000.999999999s". - * (23 characters in length) - */ -static void validate_latency(msgpack_object_str latency_in_payload, - struct http_request_field *http_request) { - int i = 0; - int j = 0; - int status = 0; - char extract_latency[32]; - flb_sds_t pattern; - struct flb_regex *regex; - - pattern = flb_sds_create("^\\s*\\d+(.\\d+)?\\s*s\\s*$"); - if (!pattern) { - return; - } - - if (latency_in_payload.size > sizeof(extract_latency)) { - flb_sds_destroy(pattern); - return; - } - - regex = flb_regex_create(pattern); - status = flb_regex_match(regex, - (unsigned char *) latency_in_payload.ptr, - latency_in_payload.size); - flb_regex_destroy(regex); - flb_sds_destroy(pattern); - - if (status == 1) { - for (; i < latency_in_payload.size; ++ i) { - if (latency_in_payload.ptr[i] == '.' || latency_in_payload.ptr[i] == 's' - || isdigit(latency_in_payload.ptr[i])) { - extract_latency[j] = latency_in_payload.ptr[i]; - ++ j; - } - } - http_request->latency = flb_sds_copy(http_request->latency, extract_latency, j); - } -} - -/* Return true if httpRequest extracted */ -int extract_http_request(struct http_request_field *http_request, - flb_sds_t http_request_key, - int http_request_key_size, - msgpack_object *obj, int *extra_subfields) -{ - http_request_status op_status = NO_HTTPREQUEST; - msgpack_object_kv *p; - msgpack_object_kv *pend; - msgpack_object_kv *tmp_p; - msgpack_object_kv *tmp_pend; - - if (obj->via.map.size == 0) { - return FLB_FALSE; - } - - p = obj->via.map.ptr; - pend = obj->via.map.ptr + obj->via.map.size; - - for (; p < pend && op_status == NO_HTTPREQUEST; ++p) { - - if (p->val.type != MSGPACK_OBJECT_MAP - || !validate_key(p->key, http_request_key, - http_request_key_size)) { - - continue; - } - - op_status = HTTPREQUEST_EXISTS; - msgpack_object sub_field = p->val; - - tmp_p = sub_field.via.map.ptr; - tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size; - - /* Validate the subfields of httpRequest */ - for (; tmp_p < tmp_pend; ++tmp_p) { - if (tmp_p->key.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (validate_key(tmp_p->key, HTTP_REQUEST_LATENCY, - HTTP_REQUEST_LATENCY_SIZE)) { - if (tmp_p->val.type != MSGPACK_OBJECT_STR) { - continue; - } - validate_latency(tmp_p->val.via.str, http_request); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_PROTOCOL, - HTTP_REQUEST_PROTOCOL_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->protocol); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_REFERER, - HTTP_REQUEST_REFERER_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->referer); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_REMOTE_IP, - HTTP_REQUEST_REMOTE_IP_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->remoteIp); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUEST_METHOD, - HTTP_REQUEST_REQUEST_METHOD_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->requestMethod); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUEST_URL, - HTTP_REQUEST_REQUEST_URL_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->requestUrl); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_SERVER_IP, - HTTP_REQUEST_SERVER_IP_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->serverIp); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_USER_AGENT, - HTTP_REQUEST_USER_AGENT_SIZE)) { - try_assign_subfield_str(tmp_p->val, &http_request->userAgent); - } - - else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_FILL_BYTES, - HTTP_REQUEST_CACHE_FILL_BYTES_SIZE)) { - try_assign_subfield_int(tmp_p->val, &http_request->cacheFillBytes); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUESTSIZE, - HTTP_REQUEST_REQUESTSIZE_SIZE)) { - try_assign_subfield_int(tmp_p->val, &http_request->requestSize); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_RESPONSESIZE, - HTTP_REQUEST_RESPONSESIZE_SIZE)) { - try_assign_subfield_int(tmp_p->val, &http_request->responseSize); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_STATUS, - HTTP_REQUEST_STATUS_SIZE)) { - try_assign_subfield_int(tmp_p->val, &http_request->status); - } - - else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_HIT, - HTTP_REQUEST_CACHE_HIT_SIZE)) { - try_assign_subfield_bool(tmp_p->val, &http_request->cacheHit); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_LOOKUP, - HTTP_REQUEST_CACHE_LOOKUP_SIZE)) { - try_assign_subfield_bool(tmp_p->val, &http_request->cacheLookup); - } - else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER, - HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE)) { - try_assign_subfield_bool(tmp_p->val, - &http_request->cacheValidatedWithOriginServer); - } - - else { - *extra_subfields += 1; - } - } - } - - return op_status == HTTPREQUEST_EXISTS; -} - -void pack_extra_http_request_subfields(msgpack_packer *mp_pck, - msgpack_object *http_request, - int extra_subfields) { - msgpack_object_kv *p = http_request->via.map.ptr; - msgpack_object_kv *const pend = http_request->via.map.ptr + http_request->via.map.size; - - msgpack_pack_map(mp_pck, extra_subfields); - - for (; p < pend; ++p) { - if (validate_key(p->key, HTTP_REQUEST_LATENCY, - HTTP_REQUEST_LATENCY_SIZE) - || validate_key(p->key, HTTP_REQUEST_PROTOCOL, - HTTP_REQUEST_PROTOCOL_SIZE) - || validate_key(p->key, HTTP_REQUEST_REFERER, - HTTP_REQUEST_REFERER_SIZE) - || validate_key(p->key, HTTP_REQUEST_REMOTE_IP, - HTTP_REQUEST_REMOTE_IP_SIZE) - || validate_key(p->key, HTTP_REQUEST_REQUEST_METHOD, - HTTP_REQUEST_REQUEST_METHOD_SIZE) - || validate_key(p->key, HTTP_REQUEST_REQUEST_URL, - HTTP_REQUEST_REQUEST_URL_SIZE) - || validate_key(p->key, HTTP_REQUEST_SERVER_IP, - HTTP_REQUEST_SERVER_IP_SIZE) - || validate_key(p->key, HTTP_REQUEST_USER_AGENT, - HTTP_REQUEST_USER_AGENT_SIZE) - || validate_key(p->key, HTTP_REQUEST_CACHE_FILL_BYTES, - HTTP_REQUEST_CACHE_FILL_BYTES_SIZE) - || validate_key(p->key, HTTP_REQUEST_REQUESTSIZE, - HTTP_REQUEST_REQUESTSIZE_SIZE) - || validate_key(p->key, HTTP_REQUEST_RESPONSESIZE, - HTTP_REQUEST_RESPONSESIZE_SIZE) - || validate_key(p->key, HTTP_REQUEST_STATUS, - HTTP_REQUEST_STATUS_SIZE) - || validate_key(p->key, HTTP_REQUEST_CACHE_HIT, - HTTP_REQUEST_CACHE_HIT_SIZE) - || validate_key(p->key, HTTP_REQUEST_CACHE_LOOKUP, - HTTP_REQUEST_CACHE_LOOKUP_SIZE) - || validate_key(p->key, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER, - HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE)) { - - continue; - } - - msgpack_pack_object(mp_pck, p->key); - msgpack_pack_object(mp_pck, p->val); - } -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h b/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h deleted file mode 100644 index 8b935c3f7..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h +++ /dev/null @@ -1,120 +0,0 @@ -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef FLB_STD_HTTPREQUEST_H -#define FLB_STD_HTTPREQUEST_H - -#include "stackdriver.h" - -/* subfield name and size */ -#define HTTP_REQUEST_LATENCY "latency" -#define HTTP_REQUEST_PROTOCOL "protocol" -#define HTTP_REQUEST_REFERER "referer" -#define HTTP_REQUEST_REMOTE_IP "remoteIp" -#define HTTP_REQUEST_REQUEST_METHOD "requestMethod" -#define HTTP_REQUEST_REQUEST_URL "requestUrl" -#define HTTP_REQUEST_SERVER_IP "serverIp" -#define HTTP_REQUEST_USER_AGENT "userAgent" -#define HTTP_REQUEST_CACHE_FILL_BYTES "cacheFillBytes" -#define HTTP_REQUEST_REQUESTSIZE "requestSize" -#define HTTP_REQUEST_RESPONSESIZE "responseSize" -#define HTTP_REQUEST_STATUS "status" -#define HTTP_REQUEST_CACHE_HIT "cacheHit" -#define HTTP_REQUEST_CACHE_LOOKUP "cacheLookup" -#define HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER "cacheValidatedWithOriginServer" - -#define HTTP_REQUEST_LATENCY_SIZE 7 -#define HTTP_REQUEST_PROTOCOL_SIZE 8 -#define HTTP_REQUEST_REFERER_SIZE 7 -#define HTTP_REQUEST_REMOTE_IP_SIZE 8 -#define HTTP_REQUEST_REQUEST_METHOD_SIZE 13 -#define HTTP_REQUEST_REQUEST_URL_SIZE 10 -#define HTTP_REQUEST_SERVER_IP_SIZE 8 -#define HTTP_REQUEST_USER_AGENT_SIZE 9 -#define HTTP_REQUEST_CACHE_FILL_BYTES_SIZE 14 -#define HTTP_REQUEST_REQUESTSIZE_SIZE 11 -#define HTTP_REQUEST_RESPONSESIZE_SIZE 12 -#define HTTP_REQUEST_STATUS_SIZE 6 -#define HTTP_REQUEST_CACHE_HIT_SIZE 8 -#define HTTP_REQUEST_CACHE_LOOKUP_SIZE 11 -#define HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE 30 - - -struct http_request_field { - flb_sds_t latency; - flb_sds_t protocol; - flb_sds_t referer; - flb_sds_t remoteIp; - flb_sds_t requestMethod; - flb_sds_t requestUrl; - flb_sds_t serverIp; - flb_sds_t userAgent; - - int64_t cacheFillBytes; - int64_t requestSize; - int64_t responseSize; - int64_t status; - - int cacheHit; - int cacheLookup; - int cacheValidatedWithOriginServer; -}; - -void init_http_request(struct http_request_field *http_request); -void destroy_http_request(struct http_request_field *http_request); - -/* - * Add httpRequest field to the entries. - * The structure of httpRequest is as shown in struct http_request_field - */ -void add_http_request_field(struct http_request_field *http_request, - msgpack_packer *mp_pck); - -/* - * Extract the httpRequest field from the jsonPayload. - * If the httpRequest field exists, return TRUE and store the subfields. - * If there are extra subfields, count the number. - */ -int extract_http_request(struct http_request_field *http_request, - flb_sds_t http_request_key, - int http_request_key_size, - msgpack_object *obj, int *extra_subfields); - -/* - * When there are extra subfields, we will preserve the extra subfields inside jsonPayload - * For example, if the jsonPayload is as followed: - * jsonPayload { - * "logging.googleapis.com/http_request": { - * "requestMethod": "GET", - * "latency": "1s", - * "cacheLookup": true, - * "extra": "some string" #extra subfield - * } - * } - * We will preserve the extra subfields. The jsonPayload after extracting is: - * jsonPayload { - * "logging.googleapis.com/http_request": { - * "extra": "some string" - * } - * } - */ -void pack_extra_http_request_subfields(msgpack_packer *mp_pck, - msgpack_object *http_request, - int extra_subfields); - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c b/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c deleted file mode 100644 index 548e8b473..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c +++ /dev/null @@ -1,147 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> - -#include "stackdriver.h" -#include "stackdriver_helper.h" -#include "stackdriver_operation.h" - -typedef enum { - NO_OPERATION = 1, - OPERATION_EXISTED = 2 -} operation_status; - -void add_operation_field(flb_sds_t *operation_id, flb_sds_t *operation_producer, - int *operation_first, int *operation_last, - msgpack_packer *mp_pck) -{ - msgpack_pack_str(mp_pck, 9); - msgpack_pack_str_body(mp_pck, "operation", 9); - - msgpack_pack_map(mp_pck, 4); - - msgpack_pack_str(mp_pck, OPERATION_ID_SIZE); - msgpack_pack_str_body(mp_pck, OPERATION_ID, OPERATION_ID_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(*operation_id)); - msgpack_pack_str_body(mp_pck, *operation_id, flb_sds_len(*operation_id)); - - msgpack_pack_str(mp_pck, OPERATION_PRODUCER_SIZE); - msgpack_pack_str_body(mp_pck, OPERATION_PRODUCER, OPERATION_PRODUCER_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(*operation_producer)); - msgpack_pack_str_body(mp_pck, *operation_producer, - flb_sds_len(*operation_producer)); - - msgpack_pack_str(mp_pck, OPERATION_FIRST_SIZE); - msgpack_pack_str_body(mp_pck, OPERATION_FIRST, OPERATION_FIRST_SIZE); - if (*operation_first == FLB_TRUE) { - msgpack_pack_true(mp_pck); - } - else { - msgpack_pack_false(mp_pck); - } - - msgpack_pack_str(mp_pck, OPERATION_LAST_SIZE); - msgpack_pack_str_body(mp_pck, OPERATION_LAST, OPERATION_LAST_SIZE); - if (*operation_last == FLB_TRUE) { - msgpack_pack_true(mp_pck); - } - else { - msgpack_pack_false(mp_pck); - } -} - -/* Return true if operation extracted */ -int extract_operation(flb_sds_t *operation_id, flb_sds_t *operation_producer, - int *operation_first, int *operation_last, - msgpack_object *obj, int *extra_subfields) -{ - operation_status op_status = NO_OPERATION; - msgpack_object_kv *p; - msgpack_object_kv *pend; - msgpack_object_kv *tmp_p; - msgpack_object_kv *tmp_pend; - - if (obj->via.map.size == 0) { - return FLB_FALSE; - } - p = obj->via.map.ptr; - pend = obj->via.map.ptr + obj->via.map.size; - - for (; p < pend && op_status == NO_OPERATION; ++p) { - - if (p->val.type != MSGPACK_OBJECT_MAP - || !validate_key(p->key, OPERATION_FIELD_IN_JSON, - OPERATION_KEY_SIZE)) { - continue; - } - - op_status = OPERATION_EXISTED; - msgpack_object sub_field = p->val; - - tmp_p = sub_field.via.map.ptr; - tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size; - - /* Validate the subfields of operation */ - for (; tmp_p < tmp_pend; ++tmp_p) { - if (tmp_p->key.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (validate_key(tmp_p->key, OPERATION_ID, OPERATION_ID_SIZE)) { - try_assign_subfield_str(tmp_p->val, operation_id); - } - else if (validate_key(tmp_p->key, OPERATION_PRODUCER, - OPERATION_PRODUCER_SIZE)) { - try_assign_subfield_str(tmp_p->val, operation_producer); - } - else if (validate_key(tmp_p->key, OPERATION_FIRST, OPERATION_FIRST_SIZE)) { - try_assign_subfield_bool(tmp_p->val, operation_first); - } - else if (validate_key(tmp_p->key, OPERATION_LAST, OPERATION_LAST_SIZE)) { - try_assign_subfield_bool(tmp_p->val, operation_last); - } - else { - *extra_subfields += 1; - } - } - } - - return op_status == OPERATION_EXISTED; -} - -void pack_extra_operation_subfields(msgpack_packer *mp_pck, - msgpack_object *operation, int extra_subfields) { - msgpack_object_kv *p = operation->via.map.ptr; - msgpack_object_kv *const pend = operation->via.map.ptr + operation->via.map.size; - - msgpack_pack_map(mp_pck, extra_subfields); - - for (; p < pend; ++p) { - if (validate_key(p->key, OPERATION_ID, OPERATION_ID_SIZE) - || validate_key(p->key, OPERATION_PRODUCER, OPERATION_PRODUCER_SIZE) - || validate_key(p->key, OPERATION_FIRST, OPERATION_FIRST_SIZE) - || validate_key(p->key, OPERATION_LAST, OPERATION_LAST_SIZE)) { - continue; - } - - msgpack_pack_object(mp_pck, p->key); - msgpack_pack_object(mp_pck, p->val); - } -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h b/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h deleted file mode 100644 index ded886c3b..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h +++ /dev/null @@ -1,82 +0,0 @@ -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef FLB_STD_OPERATION_H -#define FLB_STD_OPERATION_H - -#include "stackdriver.h" - -/* subfield name and size */ -#define OPERATION_ID "id" -#define OPERATION_PRODUCER "producer" -#define OPERATION_FIRST "first" -#define OPERATION_LAST "last" - -#define OPERATION_ID_SIZE 2 -#define OPERATION_PRODUCER_SIZE 8 -#define OPERATION_FIRST_SIZE 5 -#define OPERATION_LAST_SIZE 4 - -/* - * Add operation field to the entries. - * The structure of operation is: - * { - * "id": string, - * "producer": string, - * "first": boolean, - * "last": boolean - * } - * - */ -void add_operation_field(flb_sds_t *operation_id, flb_sds_t *operation_producer, - int *operation_first, int *operation_last, - msgpack_packer *mp_pck); - -/* - * Extract the operation field from the jsonPayload. - * If the operation field exists, return TRUE and store the subfields. - * If there are extra subfields, count the number. - */ -int extract_operation(flb_sds_t *operation_id, flb_sds_t *operation_producer, - int *operation_first, int *operation_last, - msgpack_object *obj, int *extra_subfields); - -/* - * When there are extra subfields, we will preserve the extra subfields inside jsonPayload - * For example, if the jsonPayload is as followed: - * jsonPayload { - * "logging.googleapis.com/operation": { - * "id": "id1", - * "producer": "id2", - * "first": true, - * "last": true, - * "extra": "some string" #extra subfield - * } - * } - * We will preserve the extra subfields. The jsonPayload after extracting is: - * jsonPayload { - * "logging.googleapis.com/operation": { - * "extra": "some string" - * } - * } - */ -void pack_extra_operation_subfields(msgpack_packer *mp_pck, msgpack_object *operation, - int extra_subfields); - - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c b/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c deleted file mode 100644 index b114e4922..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c +++ /dev/null @@ -1,143 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_kv.h> - -#include "stackdriver.h" -#include "stackdriver_resource_types.h" - -static const struct resource_type resource_types[] = { - { - .id = RESOURCE_TYPE_K8S, - .resources = {"k8s_container", "k8s_node", "k8s_pod"}, - .required_labels = {"cluster_name", "location"} - }, - { - .id = RESOURCE_TYPE_GENERIC_NODE, - .resources = {"generic_node"}, - .required_labels = {"location", "namespace", "node_id"} - }, - { - .id = RESOURCE_TYPE_GENERIC_TASK, - .resources = {"generic_task"}, - .required_labels = {"location", "namespace", "job", "task_id"} - } -}; - -static char **get_required_labels(int resource_type) -{ - int i; - int len; - - len = sizeof(resource_types) / sizeof(resource_types[0]); - for(i = 0; i < len; i++) { - if (resource_types[i].id == resource_type) { - return (char **) resource_types[i].required_labels; - } - } - return NULL; -} - -/* - * set_resource_type(): - * - Iterates through resource_types that are set up for validation and sets the - * resource_type if it matches one of them. - * - A resource may not be in the resource types list but still be accepted - * and processed (e.g. global) if it does not require / is not set up for validation. - */ -void set_resource_type(struct flb_stackdriver *ctx) -{ - int i; - int j; - int len; - char *resource; - struct resource_type resource_type; - - len = sizeof(resource_types) / sizeof(resource_types[0]); - for(i = 0; i < len; i++) { - resource_type = resource_types[i]; - for(j = 0; j < MAX_RESOURCE_ENTRIES; j++) { - if (resource_type.resources[j] != NULL) { - resource = resource_type.resources[j]; - if (flb_sds_cmp(ctx->resource, resource, strlen(resource)) == 0) { - ctx->resource_type = resource_type.id; - return; - } - } - } - } -} - -/* - * resource_api_has_required_labels(): - * - Determines if all required labels for the set resource type are present as - * keys on the resource labels key-value pairs. - */ -int resource_api_has_required_labels(struct flb_stackdriver *ctx) -{ - struct mk_list *head; - struct flb_hash_table *ht; - struct flb_kv *label_kv; - char** required_labels; - int i; - int found; - void *tmp_buf; - size_t tmp_size; - - if (mk_list_size(&ctx->resource_labels_kvs) == 0) { - return FLB_FALSE; - } - - required_labels = get_required_labels(ctx->resource_type); - if (required_labels == NULL) { - flb_plg_warn(ctx->ins, "no validation applied to resource_labels " - "for set resource type"); - return FLB_FALSE; - } - - ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, MAX_REQUIRED_LABEL_ENTRIES, 0); - mk_list_foreach(head, &ctx->resource_labels_kvs) { - label_kv = mk_list_entry(head, struct flb_kv, _head); - for (i = 0; i < MAX_REQUIRED_LABEL_ENTRIES; i++) { - if (required_labels[i] != NULL && flb_sds_cmp(label_kv->key, - required_labels[i], strlen(required_labels[i])) == 0) { - flb_hash_table_add(ht, required_labels[i], strlen(required_labels[i]), - NULL, 0); - } - } - } - - for (i = 0; i < MAX_REQUIRED_LABEL_ENTRIES; i++) { - if (required_labels[i] != NULL) { - found = flb_hash_table_get(ht, required_labels[i], strlen(required_labels[i]), - &tmp_buf, &tmp_size); - if (found == -1) { - flb_plg_warn(ctx->ins, "labels set in resource_labels will not be applied" - ", as the required resource label [%s] is missing", required_labels[i]); - ctx->should_skip_resource_labels_api = FLB_TRUE; - flb_hash_table_destroy(ht); - return FLB_FALSE; - } - } - } - flb_hash_table_destroy(ht); - return FLB_TRUE; -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h b/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h deleted file mode 100644 index 5e45c8745..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h +++ /dev/null @@ -1,41 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_STACKDRIVER_RESOURCE_TYPES_H -#define FLB_OUT_STACKDRIVER_RESOURCE_TYPES_H - -#include "stackdriver.h" - -#define MAX_RESOURCE_ENTRIES 10 -#define MAX_REQUIRED_LABEL_ENTRIES 10 - -#define RESOURCE_TYPE_K8S 1 -#define RESOURCE_TYPE_GENERIC_NODE 2 -#define RESOURCE_TYPE_GENERIC_TASK 3 - -struct resource_type { - int id; - char* resources[MAX_RESOURCE_ENTRIES]; - char* required_labels[MAX_REQUIRED_LABEL_ENTRIES]; -}; - -void set_resource_type(struct flb_stackdriver *ctx); -int resource_api_has_required_labels(struct flb_stackdriver *ctx); - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c b/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c deleted file mode 100644 index 58102c91e..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c +++ /dev/null @@ -1,139 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "stackdriver.h" -#include "stackdriver_helper.h" -#include "stackdriver_source_location.h" - -typedef enum { - NO_SOURCELOCATION = 1, - SOURCELOCATION_EXISTED = 2 -} source_location_status; - - -void add_source_location_field(flb_sds_t *source_location_file, - int64_t source_location_line, - flb_sds_t *source_location_function, - msgpack_packer *mp_pck) -{ - msgpack_pack_str(mp_pck, 14); - msgpack_pack_str_body(mp_pck, "sourceLocation", 14); - msgpack_pack_map(mp_pck, 3); - - msgpack_pack_str(mp_pck, SOURCE_LOCATION_FILE_SIZE); - msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_FILE, SOURCE_LOCATION_FILE_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(*source_location_file)); - msgpack_pack_str_body(mp_pck, *source_location_file, - flb_sds_len(*source_location_file)); - - msgpack_pack_str(mp_pck, SOURCE_LOCATION_LINE_SIZE); - msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_LINE, SOURCE_LOCATION_LINE_SIZE); - msgpack_pack_int64(mp_pck, source_location_line); - - msgpack_pack_str(mp_pck, SOURCE_LOCATION_FUNCTION_SIZE); - msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_FUNCTION, - SOURCE_LOCATION_FUNCTION_SIZE); - msgpack_pack_str(mp_pck, flb_sds_len(*source_location_function)); - msgpack_pack_str_body(mp_pck, *source_location_function, - flb_sds_len(*source_location_function)); -} - -/* Return FLB_TRUE if sourceLocation extracted */ -int extract_source_location(flb_sds_t *source_location_file, - int64_t *source_location_line, - flb_sds_t *source_location_function, - msgpack_object *obj, int *extra_subfields) -{ - source_location_status op_status = NO_SOURCELOCATION; - msgpack_object_kv *p; - msgpack_object_kv *pend; - msgpack_object_kv *tmp_p; - msgpack_object_kv *tmp_pend; - - if (obj->via.map.size == 0) { - return FLB_FALSE; - } - p = obj->via.map.ptr; - pend = obj->via.map.ptr + obj->via.map.size; - - for (; p < pend && op_status == NO_SOURCELOCATION; ++p) { - - if (p->val.type != MSGPACK_OBJECT_MAP - || p->key.type != MSGPACK_OBJECT_STR - || !validate_key(p->key, SOURCELOCATION_FIELD_IN_JSON, - SOURCE_LOCATION_SIZE)) { - - continue; - } - - op_status = SOURCELOCATION_EXISTED; - msgpack_object sub_field = p->val; - - tmp_p = sub_field.via.map.ptr; - tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size; - - /* Validate the subfields of sourceLocation */ - for (; tmp_p < tmp_pend; ++tmp_p) { - if (tmp_p->key.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (validate_key(tmp_p->key, - SOURCE_LOCATION_FILE, - SOURCE_LOCATION_FILE_SIZE)) { - try_assign_subfield_str(tmp_p->val, source_location_file); - } - else if (validate_key(tmp_p->key, - SOURCE_LOCATION_FUNCTION, - SOURCE_LOCATION_FUNCTION_SIZE)) { - try_assign_subfield_str(tmp_p->val, source_location_function); - } - else if (validate_key(tmp_p->key, - SOURCE_LOCATION_LINE, - SOURCE_LOCATION_LINE_SIZE)) { - try_assign_subfield_int(tmp_p->val, source_location_line); - } - else { - *extra_subfields += 1; - } - } - } - - return op_status == SOURCELOCATION_EXISTED; -} - -void pack_extra_source_location_subfields(msgpack_packer *mp_pck, - msgpack_object *source_location, - int extra_subfields) { - msgpack_object_kv *p = source_location->via.map.ptr; - msgpack_object_kv *const pend = source_location->via.map.ptr + source_location->via.map.size; - - msgpack_pack_map(mp_pck, extra_subfields); - - for (; p < pend; ++p) { - if (validate_key(p->key, SOURCE_LOCATION_FILE, SOURCE_LOCATION_FILE_SIZE) - || validate_key(p->key, SOURCE_LOCATION_LINE, SOURCE_LOCATION_LINE_SIZE) - || validate_key(p->key, SOURCE_LOCATION_FUNCTION, - SOURCE_LOCATION_FUNCTION_SIZE)) { - continue; - } - - msgpack_pack_object(mp_pck, p->key); - msgpack_pack_object(mp_pck, p->val); - } -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h b/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h deleted file mode 100644 index 4f703d330..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h +++ /dev/null @@ -1,80 +0,0 @@ -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef FLB_STD_SOURCELOCATION_H -#define FLB_STD_SOURCELOCATION_H - -#include "stackdriver.h" - -/* subfield name and size */ -#define SOURCE_LOCATION_FILE "file" -#define SOURCE_LOCATION_LINE "line" -#define SOURCE_LOCATION_FUNCTION "function" - -#define SOURCE_LOCATION_FILE_SIZE 4 -#define SOURCE_LOCATION_LINE_SIZE 4 -#define SOURCE_LOCATION_FUNCTION_SIZE 8 - -/* - * Add sourceLocation field to the entries. - * The structure of sourceLocation is: - * { - * "file": string, - * "line": int, - * "function": string - * } - */ -void add_source_location_field(flb_sds_t *source_location_file, - int64_t source_location_line, - flb_sds_t *source_location_function, - msgpack_packer *mp_pck); - -/* - * Extract the sourceLocation field from the jsonPayload. - * If the sourceLocation field exists, return TRUE and store the subfields. - * If there are extra subfields, count the number. - */ -int extract_source_location(flb_sds_t *source_location_file, - int64_t *source_location_line, - flb_sds_t *source_location_function, - msgpack_object *obj, int *extra_subfields); - -/* - * When there are extra subfields, we will preserve the extra subfields inside jsonPayload - * For example, if the jsonPayload is as followed: - * jsonPayload { - * "logging.googleapis.com/sourceLocation": { - * "file": "file1", - * "line": 1, - * "function": "func1", - * "extra": "some string" #extra subfield - * } - * } - * We will preserve the extra subfields. The jsonPayload after extracting is: - * jsonPayload { - * "logging.googleapis.com/sourceLocation": { - * "extra": "some string" - * } - * } - */ -void pack_extra_source_location_subfields(msgpack_packer *mp_pck, - msgpack_object *source_location, - int extra_subfields); - - -#endif diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c b/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c deleted file mode 100644 index a9b350d22..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c +++ /dev/null @@ -1,180 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> - -#include "stackdriver.h" -#include "stackdriver_helper.h" -#include "stackdriver_timestamp.h" -#include <fluent-bit/flb_regex.h> - -#include <ctype.h> - -static int is_integer(char *str, int size) { - int i; - for (i = 0; i < size; ++ i) { - if (!isdigit(str[i])) { - return FLB_FALSE; - } - } - return FLB_TRUE; -} - -static void try_assign_time(long long seconds, long long nanos, - struct flb_time *tms) -{ - if (seconds != 0) { - tms->tm.tv_sec = seconds; - tms->tm.tv_nsec = nanos; - } -} - -static long long get_integer(msgpack_object obj) -{ - char tmp[32]; - - if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - return obj.via.i64; - } - else if (obj.type == MSGPACK_OBJECT_STR - && is_integer((char *) obj.via.str.ptr, - obj.via.str.size)) { - - /* - * use an intermediary buffer to perform the conversion to avoid any - * overflow by atoll. LLONG_MAX value is +9,223,372,036,854,775,807, - * so using a 32 bytes buffer is enough. - */ - if (obj.via.str.size > sizeof(tmp) - 1) { - return 0; - } - - memcpy(tmp, obj.via.str.ptr, obj.via.str.size); - tmp[obj.via.str.size] = '\0'; - - return atoll(tmp); - } - - return 0; -} - -static int extract_format_timestamp_object(msgpack_object *obj, - struct flb_time *tms) -{ - int seconds_found = FLB_FALSE; - int nanos_found = FLB_FALSE; - long long seconds = 0; - long long nanos = 0; - - msgpack_object_kv *p; - msgpack_object_kv *pend; - msgpack_object_kv *tmp_p; - msgpack_object_kv *tmp_pend; - - if (obj->via.map.size == 0) { - return FLB_FALSE; - } - p = obj->via.map.ptr; - pend = obj->via.map.ptr + obj->via.map.size; - - for (; p < pend; ++p) { - if (!validate_key(p->key, "timestamp", 9) - || p->val.type != MSGPACK_OBJECT_MAP) { - continue; - } - - tmp_p = p->val.via.map.ptr; - tmp_pend = p->val.via.map.ptr + p->val.via.map.size; - - for (; tmp_p < tmp_pend; ++tmp_p) { - if (validate_key(tmp_p->key, "seconds", 7)) { - seconds_found = FLB_TRUE; - seconds = get_integer(tmp_p->val); - - if (nanos_found == FLB_TRUE) { - try_assign_time(seconds, nanos, tms); - return FLB_TRUE; - } - } - else if (validate_key(tmp_p->key, "nanos", 5)) { - nanos_found = FLB_TRUE; - nanos = get_integer(tmp_p->val); - - if (seconds_found == FLB_TRUE) { - try_assign_time(seconds, nanos, tms); - return FLB_TRUE; - } - } - } - } - return FLB_FALSE; -} - -static int extract_format_timestamp_duo_fields(msgpack_object *obj, - struct flb_time *tms) -{ - int seconds_found = FLB_FALSE; - int nanos_found = FLB_FALSE; - long long seconds = 0; - long long nanos = 0; - - msgpack_object_kv *p; - msgpack_object_kv *pend; - - if (obj->via.map.size == 0) { - return FLB_FALSE; - } - p = obj->via.map.ptr; - pend = obj->via.map.ptr + obj->via.map.size; - - for (; p < pend; ++p) { - if (validate_key(p->key, "timestampSeconds", 16)) { - seconds_found = FLB_TRUE; - seconds = get_integer(p->val); - - if (nanos_found == FLB_TRUE) { - try_assign_time(seconds, nanos, tms); - return FLB_TRUE; - } - } - else if (validate_key(p->key, "timestampNanos", 14)) { - nanos_found = FLB_TRUE; - nanos = get_integer(p->val); - - if (seconds_found == FLB_TRUE) { - try_assign_time(seconds, nanos, tms); - return FLB_TRUE; - } - } - } - - return FLB_FALSE; -} - -timestamp_status extract_timestamp(msgpack_object *obj, - struct flb_time *tms) -{ - if (extract_format_timestamp_object(obj, tms)) { - return FORMAT_TIMESTAMP_OBJECT; - } - if (extract_format_timestamp_duo_fields(obj, tms)) { - return FORMAT_TIMESTAMP_DUO_FIELDS; - } - return TIMESTAMP_NOT_PRESENT; -} diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h b/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h deleted file mode 100644 index f3c025864..000000000 --- a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h +++ /dev/null @@ -1,47 +0,0 @@ -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef FLB_STD_TIMESTAMP_H -#define FLB_STD_TIMESTAMP_H - -#include "stackdriver.h" -#include <fluent-bit/flb_time.h> - -typedef enum { - TIMESTAMP_NOT_PRESENT = 0, - FORMAT_TIMESTAMP_OBJECT = 1, - FORMAT_TIMESTAMP_DUO_FIELDS = 2 -} timestamp_status; - -/* - * Currently support two formats of time-related fields - * - "timestamp":{"seconds", "nanos"} - * - "timestampSeconds"/"timestampNanos" - * - * If timestamp field is not existed, return TIMESTAMP_NOT_PRESENT - * If timestamp format is "timestamp":{"seconds", "nanos"}, - * set the time and return FORMAT_TIMESTAMP - * - * If timestamp format is "timestampSeconds"/"timestampNanos", - * set the time and return FORMAT_TIMESTAMPSECONDS - */ -timestamp_status extract_timestamp(msgpack_object *obj, - struct flb_time *tms); - - -#endif |