summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_stackdriver
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/out_stackdriver
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_stackdriver')
-rw-r--r--fluent-bit/plugins/out_stackdriver/CMakeLists.txt13
-rw-r--r--fluent-bit/plugins/out_stackdriver/gce_metadata.c222
-rw-r--r--fluent-bit/plugins/out_stackdriver/gce_metadata.h48
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver.c2867
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver.h241
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_conf.c667
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_conf.h29
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_helper.c63
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_helper.h51
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c393
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h120
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_operation.c147
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_operation.h82
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c143
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h41
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c139
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h80
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c180
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h47
19 files changed, 0 insertions, 5573 deletions
diff --git a/fluent-bit/plugins/out_stackdriver/CMakeLists.txt b/fluent-bit/plugins/out_stackdriver/CMakeLists.txt
deleted file mode 100644
index 2d7fa71bb..000000000
--- a/fluent-bit/plugins/out_stackdriver/CMakeLists.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-set(src
- gce_metadata.c
- stackdriver_conf.c
- stackdriver.c
- stackdriver_operation.c
- stackdriver_source_location.c
- stackdriver_http_request.c
- stackdriver_timestamp.c
- stackdriver_helper.c
- stackdriver_resource_types.c
- )
-
-FLB_PLUGIN(out_stackdriver "${src}" "")
diff --git a/fluent-bit/plugins/out_stackdriver/gce_metadata.c b/fluent-bit/plugins/out_stackdriver/gce_metadata.c
deleted file mode 100644
index fb942213b..000000000
--- a/fluent-bit/plugins/out_stackdriver/gce_metadata.c
+++ /dev/null
@@ -1,222 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_oauth2.h>
-
-#include <msgpack.h>
-
-#include "gce_metadata.h"
-#include "stackdriver.h"
-#include "stackdriver_conf.h"
-
-
-static int fetch_metadata(struct flb_stackdriver *ctx,
- struct flb_upstream *upstream, char *uri,
- char *payload)
-{
- int ret;
- int ret_code;
- size_t b_sent;
- struct flb_connection *metadata_conn;
- struct flb_http_client *c;
-
- /* If runtime test mode is enabled, add test data */
- if (ctx->ins->test_mode == FLB_TRUE) {
- if (strcmp(uri, FLB_STD_METADATA_PROJECT_ID_URI) == 0) {
- flb_sds_cat(payload, "fluent-bit-test", 15);
- return 0;
- }
- else if (strcmp(uri, FLB_STD_METADATA_ZONE_URI) == 0) {
- flb_sds_cat(payload, "projects/0123456789/zones/fluent", 32);
- return 0;
- }
- else if (strcmp(uri, FLB_STD_METADATA_INSTANCE_ID_URI) == 0) {
- flb_sds_cat(payload, "333222111", 9);
- return 0;
- }
- return -1;
- }
-
- /* Get metadata connection */
- metadata_conn = flb_upstream_conn_get(upstream);
- if (!metadata_conn) {
- flb_plg_error(ctx->ins, "failed to create metadata connection");
- return -1;
- }
-
- /* Compose HTTP Client request */
- c = flb_http_client(metadata_conn, FLB_HTTP_GET, uri,
- "", 0, NULL, 0, NULL, 0);
-
- flb_http_buffer_size(c, FLB_STD_METADATA_TOKEN_SIZE_MAX);
-
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
- flb_http_add_header(c, "Content-Type", 12, "application/text", 16);
- flb_http_add_header(c, "Metadata-Flavor", 15, "Google", 6);
-
- /* Send HTTP request */
- ret = flb_http_do(c, &b_sent);
-
- /* validate response */
- if (ret != 0) {
- flb_plg_warn(ctx->ins, "http_do=%i", ret);
- ret_code = -1;
- }
- else {
- /* The request was issued successfully, validate the 'error' field */
- flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
- if (c->resp.status == 200) {
- ret_code = 0;
- flb_sds_copy(payload, c->resp.payload, c->resp.payload_size);
- }
- else {
- if (c->resp.payload_size > 0) {
- /* we got an error */
- flb_plg_warn(ctx->ins, "error\n%s", c->resp.payload);
- }
- else {
- flb_plg_debug(ctx->ins, "response\n%s", c->resp.payload);
- }
- ret_code = -1;
- }
- }
-
- /* Cleanup */
- flb_http_client_destroy(c);
- flb_upstream_conn_release(metadata_conn);
-
- return ret_code;
-}
-
-int gce_metadata_read_token(struct flb_stackdriver *ctx)
-{
- int ret;
- flb_sds_t uri = flb_sds_create(FLB_STD_METADATA_SERVICE_ACCOUNT_URI);
- flb_sds_t payload = flb_sds_create_size(FLB_STD_METADATA_TOKEN_SIZE_MAX);
-
- uri = flb_sds_cat(uri, ctx->client_email, flb_sds_len(ctx->client_email));
- uri = flb_sds_cat(uri, "/token", 6);
- ret = fetch_metadata(ctx, ctx->metadata_u, uri, payload);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "can't fetch token from the metadata server");
- flb_sds_destroy(payload);
- flb_sds_destroy(uri);
- return -1;
- }
-
- ret = flb_oauth2_parse_json_response(payload, flb_sds_len(payload), ctx->o);
- flb_sds_destroy(payload);
- flb_sds_destroy(uri);
-
- if (ret != 0) {
- flb_plg_error(ctx->ins, "unable to parse token body");
- return -1;
- }
- ctx->o->expires = time(NULL) + ctx->o->expires_in;
- return 0;
-}
-
-int gce_metadata_read_zone(struct flb_stackdriver *ctx)
-{
- int ret;
- int i;
- int j;
- int part = 0;
- flb_sds_t payload = flb_sds_create_size(4096);
- flb_sds_t zone = NULL;
-
- ret = fetch_metadata(ctx, ctx->metadata_u, FLB_STD_METADATA_ZONE_URI,
- payload);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "can't fetch zone from the metadata server");
- flb_sds_destroy(payload);
- return -1;
- }
-
- /* Data returned in the format projects/{project-id}/zones/{name} */
- for (i = 0; i < flb_sds_len(payload); ++i) {
- if (payload[i] == '/') {
- part++;
- }
- if (part == 3) {
- i++;
- break;
- }
- }
-
- if (part != 3) {
- flb_plg_error(ctx->ins, "wrong format of zone response");
- flb_sds_destroy(payload);
- return -1;
- }
-
- zone = flb_sds_create_size(flb_sds_len(payload) - i);
-
- j = 0;
- while (i != flb_sds_len(payload)) {
- zone[j] = payload[i];
- i++;
- j++;
- }
- zone[j] = '\0';
- ctx->zone = flb_sds_create(zone);
- flb_sds_destroy(zone);
- flb_sds_destroy(payload);
-
- return 0;
-}
-
-int gce_metadata_read_project_id(struct flb_stackdriver *ctx)
-{
- int ret;
- flb_sds_t payload = flb_sds_create_size(4096);
-
- ret = fetch_metadata(ctx, ctx->metadata_u,
- FLB_STD_METADATA_PROJECT_ID_URI, payload);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "can't fetch project id from the metadata server");
- flb_sds_destroy(payload);
- return -1;
- }
- ctx->project_id = flb_sds_create(payload);
- flb_sds_destroy(payload);
- return 0;
-}
-
-int gce_metadata_read_instance_id(struct flb_stackdriver *ctx)
-{
- int ret;
- flb_sds_t payload = flb_sds_create_size(4096);
-
- ret = fetch_metadata(ctx, ctx->metadata_u,
- FLB_STD_METADATA_INSTANCE_ID_URI, payload);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "can't fetch instance id from the metadata server");
- flb_sds_destroy(payload);
- return -1;
- }
- ctx->instance_id = flb_sds_create(payload);
- flb_sds_destroy(payload);
- return 0;
-}
diff --git a/fluent-bit/plugins/out_stackdriver/gce_metadata.h b/fluent-bit/plugins/out_stackdriver/gce_metadata.h
deleted file mode 100644
index 65009588d..000000000
--- a/fluent-bit/plugins/out_stackdriver/gce_metadata.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLUENT_BIT_GCE_METADATA_H
-#define FLUENT_BIT_GCE_METADATA_H
-
-#include "stackdriver.h"
-
-/* Metadata server URL */
-#define FLB_STD_METADATA_SERVER "http://metadata.google.internal"
-
-/* Project ID metadata URI */
-#define FLB_STD_METADATA_PROJECT_ID_URI "/computeMetadata/v1/project/project-id"
-
-/* Zone metadata URI */
-#define FLB_STD_METADATA_ZONE_URI "/computeMetadata/v1/instance/zone"
-
-/* Instance ID metadata URI */
-#define FLB_STD_METADATA_INSTANCE_ID_URI "/computeMetadata/v1/instance/id"
-
-/* Service account metadata URI */
-#define FLB_STD_METADATA_SERVICE_ACCOUNT_URI "/computeMetadata/v1/instance/service-accounts/"
-
-/* Max size of token response from metadata server */
-#define FLB_STD_METADATA_TOKEN_SIZE_MAX 14336
-
-int gce_metadata_read_token(struct flb_stackdriver *ctx);
-int gce_metadata_read_zone(struct flb_stackdriver *ctx);
-int gce_metadata_read_project_id(struct flb_stackdriver *ctx);
-int gce_metadata_read_instance_id(struct flb_stackdriver *ctx);
-
-#endif //FLUENT_BIT_GCE_METADATA_H
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver.c b/fluent-bit/plugins/out_stackdriver/stackdriver.c
deleted file mode 100644
index 5c48a338b..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver.c
+++ /dev/null
@@ -1,2867 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_oauth2.h>
-#include <fluent-bit/flb_regex.h>
-#include <fluent-bit/flb_pthread.h>
-#include <fluent-bit/flb_crypto.h>
-#include <fluent-bit/flb_hash.h>
-#include <fluent-bit/flb_base64.h>
-#include <fluent-bit/flb_kv.h>
-#include <fluent-bit/flb_ra_key.h>
-#include <fluent-bit/flb_record_accessor.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-#include <fluent-bit/flb_gzip.h>
-
-#include <msgpack.h>
-
-#include "gce_metadata.h"
-#include "stackdriver.h"
-#include "stackdriver_conf.h"
-#include "stackdriver_operation.h"
-#include "stackdriver_source_location.h"
-#include "stackdriver_http_request.h"
-#include "stackdriver_timestamp.h"
-#include "stackdriver_helper.h"
-#include "stackdriver_resource_types.h"
-
-pthread_key_t oauth2_type;
-pthread_key_t oauth2_token;
-pthread_key_t oauth2_token_expires;
-
-static void oauth2_cache_exit(void *ptr)
-{
- if (ptr) {
- flb_sds_destroy(ptr);
- }
-}
-
-static void oauth2_cache_free_expiration(void *ptr)
-{
- if (ptr) {
- flb_free(ptr);
- }
-}
-
-static void oauth2_cache_init()
-{
- /* oauth2 pthread key */
- pthread_key_create(&oauth2_type, oauth2_cache_exit);
- pthread_key_create(&oauth2_token, oauth2_cache_exit);
- pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration);
-}
-
-/* Set oauth2 type and token in pthread keys */
-static void oauth2_cache_set(char *type, char *token, time_t expires)
-{
- flb_sds_t tmp;
- time_t *tmp_expires;
-
- /* oauth2 type */
- tmp = pthread_getspecific(oauth2_type);
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- tmp = flb_sds_create(type);
- pthread_setspecific(oauth2_type, tmp);
-
- /* oauth2 access token */
- tmp = pthread_getspecific(oauth2_token);
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- tmp = flb_sds_create(token);
- pthread_setspecific(oauth2_token, tmp);
-
- /* oauth2 access token expiration */
- tmp_expires = pthread_getspecific(oauth2_token_expires);
- if (tmp_expires) {
- flb_free(tmp_expires);
- }
- tmp_expires = flb_calloc(1, sizeof(time_t));
- if (!tmp_expires) {
- flb_errno();
- return;
- }
- *tmp_expires = expires;
- pthread_setspecific(oauth2_token_expires, tmp_expires);
-}
-
-/* By using pthread keys cached values, compose the authorizatoin token */
-static time_t oauth2_cache_get_expiration()
-{
- time_t *expires = pthread_getspecific(oauth2_token_expires);
- if (expires) {
- return *expires;
- }
- return 0;
-}
-
-/* By using pthread keys cached values, compose the authorizatoin token */
-static flb_sds_t oauth2_cache_to_token()
-{
- flb_sds_t type;
- flb_sds_t token;
- flb_sds_t output;
-
- type = pthread_getspecific(oauth2_type);
- if (!type) {
- return NULL;
- }
-
- output = flb_sds_create(type);
- if (!output) {
- return NULL;
- }
-
- token = pthread_getspecific(oauth2_token);
- flb_sds_printf(&output, " %s", token);
- return output;
-}
-
-/*
- * Base64 Encoding in JWT must:
- *
- * - remove any trailing padding '=' character
- * - replace '+' with '-'
- * - replace '/' with '_'
- *
- * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C
- */
-int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size,
- unsigned char *in_buf, size_t in_size,
- size_t *olen)
-
-{
- int i;
- size_t len;
- int result;
-
-
- /* do normal base64 encoding */
- result = flb_base64_encode((unsigned char *) out_buf, out_size - 1,
- &len, in_buf, in_size);
- if (result != 0) {
- return -1;
- }
-
- /* Replace '+' and '/' characters */
- for (i = 0; i < len && out_buf[i] != '='; i++) {
- if (out_buf[i] == '+') {
- out_buf[i] = '-';
- }
- else if (out_buf[i] == '/') {
- out_buf[i] = '_';
- }
- }
-
- /* Now 'i' becomes the new length */
- *olen = i;
- return 0;
-}
-
-static int jwt_encode(char *payload, char *secret,
- char **out_signature, size_t *out_size,
- struct flb_stackdriver *ctx)
-{
- int ret;
- int len;
- int buf_size;
- size_t olen;
- char *buf;
- char *sigd;
- char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}";
- unsigned char sha256_buf[32] = {0};
- flb_sds_t out;
- unsigned char sig[256] = {0};
- size_t sig_len;
-
- buf_size = (strlen(payload) + strlen(secret)) * 2;
- buf = flb_malloc(buf_size);
- if (!buf) {
- flb_errno();
- return -1;
- }
-
- /* Encode header */
- len = strlen(headers);
- ret = flb_base64_encode((unsigned char *) buf, buf_size - 1,
- &olen, (unsigned char *) headers, len);
- if (ret != 0) {
- flb_free(buf);
-
- return ret;
- }
-
- /* Create buffer to store JWT */
- out = flb_sds_create_size(2048);
- if (!out) {
- flb_errno();
- flb_free(buf);
- return -1;
- }
-
- /* Append header */
- flb_sds_cat(out, buf, olen);
- flb_sds_cat(out, ".", 1);
-
- /* Encode Payload */
- len = strlen(payload);
- jwt_base64_url_encode((unsigned char *) buf, buf_size,
- (unsigned char *) payload, len, &olen);
-
- /* Append Payload */
- flb_sds_cat(out, buf, olen);
-
- /* do sha256() of base64(header).base64(payload) */
- ret = flb_hash_simple(FLB_HASH_SHA256,
- (unsigned char *) out, flb_sds_len(out),
- sha256_buf, sizeof(sha256_buf));
-
- if (ret != FLB_CRYPTO_SUCCESS) {
- flb_plg_error(ctx->ins, "error hashing token");
- flb_free(buf);
- flb_sds_destroy(out);
- return -1;
- }
-
- len = strlen(secret);
- sig_len = sizeof(sig);
-
- ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY,
- FLB_CRYPTO_PADDING_PKCS1,
- FLB_HASH_SHA256,
- (unsigned char *) secret, len,
- sha256_buf, sizeof(sha256_buf),
- sig, &sig_len);
-
- if (ret != FLB_CRYPTO_SUCCESS) {
- flb_plg_error(ctx->ins, "error creating RSA context");
- flb_free(buf);
- flb_sds_destroy(out);
- return -1;
- }
-
- sigd = flb_malloc(2048);
- if (!sigd) {
- flb_errno();
- flb_free(buf);
- flb_sds_destroy(out);
- return -1;
- }
-
- jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen);
-
- flb_sds_cat(out, ".", 1);
- flb_sds_cat(out, sigd, olen);
-
- *out_signature = out;
- *out_size = flb_sds_len(out);
-
- flb_free(buf);
- flb_free(sigd);
-
- return 0;
-}
-
-/* Create a new oauth2 context and get a oauth2 token */
-static int get_oauth2_token(struct flb_stackdriver *ctx)
-{
- int ret;
- char *token;
- char *sig_data;
- size_t sig_size;
- time_t issued;
- time_t expires;
- char payload[1024];
-
- flb_oauth2_payload_clear(ctx->o);
-
- /* In case of using metadata server, fetch token from there */
- if (ctx->metadata_server_auth) {
- return gce_metadata_read_token(ctx);
- }
-
- /* JWT encode for oauth2 */
- issued = time(NULL);
- expires = issued + FLB_STD_TOKEN_REFRESH;
-
- snprintf(payload, sizeof(payload) - 1,
- "{\"iss\": \"%s\", \"scope\": \"%s\", "
- "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}",
- ctx->client_email, FLB_STD_SCOPE,
- FLB_STD_AUTH_URL,
- expires, issued);
-
- /* Compose JWT signature */
- ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "JWT signature generation failed");
- return -1;
- }
- flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data);
-
- ret = flb_oauth2_payload_append(ctx->o,
- "grant_type", -1,
- "urn%3Aietf%3Aparams%3Aoauth%3A"
- "grant-type%3Ajwt-bearer", -1);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "error appending oauth2 params");
- flb_sds_destroy(sig_data);
- return -1;
- }
-
- ret = flb_oauth2_payload_append(ctx->o,
- "assertion", -1,
- sig_data, sig_size);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "error appending oauth2 params");
- flb_sds_destroy(sig_data);
- return -1;
- }
- flb_sds_destroy(sig_data);
-
- /* Retrieve access token */
- token = flb_oauth2_token_get(ctx->o);
- if (!token) {
- flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
- return -1;
- }
-
- return 0;
-}
-
-static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
-{
- int ret = 0;
- flb_sds_t output = NULL;
- time_t cached_expiration = 0;
-
- ret = pthread_mutex_trylock(&ctx->token_mutex);
- if (ret == EBUSY) {
- /*
- * If the routine is locked we just use our pre-cached values and
- * compose the expected authorization value.
- *
- * If the routine fails it will return NULL and the caller will just
- * issue a FLB_RETRY.
- */
- output = oauth2_cache_to_token();
- cached_expiration = oauth2_cache_get_expiration();
- if (time(NULL) >= cached_expiration) {
- return output;
- } else {
- /*
- * Cached token is expired. Wait on lock to use up-to-date token
- * by either waiting for it to be refreshed or refresh it ourselves.
- */
- flb_plg_info(ctx->ins, "Cached token is expired. Waiting on lock.");
- ret = pthread_mutex_lock(&ctx->token_mutex);
- }
- }
-
- if (ret != 0) {
- flb_plg_error(ctx->ins, "error locking mutex");
- return NULL;
- }
-
- if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
- ret = get_oauth2_token(ctx);
- }
-
- /* Copy string to prevent race conditions (get_oauth2 can free the string) */
- if (ret == 0) {
- /* Update pthread keys cached values */
- oauth2_cache_set(ctx->o->token_type, ctx->o->access_token, ctx->o->expires);
-
- /* Compose outgoing buffer using cached values */
- output = oauth2_cache_to_token();
- }
-
- if (pthread_mutex_unlock(&ctx->token_mutex)){
- flb_plg_error(ctx->ins, "error unlocking mutex");
- if (output) {
- flb_sds_destroy(output);
- }
- return NULL;
- }
-
-
- return output;
-}
-
-void replace_prefix_dot(flb_sds_t s, int tag_prefix_len)
-{
- int i;
- int str_len;
- char c;
-
- if (!s) {
- return;
- }
-
- str_len = flb_sds_len(s);
- if (tag_prefix_len > str_len) {
- flb_error("[output] tag_prefix shouldn't be longer than local_resource_id");
- return;
- }
-
- for (i = 0; i < tag_prefix_len; i++) {
- c = s[i];
-
- if (c == '.') {
- s[i] = '_';
- }
- }
-}
-
-static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map,
- const char *key, int key_size)
-{
- int i;
- msgpack_object k;
- msgpack_object v;
- flb_sds_t ptr = NULL;
-
- for (i = 0; i < map.size; i++) {
- k = map.ptr[i].key;
- v = map.ptr[i].val;
-
- if (k.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (k.via.str.size == key_size &&
- strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) {
- /* make sure to free it after use */
- ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size);
- break;
- }
- }
-
- return ptr;
-}
-
-/* parse_monitored_resource is to extract the monitoired resource labels
- * from "logging.googleapis.com/monitored_resource" in log data
- * and append to 'resource'/'labels' in log entry.
- * Monitored resource type is already read from resource field in stackdriver
- * output plugin configuration parameters.
- *
- * The structure of monitored_resource is:
- * {
- * "logging.googleapis.com/monitored_resource": {
- * "labels": {
- * "resource_label": <label_value>,
- * }
- * }
- * }
- * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types
- * for required labels for each monitored resource.
- */
-
-static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck)
-{
- int ret = -1;
- msgpack_object *obj;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return -1;
- }
-
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- obj = log_event.body;
-
- msgpack_object_kv *kv = obj->via.map.ptr;
- msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
- for (; kv < kvend; ++kv) {
- if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR
- && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) {
- msgpack_object subobj = kv->val;
- msgpack_object_kv *p = subobj.via.map.ptr;
- msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size;
- for (; p < pend; ++p) {
- if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) {
- continue;
- }
- if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) {
- msgpack_object labels = p->val;
- msgpack_object_kv *q = labels.via.map.ptr;
- msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size;
- int fields = 0;
- for (; q < qend; ++q) {
- if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
- flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY);
- }
- ++fields;
- }
- if (fields > 0) {
- msgpack_pack_map(mp_pck, fields);
- q = labels.via.map.ptr;
- for (; q < qend; ++q) {
- if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
- continue;
- }
- flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY);
- msgpack_pack_str(mp_pck, q->key.via.str.size);
- msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size);
- msgpack_pack_str(mp_pck, q->val.via.str.size);
- msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size);
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- return 0;
- }
- }
- }
- }
- }
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY);
-
- return ret;
-}
-
-/*
- * Given a local_resource_id, split the content using the proper separator generating
- * a linked list to store the spliited string
- */
-static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type)
-{
- int ret = -1;
- int max_split = -1;
- int len_k8s_container;
- int len_k8s_node;
- int len_k8s_pod;
- struct mk_list *list;
-
- len_k8s_container = sizeof(K8S_CONTAINER) - 1;
- len_k8s_node = sizeof(K8S_NODE) - 1;
- len_k8s_pod = sizeof(K8S_POD) - 1;
-
- /* Allocate list head */
- list = flb_malloc(sizeof(struct mk_list));
- if (!list) {
- flb_errno();
- return NULL;
- }
- mk_list_init(list);
-
- /* Determinate the max split value based on type */
- if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
- /* including the prefix of tag */
- max_split = 4;
- }
- else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
- max_split = 2;
- }
- else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
- max_split = 3;
- }
-
- /* The local_resource_id is splitted by '.' */
- ret = flb_slist_split_string(list, local_resource_id, '.', max_split);
-
- if (ret == -1 || mk_list_size(list) != max_split) {
- flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type);
- flb_slist_destroy(list);
- flb_free(list);
- return NULL;
- }
-
- return list;
-}
-
-/*
- * extract_local_resource_id():
- * - extract the value from "logging.googleapis.com/local_resource_id" field
- * - if local_resource_id is missing from the payLoad, use the tag of the log
- */
-static int extract_local_resource_id(const void *data, size_t bytes,
- struct flb_stackdriver *ctx, const char *tag) {
- msgpack_object_map map;
- flb_sds_t local_resource_id;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
- int ret;
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return -1;
- }
-
- if ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- map = log_event.body->via.map;
- local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY,
- LEN_LOCAL_RESOURCE_ID_KEY);
-
- if (local_resource_id == NULL) {
- /* if local_resource_id is not found, use the tag of the log */
- flb_plg_debug(ctx->ins, "local_resource_id not found, "
- "tag [%s] is assigned for local_resource_id", tag);
- local_resource_id = flb_sds_create(tag);
- }
-
- /* we need to create up the local_resource_id from previous log */
- if (ctx->local_resource_id) {
- flb_sds_destroy(ctx->local_resource_id);
- }
-
- ctx->local_resource_id = flb_sds_create(local_resource_id);
-
- flb_sds_destroy(local_resource_id);
-
- ret = 0;
- }
- else {
- flb_plg_error(ctx->ins, "failed to unpack data");
-
- ret = -1;
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- return ret;
-}
-
-/*
- * set_monitored_resource_labels():
- * - use the extracted local_resource_id to assign the label keys for different
- * resource types that are specified in the configuration of stackdriver_out plugin
- */
-static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type)
-{
- int ret = -1;
- int first = FLB_TRUE;
- int counter = 0;
- int len_k8s_container;
- int len_k8s_node;
- int len_k8s_pod;
- size_t prefix_len = 0;
- struct local_resource_id_list *ptr;
- struct mk_list *list = NULL;
- struct mk_list *head;
- flb_sds_t new_local_resource_id;
-
- if (!ctx->local_resource_id) {
- flb_plg_error(ctx->ins, "local_resource_is is not assigned");
- return -1;
- }
-
- len_k8s_container = sizeof(K8S_CONTAINER) - 1;
- len_k8s_node = sizeof(K8S_NODE) - 1;
- len_k8s_pod = sizeof(K8S_POD) - 1;
-
- prefix_len = flb_sds_len(ctx->tag_prefix);
- if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) {
- flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of"
- " local_resource_id [%s]", ctx->tag_prefix,
- ctx->local_resource_id);
- return -1;
- }
-
- new_local_resource_id = flb_sds_create_len(ctx->local_resource_id,
- flb_sds_len(ctx->local_resource_id));
- replace_prefix_dot(new_local_resource_id, prefix_len - 1);
-
- if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
- list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER);
- if (!list) {
- goto error;
- }
-
- /* iterate through the list */
- mk_list_foreach(head, list) {
- ptr = mk_list_entry(head, struct local_resource_id_list, _head);
- if (first) {
- first = FLB_FALSE;
- continue;
- }
-
- /* Follow the order of fields in local_resource_id */
- if (counter == 0) {
- if (ctx->namespace_name) {
- flb_sds_destroy(ctx->namespace_name);
- }
- ctx->namespace_name = flb_sds_create(ptr->val);
- }
- else if (counter == 1) {
- if (ctx->pod_name) {
- flb_sds_destroy(ctx->pod_name);
- }
- ctx->pod_name = flb_sds_create(ptr->val);
- }
- else if (counter == 2) {
- if (ctx->container_name) {
- flb_sds_destroy(ctx->container_name);
- }
- ctx->container_name = flb_sds_create(ptr->val);
- }
-
- counter++;
- }
-
- if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) {
- goto error;
- }
- }
- else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
- list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE);
- if (!list) {
- goto error;
- }
-
- mk_list_foreach(head, list) {
- ptr = mk_list_entry(head, struct local_resource_id_list, _head);
- if (first) {
- first = FLB_FALSE;
- continue;
- }
-
- if (ptr != NULL) {
- if (ctx->node_name) {
- flb_sds_destroy(ctx->node_name);
- }
- ctx->node_name = flb_sds_create(ptr->val);
- }
- }
-
- if (!ctx->node_name) {
- goto error;
- }
- }
- else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
- list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD);
- if (!list) {
- goto error;
- }
-
- mk_list_foreach(head, list) {
- ptr = mk_list_entry(head, struct local_resource_id_list, _head);
- if (first) {
- first = FLB_FALSE;
- continue;
- }
-
- /* Follow the order of fields in local_resource_id */
- if (counter == 0) {
- if (ctx->namespace_name) {
- flb_sds_destroy(ctx->namespace_name);
- }
- ctx->namespace_name = flb_sds_create(ptr->val);
- }
- else if (counter == 1) {
- if (ctx->pod_name) {
- flb_sds_destroy(ctx->pod_name);
- }
- ctx->pod_name = flb_sds_create(ptr->val);
- }
-
- counter++;
- }
-
- if (!ctx->namespace_name || !ctx->pod_name) {
- goto error;
- }
- }
-
- ret = 0;
-
- if (list) {
- flb_slist_destroy(list);
- flb_free(list);
- }
- flb_sds_destroy(new_local_resource_id);
-
- return ret;
-
- error:
- if (list) {
- flb_slist_destroy(list);
- flb_free(list);
- }
-
- if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
- if (ctx->namespace_name) {
- flb_sds_destroy(ctx->namespace_name);
- }
-
- if (ctx->pod_name) {
- flb_sds_destroy(ctx->pod_name);
- }
-
- if (ctx->container_name) {
- flb_sds_destroy(ctx->container_name);
- }
- }
- else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
- if (ctx->node_name) {
- flb_sds_destroy(ctx->node_name);
- }
- }
- else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
- if (ctx->namespace_name) {
- flb_sds_destroy(ctx->namespace_name);
- }
-
- if (ctx->pod_name) {
- flb_sds_destroy(ctx->pod_name);
- }
- }
-
- flb_sds_destroy(new_local_resource_id);
- return -1;
-}
-
-static int is_tag_match_regex(struct flb_stackdriver *ctx,
- const char *tag, int tag_len)
-{
- int ret;
- int tag_prefix_len;
- int len_to_be_matched;
- const char *tag_str_to_be_matcheds;
-
- tag_prefix_len = flb_sds_len(ctx->tag_prefix);
- if (tag_len > tag_prefix_len &&
- flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) {
- return 0;
- }
-
- tag_str_to_be_matcheds = tag + tag_prefix_len;
- len_to_be_matched = tag_len - tag_prefix_len;
- ret = flb_regex_match(ctx->regex,
- (unsigned char *) tag_str_to_be_matcheds,
- len_to_be_matched);
-
- /* 1 -> match; 0 -> doesn't match; < 0 -> error */
- return ret;
-}
-
-static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx)
-{
- int ret;
- int prefix_len;
- int len_to_be_matched;
- const char *str_to_be_matcheds;
-
- if (!ctx->local_resource_id) {
- flb_plg_warn(ctx->ins, "local_resource_id not found in the payload");
- return -1;
- }
-
- prefix_len = flb_sds_len(ctx->tag_prefix);
- str_to_be_matcheds = ctx->local_resource_id + prefix_len;
- len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len;
-
- ret = flb_regex_match(ctx->regex,
- (unsigned char *) str_to_be_matcheds,
- len_to_be_matched);
-
- /* 1 -> match; 0 -> doesn't match; < 0 -> error */
- return ret;
-}
-
-static void cb_results(const char *name, const char *value,
- size_t vlen, void *data);
-/*
- * extract_resource_labels_from_regex(4) will only be called if the
- * tag or local_resource_id field matches the regex rule
- */
-static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx,
- const char *tag, int tag_len,
- int from_tag)
-{
- int ret = 1;
- int prefix_len;
- int len_to_be_matched;
- int local_resource_id_len;
- const char *str_to_be_matcheds;
- struct flb_regex_search result;
-
- prefix_len = flb_sds_len(ctx->tag_prefix);
- if (from_tag == FLB_TRUE) {
- local_resource_id_len = tag_len;
- str_to_be_matcheds = tag + prefix_len;
- }
- else {
- // this will be called only if the payload contains local_resource_id
- local_resource_id_len = flb_sds_len(ctx->local_resource_id);
- str_to_be_matcheds = ctx->local_resource_id + prefix_len;
- }
-
- len_to_be_matched = local_resource_id_len - prefix_len;
- ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result);
- if (ret <= 0) {
- flb_plg_warn(ctx->ins, "invalid pattern for given value %s when"
- " extracting resource labels", str_to_be_matcheds);
- return -1;
- }
-
- flb_regex_parse(ctx->regex, &result, cb_results, ctx);
-
- return ret;
-}
-
-static int process_local_resource_id(struct flb_stackdriver *ctx,
- const char *tag, int tag_len, char *type)
-{
- int ret;
-
- // parsing local_resource_id from tag takes higher priority
- if (is_tag_match_regex(ctx, tag, tag_len) > 0) {
- ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE);
- }
- else if (is_local_resource_id_match_regex(ctx) > 0) {
- ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE);
- }
- else {
- ret = set_monitored_resource_labels(ctx, type);
- }
-
- return ret;
-}
-
-/*
- * get_payload_labels
- * - Iterate throught the original payload (obj) and find out the entry that matches
- * the labels_key
- * - Used to convert all labels under labels_key to root-level `labels` field
- */
-static msgpack_object *get_payload_labels(struct flb_stackdriver *ctx, msgpack_object *obj)
-{
- int i;
- int len;
- msgpack_object_kv *kv = NULL;
-
- if (!obj || obj->type != MSGPACK_OBJECT_MAP) {
- return NULL;
- }
-
- len = flb_sds_len(ctx->labels_key);
- for (i = 0; i < obj->via.map.size; i++) {
- kv = &obj->via.map.ptr[i];
- if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) {
- /* only the first matching entry will be returned */
- return &kv->val;
- }
- }
-
- //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload",
- // ctx->labels_key);
- return NULL;
-}
-
-/*
- * pack_resource_labels():
- * - Looks through the resource_labels parameter and appends new key value
- * pair to the log entry.
- * - Supports field access, plaintext assignment and environment variables.
- */
-static int pack_resource_labels(struct flb_stackdriver *ctx,
- struct flb_mp_map_header *mh,
- msgpack_packer *mp_pck,
- const void *data,
- size_t bytes)
-{
- struct mk_list *head;
- struct flb_kv *label_kv;
- struct flb_record_accessor *ra;
- struct flb_ra_value *rval;
- int len;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
- int ret;
-
- if (ctx->should_skip_resource_labels_api == FLB_TRUE) {
- return -1;
- }
-
- len = mk_list_size(&ctx->resource_labels_kvs);
- if (len == 0) {
- return -1;
- }
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return -1;
- }
-
- if ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
-
- flb_mp_map_header_init(mh, mp_pck);
- mk_list_foreach(head, &ctx->resource_labels_kvs) {
- label_kv = mk_list_entry(head, struct flb_kv, _head);
- /*
- * KVs have the form destination=original, so the original key is the value.
- * If the value starts with '$', it will be processed using record accessor.
- * Otherwise, it will be treated as a plaintext assignment.
- */
- if (label_kv->val[0] == '$') {
- ra = flb_ra_create(label_kv->val, FLB_TRUE);
- rval = flb_ra_get_value_object(ra, *log_event.body);
-
- if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) {
- flb_mp_map_header_append(mh);
- msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
- msgpack_pack_str_body(mp_pck, label_kv->key,
- flb_sds_len(label_kv->key));
- msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string));
- msgpack_pack_str_body(mp_pck, rval->val.string,
- flb_sds_len(rval->val.string));
- flb_ra_key_value_destroy(rval);
- } else {
- flb_plg_warn(ctx->ins, "failed to find a corresponding entry for "
- "resource label entry [%s=%s]", label_kv->key, label_kv->val);
- }
- flb_ra_destroy(ra);
- } else {
- flb_mp_map_header_append(mh);
- msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
- msgpack_pack_str_body(mp_pck, label_kv->key,
- flb_sds_len(label_kv->key));
- msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val));
- msgpack_pack_str_body(mp_pck, label_kv->val,
- flb_sds_len(label_kv->val));
- }
- }
- }
- else {
- flb_plg_error(ctx->ins, "failed to unpack data");
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- return -1;
- }
-
- /* project_id should always be packed from config parameter */
- flb_mp_map_header_append(mh);
- msgpack_pack_str(mp_pck, 10);
- msgpack_pack_str_body(mp_pck, "project_id", 10);
- msgpack_pack_str(mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
-
- flb_log_event_decoder_destroy(&log_decoder);
- flb_mp_map_header_end(mh);
-
- return 0;
-}
-
-static void pack_labels(struct flb_stackdriver *ctx,
- msgpack_packer *mp_pck,
- msgpack_object *payload_labels_ptr)
-{
- int i;
- int labels_size = 0;
- struct mk_list *head;
- struct flb_kv *list_kv;
- msgpack_object_kv *obj_kv = NULL;
-
- /* Determine size of labels map */
- labels_size = mk_list_size(&ctx->config_labels);
- if (payload_labels_ptr != NULL &&
- payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
- labels_size += payload_labels_ptr->via.map.size;
- }
-
- msgpack_pack_map(mp_pck, labels_size);
-
- /* pack labels from the payload */
- if (payload_labels_ptr != NULL &&
- payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
-
- for (i = 0; i < payload_labels_ptr->via.map.size; i++) {
- obj_kv = &payload_labels_ptr->via.map.ptr[i];
- msgpack_pack_object(mp_pck, obj_kv->key);
- msgpack_pack_object(mp_pck, obj_kv->val);
- }
- }
-
- /* pack labels set in configuration */
- /* in msgpack duplicate keys are overriden by the last set */
- /* static label keys override payload labels */
- mk_list_foreach(head, &ctx->config_labels){
- list_kv = mk_list_entry(head, struct flb_kv, _head);
- msgpack_pack_str(mp_pck, flb_sds_len(list_kv->key));
- msgpack_pack_str_body(mp_pck, list_kv->key, flb_sds_len(list_kv->key));
- msgpack_pack_str(mp_pck, flb_sds_len(list_kv->val));
- msgpack_pack_str_body(mp_pck, list_kv->val, flb_sds_len(list_kv->val));
- }
-}
-
-static void cb_results(const char *name, const char *value,
- size_t vlen, void *data)
-{
- struct flb_stackdriver *ctx = data;
-
- if (vlen == 0) {
- return;
- }
-
- if (strcmp(name, "pod_name") == 0) {
- if (ctx->pod_name != NULL) {
- flb_sds_destroy(ctx->pod_name);
- }
- ctx->pod_name = flb_sds_create_len(value, vlen);
- }
- else if (strcmp(name, "namespace_name") == 0) {
- if (ctx->namespace_name != NULL) {
- flb_sds_destroy(ctx->namespace_name);
- }
- ctx->namespace_name = flb_sds_create_len(value, vlen);
- }
- else if (strcmp(name, "container_name") == 0) {
- if (ctx->container_name != NULL) {
- flb_sds_destroy(ctx->container_name);
- }
- ctx->container_name = flb_sds_create_len(value, vlen);
- }
- else if (strcmp(name, "node_name") == 0) {
- if (ctx->node_name != NULL) {
- flb_sds_destroy(ctx->node_name);
- }
- ctx->node_name = flb_sds_create_len(value, vlen);
- }
-
- return;
-}
-
-int flb_stackdriver_regex_init(struct flb_stackdriver *ctx)
-{
- /* If a custom regex is not set, use the defaults */
- ctx->regex = flb_regex_create(ctx->custom_k8s_regex);
- if (!ctx->regex) {
- return -1;
- }
-
- return 0;
-}
-
-static int cb_stackdriver_init(struct flb_output_instance *ins,
- struct flb_config *config, void *data)
-{
- int ret;
- int io_flags = FLB_IO_TLS;
- char *token;
- struct flb_stackdriver *ctx;
-
- /* Create config context */
- ctx = flb_stackdriver_conf_create(ins, config);
- if (!ctx) {
- flb_plg_error(ins, "configuration failed");
- return -1;
- }
-
- /* Load config map */
- ret = flb_output_config_map_set(ins, (void *) ctx);
- if (ret == -1) {
- return -1;
- }
-
- /* Set context */
- flb_output_set_context(ins, ctx);
-
- /* Network mode IPv6 */
- if (ins->host.ipv6 == FLB_TRUE) {
- io_flags |= FLB_IO_IPV6;
- }
-
- /* Initialize oauth2 cache pthread keys */
- oauth2_cache_init();
-
- /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
- pthread_mutex_init(&ctx->token_mutex, NULL);
-
- /* Create Upstream context for Stackdriver Logging (no oauth2 service) */
- ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL,
- io_flags, ins->tls);
- ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server,
- FLB_IO_TCP, NULL);
-
- /* Create oauth2 context */
- ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
-
- if (!ctx->u) {
- flb_plg_error(ctx->ins, "upstream creation failed");
- return -1;
- }
- if (!ctx->metadata_u) {
- flb_plg_error(ctx->ins, "metadata upstream creation failed");
- return -1;
- }
- if (!ctx->o) {
- flb_plg_error(ctx->ins, "cannot create oauth2 context");
- return -1;
- }
- flb_output_upstream_set(ctx->u, ins);
-
- /* Metadata Upstream Sync flags */
- flb_stream_disable_async_mode(&ctx->metadata_u->base);
-
- if (ins->test_mode == FLB_FALSE) {
- /* Retrieve oauth2 token */
- token = get_google_token(ctx);
- if (!token) {
- flb_plg_warn(ctx->ins, "token retrieval failed");
- }
- else {
- flb_sds_destroy(token);
- }
- }
-
- if (ctx->metadata_server_auth) {
- ret = gce_metadata_read_project_id(ctx);
- if (ret == -1) {
- return -1;
- }
-
- if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
- && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
- ret = gce_metadata_read_zone(ctx);
- if (ret == -1) {
- return -1;
- }
-
- ret = gce_metadata_read_instance_id(ctx);
- if (ret == -1) {
- return -1;
- }
- }
- }
-
- /* Validate project_id */
- if (!ctx->project_id) {
- flb_plg_error(ctx->ins, "property 'project_id' is not set");
- return -1;
- }
-
- if (!ctx->export_to_project_id) {
- ctx->export_to_project_id = ctx->project_id;
- }
-
- ret = flb_stackdriver_regex_init(ctx);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "failed to init stackdriver custom regex");
- return -1;
- }
-
- return 0;
-}
-
-static int validate_severity_level(severity_t * s,
- const char * str,
- const unsigned int str_size)
-{
- int i = 0;
-
- const static struct {
- severity_t s;
- const unsigned int str_size;
- const char * str;
- } enum_mapping[] = {
- {FLB_STD_EMERGENCY, 9, "EMERGENCY"},
- {FLB_STD_EMERGENCY, 5, "EMERG" },
-
- {FLB_STD_ALERT , 1, "A" },
- {FLB_STD_ALERT , 5, "ALERT" },
-
- {FLB_STD_CRITICAL , 1, "C" },
- {FLB_STD_CRITICAL , 1, "F" },
- {FLB_STD_CRITICAL , 4, "CRIT" },
- {FLB_STD_CRITICAL , 5, "FATAL" },
- {FLB_STD_CRITICAL , 8, "CRITICAL" },
-
- {FLB_STD_ERROR , 1, "E" },
- {FLB_STD_ERROR , 3, "ERR" },
- {FLB_STD_ERROR , 5, "ERROR" },
- {FLB_STD_ERROR , 6, "SEVERE" },
-
- {FLB_STD_WARNING , 1, "W" },
- {FLB_STD_WARNING , 4, "WARN" },
- {FLB_STD_WARNING , 7, "WARNING" },
-
- {FLB_STD_NOTICE , 1, "N" },
- {FLB_STD_NOTICE , 6, "NOTICE" },
-
- {FLB_STD_INFO , 1, "I" },
- {FLB_STD_INFO , 4, "INFO" },
-
- {FLB_STD_DEBUG , 1, "D" },
- {FLB_STD_DEBUG , 5, "DEBUG" },
- {FLB_STD_DEBUG , 5, "TRACE" },
- {FLB_STD_DEBUG , 9, "TRACE_INT"},
- {FLB_STD_DEBUG , 4, "FINE" },
- {FLB_STD_DEBUG , 5, "FINER" },
- {FLB_STD_DEBUG , 6, "FINEST" },
- {FLB_STD_DEBUG , 6, "CONFIG" },
-
- {FLB_STD_DEFAULT , 7, "DEFAULT" }
- };
-
- for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) {
- if (enum_mapping[i].str_size != str_size) {
- continue;
- }
-
- if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) {
- *s = enum_mapping[i].s;
- return 0;
- }
- }
- return -1;
-}
-
-static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o,
- const flb_sds_t key, const int key_size,
- msgpack_object_type type)
-{
- int i = 0;
- msgpack_object_kv * p = NULL;
-
- if (o == NULL || subobj == NULL) {
- return -1;
- }
-
- for (i = 0; i < o->via.map.size; i++) {
- p = &o->via.map.ptr[i];
- if (p->val.type != type) {
- continue;
- }
-
- if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) {
- *subobj = p->val;
- return 0;
- }
- }
- return -1;
-}
-
-static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key)
-{
- msgpack_object tmp;
- if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) {
- *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size);
- return 0;
- }
-
- *s = 0;
- return -1;
-}
-
-static int get_severity_level(severity_t * s, const msgpack_object * o,
- const flb_sds_t key)
-{
- msgpack_object tmp;
- if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0
- && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) {
- return 0;
- }
- *s = 0;
- return -1;
-}
-
-static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * src_obj,
- const flb_sds_t key)
-{
- msgpack_object tmp;
- int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN);
-
- if (ret == 0 && tmp.via.boolean == true) {
- *trace_sampled_value = FLB_TRUE;
- return 0;
- } else if (ret == 0 && tmp.via.boolean == false) {
- *trace_sampled_value = FLB_FALSE;
- return 0;
- }
-
- return -1;
-}
-
-static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
- const msgpack_object * obj)
-{
- int i = 0;
- msgpack_object_kv * p = NULL;
- insert_id_status ret = INSERTID_NOT_PRESENT;
-
- if (obj == NULL) {
- return ret;
- }
-
- for (i = 0; i < obj->via.map.size; i++) {
- p = &obj->via.map.ptr[i];
- if (p->key.type != MSGPACK_OBJECT_STR) {
- continue;
- }
- if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
- if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) {
- *insert_id_value = p->val;
- ret = INSERTID_VALID;
- }
- else {
- ret = INSERTID_INVALID;
- }
- break;
- }
- }
- return ret;
-}
-
-static int pack_json_payload(int insert_id_extracted,
- int operation_extracted, int operation_extra_size,
- int source_location_extracted,
- int source_location_extra_size,
- int http_request_extracted,
- int http_request_extra_size,
- timestamp_status tms_status,
- msgpack_packer *mp_pck, msgpack_object *obj,
- struct flb_stackdriver *ctx)
-{
- /* Specified fields include local_resource_id, operation, sourceLocation ... */
- int i, j;
- int to_remove = 0;
- int ret;
- int map_size;
- int new_map_size;
- int len;
- int len_to_be_removed;
- int key_not_found;
- flb_sds_t removed;
- flb_sds_t monitored_resource_key;
- flb_sds_t local_resource_id_key;
- flb_sds_t stream;
- msgpack_object_kv *kv = obj->via.map.ptr;
- msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
-
- monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY);
- local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY);
- stream = flb_sds_create("stream");
- /*
- * array of elements that need to be removed from payload,
- * special field 'operation' will be processed individually
- */
- flb_sds_t to_be_removed[] =
- {
- monitored_resource_key,
- local_resource_id_key,
- ctx->labels_key,
- ctx->severity_key,
- ctx->trace_key,
- ctx->span_id_key,
- ctx->trace_sampled_key,
- ctx->log_name_key,
- stream
- /* more special fields are required to be added, but, if this grows with more
- than a few records, it might need to be converted to flb_hash
- */
- };
-
- if (insert_id_extracted == FLB_TRUE) {
- to_remove += 1;
- }
- if (operation_extracted == FLB_TRUE && operation_extra_size == 0) {
- to_remove += 1;
- }
- if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) {
- to_remove += 1;
- }
- if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) {
- to_remove += 1;
- }
- if (tms_status == FORMAT_TIMESTAMP_OBJECT) {
- to_remove += 1;
- }
- if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
- to_remove += 2;
- }
-
- map_size = obj->via.map.size;
- len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]);
- for (i = 0; i < map_size; i++) {
- kv = &obj->via.map.ptr[i];
- len = kv->key.via.str.size;
- for (j = 0; j < len_to_be_removed; j++) {
- removed = to_be_removed[j];
- /*
- * check length of key to avoid partial matching
- * e.g. labels key = labels && kv->key = labelss
- */
- if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
- to_remove += 1;
- break;
- }
- }
- }
-
- new_map_size = map_size - to_remove;
-
- ret = msgpack_pack_map(mp_pck, new_map_size);
- if (ret < 0) {
- goto error;
- }
-
- /* points back to the beginning of map */
- kv = obj->via.map.ptr;
- for(; kv != kvend; ++kv ) {
- key_not_found = 1;
-
- /* processing logging.googleapis.com/insertId */
- if (insert_id_extracted == FLB_TRUE
- && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
- continue;
- }
-
- /* processing logging.googleapis.com/operation */
- if (validate_key(kv->key, OPERATION_FIELD_IN_JSON,
- OPERATION_KEY_SIZE)
- && kv->val.type == MSGPACK_OBJECT_MAP) {
- if (operation_extra_size > 0) {
- msgpack_pack_object(mp_pck, kv->key);
- pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size);
- }
- continue;
- }
-
- if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON,
- SOURCE_LOCATION_SIZE)
- && kv->val.type == MSGPACK_OBJECT_MAP) {
-
- if (source_location_extra_size > 0) {
- msgpack_pack_object(mp_pck, kv->key);
- pack_extra_source_location_subfields(mp_pck, &kv->val,
- source_location_extra_size);
- }
- continue;
- }
-
- if (validate_key(kv->key, ctx->http_request_key,
- ctx->http_request_key_size)
- && kv->val.type == MSGPACK_OBJECT_MAP) {
-
- if(http_request_extra_size > 0) {
- msgpack_pack_object(mp_pck, kv->key);
- pack_extra_http_request_subfields(mp_pck, &kv->val,
- http_request_extra_size);
- }
- continue;
- }
-
- if (validate_key(kv->key, "timestamp", 9)
- && tms_status == FORMAT_TIMESTAMP_OBJECT) {
- continue;
- }
-
- if (validate_key(kv->key, "timestampSeconds", 16)
- && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
- continue;
- }
- if (validate_key(kv->key, "timestampNanos", 14)
- && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
- continue;
- }
-
- len = kv->key.via.str.size;
- for (j = 0; j < len_to_be_removed; j++) {
- removed = to_be_removed[j];
- if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
- key_not_found = 0;
- break;
- }
- }
-
- if (key_not_found) {
- ret = msgpack_pack_object(mp_pck, kv->key);
- if (ret < 0) {
- goto error;
- }
- ret = msgpack_pack_object(mp_pck, kv->val);
- if (ret < 0) {
- goto error;
- }
- }
- }
-
- flb_sds_destroy(monitored_resource_key);
- flb_sds_destroy(local_resource_id_key);
- flb_sds_destroy(stream);
- return 0;
-
- error:
- flb_sds_destroy(monitored_resource_key);
- flb_sds_destroy(local_resource_id_key);
- flb_sds_destroy(stream);
- return ret;
-}
-
-static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
- int total_records,
- const char *tag, int tag_len,
- const void *data, size_t bytes)
-{
- int len;
- int ret;
- int array_size = 0;
- /* The default value is 3: timestamp, jsonPayload, logName. */
- int entry_size = 3;
- size_t s;
- // size_t off = 0;
- char path[PATH_MAX];
- char time_formatted[255];
- const char *newtag;
- const char *new_log_name;
- msgpack_object *obj;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- flb_sds_t out_buf;
- struct flb_mp_map_header mh;
-
- /* Parameters for severity */
- int severity_extracted = FLB_FALSE;
- severity_t severity;
-
- /* Parameters for trace */
- int trace_extracted = FLB_FALSE;
- flb_sds_t trace;
- char stackdriver_trace[PATH_MAX];
- const char *new_trace;
-
- /* Parameters for span id */
- int span_id_extracted = FLB_FALSE;
- flb_sds_t span_id;
-
- /* Parameters for trace sampled */
- int trace_sampled_extracted = FLB_FALSE;
- int trace_sampled = FLB_FALSE;
-
- /* Parameters for log name */
- int log_name_extracted = FLB_FALSE;
- flb_sds_t log_name = NULL;
- flb_sds_t stream = NULL;
- flb_sds_t stream_key;
-
- /* Parameters for insertId */
- msgpack_object insert_id_obj;
- insert_id_status in_status;
- int insert_id_extracted;
-
- /* Parameters in Operation */
- flb_sds_t operation_id;
- flb_sds_t operation_producer;
- int operation_first = FLB_FALSE;
- int operation_last = FLB_FALSE;
- int operation_extracted = FLB_FALSE;
- int operation_extra_size = 0;
-
- /* Parameters for sourceLocation */
- flb_sds_t source_location_file;
- int64_t source_location_line = 0;
- flb_sds_t source_location_function;
- int source_location_extracted = FLB_FALSE;
- int source_location_extra_size = 0;
-
- /* Parameters for httpRequest */
- struct http_request_field http_request;
- int http_request_extracted = FLB_FALSE;
- int http_request_extra_size = 0;
-
- /* Parameters for Timestamp */
- struct tm tm;
- // struct flb_time tms;
- timestamp_status tms_status;
- /* Count number of records */
- array_size = total_records;
-
- /* Parameters for labels */
- msgpack_object *payload_labels_ptr;
- int labels_size = 0;
-
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return NULL;
- }
-
- /*
- * Search each entry and validate insertId.
- * Reject the entry if insertId is invalid.
- * If all the entries are rejected, stop formatting.
- *
- */
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- /* Extract insertId */
- in_status = validate_insert_id(&insert_id_obj, log_event.body);
-
- if (in_status == INSERTID_INVALID) {
- flb_plg_error(ctx->ins,
- "Incorrect insertId received. InsertId should be non-empty string.");
- array_size -= 1;
- }
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- /* Sounds like this should compare to -1 instead of zero */
- if (array_size == 0) {
- return NULL;
- }
-
- /* Create temporal msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /*
- * Pack root map (resource & entries):
- *
- * {"resource": {"type": "...", "labels": {...},
- * "entries": []
- */
- msgpack_pack_map(&mp_pck, 2);
-
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "resource", 8);
-
- /* type & labels */
- msgpack_pack_map(&mp_pck, 2);
-
- /* type */
- msgpack_pack_str(&mp_pck, 4);
- msgpack_pack_str_body(&mp_pck, "type", 4);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource));
- msgpack_pack_str_body(&mp_pck, ctx->resource,
- flb_sds_len(ctx->resource));
-
- msgpack_pack_str(&mp_pck, 6);
- msgpack_pack_str_body(&mp_pck, "labels", 6);
-
- ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes);
- if (ret != 0) {
- if (ctx->resource_type == RESOURCE_TYPE_K8S) {
- ret = extract_local_resource_id(data, bytes, ctx, tag);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "fail to construct local_resource_id");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
- }
- ret = parse_monitored_resource(ctx, data, bytes, &mp_pck);
- if (ret != 0) {
- if (strcmp(ctx->resource, "global") == 0) {
- /* global resource has field project_id */
- msgpack_pack_map(&mp_pck, 1);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
- else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE
- || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) {
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE && ctx->node_id) {
- /* generic_node has fields project_id, location, namespace, node_id */
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "node_id", 7);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->node_id, flb_sds_len(ctx->node_id));
- }
- else {
- /* generic_task has fields project_id, location, namespace, job, task_id */
- if (ctx->job) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 3);
- msgpack_pack_str_body(&mp_pck, "job", 3);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job));
- msgpack_pack_str_body(&mp_pck,
- ctx->job, flb_sds_len(ctx->job));
- }
-
- if (ctx->task_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "task_id", 7);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->task_id, flb_sds_len(ctx->task_id));
- }
- }
-
- if (ctx->project_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
-
- if (ctx->location) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "location", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location));
- msgpack_pack_str_body(&mp_pck,
- ctx->location, flb_sds_len(ctx->location));
- }
-
- if (ctx->namespace_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "namespace", 9);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->namespace_id, flb_sds_len(ctx->namespace_id));
- }
-
- flb_mp_map_header_end(&mh);
- }
- else if (strcmp(ctx->resource, "gce_instance") == 0) {
- /* gce_instance resource has fields project_id, zone, instance_id */
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (ctx->project_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
-
- if (ctx->zone) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 4);
- msgpack_pack_str_body(&mp_pck, "zone", 4);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone));
- msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone));
- }
-
- if (ctx->instance_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 11);
- msgpack_pack_str_body(&mp_pck, "instance_id", 11);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->instance_id, flb_sds_len(ctx->instance_id));
- }
- flb_mp_map_header_end(&mh);
- }
- else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) {
- /* k8s_container resource has fields project_id, location, cluster_name,
- * namespace_name, pod_name, container_name
- *
- * The local_resource_id for k8s_container is in format:
- * k8s_container.<namespace_name>.<pod_name>.<container_name>
- */
-
- ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "fail to extract resource labels "
- "for k8s_container resource type");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
-
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (ctx->project_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
-
- if (ctx->cluster_location) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "location", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_location,
- flb_sds_len(ctx->cluster_location));
- }
-
- if (ctx->cluster_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 12);
- msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_name, flb_sds_len(ctx->cluster_name));
- }
-
- if (ctx->namespace_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->namespace_name,
- flb_sds_len(ctx->namespace_name));
- }
-
- if (ctx->pod_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "pod_name", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->pod_name, flb_sds_len(ctx->pod_name));
- }
-
- if (ctx->container_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "container_name", 14);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->container_name,
- flb_sds_len(ctx->container_name));
- }
-
- flb_mp_map_header_end(&mh);
- }
- else if (strcmp(ctx->resource, K8S_NODE) == 0) {
- /* k8s_node resource has fields project_id, location, cluster_name, node_name
- *
- * The local_resource_id for k8s_node is in format:
- * k8s_node.<node_name>
- */
-
- ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "fail to process local_resource_id from "
- "log entry for k8s_node");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
-
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (ctx->project_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
-
- if (ctx->cluster_location) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "location", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_location,
- flb_sds_len(ctx->cluster_location));
- }
-
- if (ctx->cluster_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 12);
- msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_name, flb_sds_len(ctx->cluster_name));
- }
-
- if (ctx->node_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "node_name", 9);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->node_name, flb_sds_len(ctx->node_name));
- }
-
- flb_mp_map_header_end(&mh);
- }
- else if (strcmp(ctx->resource, K8S_POD) == 0) {
- /* k8s_pod resource has fields project_id, location, cluster_name,
- * namespace_name, pod_name.
- *
- * The local_resource_id for k8s_pod is in format:
- * k8s_pod.<namespace_name>.<pod_name>
- */
-
- ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD);
- if (ret != 0) {
- flb_plg_error(ctx->ins, "fail to process local_resource_id from "
- "log entry for k8s_pod");
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
-
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (ctx->project_id) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 10);
- msgpack_pack_str_body(&mp_pck, "project_id", 10);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
- msgpack_pack_str_body(&mp_pck,
- ctx->project_id, flb_sds_len(ctx->project_id));
- }
-
- if (ctx->cluster_location) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "location", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_location,
- flb_sds_len(ctx->cluster_location));
- }
-
- if (ctx->cluster_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 12);
- msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->cluster_name, flb_sds_len(ctx->cluster_name));
- }
-
- if (ctx->namespace_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->namespace_name,
- flb_sds_len(ctx->namespace_name));
- }
-
- if (ctx->pod_name) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "pod_name", 8);
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
- msgpack_pack_str_body(&mp_pck,
- ctx->pod_name, flb_sds_len(ctx->pod_name));
- }
-
- flb_mp_map_header_end(&mh);
- }
- else {
- flb_plg_error(ctx->ins, "unsupported resource type '%s'",
- ctx->resource);
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
- }
- }
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "entries", 7);
-
- /* Append entries */
- msgpack_pack_array(&mp_pck, array_size);
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- return NULL;
- }
-
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- obj = log_event.body;
- tms_status = extract_timestamp(obj, &log_event.timestamp);
-
- /*
- * Pack entry
- *
- * {
- * "severity": "...",
- * "labels": "...",
- * "logName": "...",
- * "jsonPayload": {...},
- * "timestamp": "...",
- * "spanId": "...",
- * "traceSampled": <true or false>,
- * "trace": "..."
- * }
- */
- entry_size = 3;
-
- /* Extract severity */
- severity_extracted = FLB_FALSE;
- if (ctx->severity_key
- && get_severity_level(&severity, obj, ctx->severity_key) == 0) {
- severity_extracted = FLB_TRUE;
- entry_size += 1;
- }
-
- /* Extract trace */
- trace_extracted = FLB_FALSE;
- if (ctx->trace_key
- && get_string(&trace, obj, ctx->trace_key) == 0) {
- trace_extracted = FLB_TRUE;
- entry_size += 1;
- }
-
- /* Extract span id */
- span_id_extracted = FLB_FALSE;
- if (ctx->span_id_key
- && get_string(&span_id, obj, ctx->span_id_key) == 0) {
- span_id_extracted = FLB_TRUE;
- entry_size += 1;
- }
-
- /* Extract trace sampled */
- trace_sampled_extracted = FLB_FALSE;
- if (ctx->trace_sampled_key
- && get_trace_sampled(&trace_sampled, obj, ctx->trace_sampled_key) == 0) {
- trace_sampled_extracted = FLB_TRUE;
- entry_size += 1;
- }
-
- /* Extract log name */
- log_name_extracted = FLB_FALSE;
- if (ctx->log_name_key
- && get_string(&log_name, obj, ctx->log_name_key) == 0) {
- log_name_extracted = FLB_TRUE;
- }
-
- /* Extract insertId */
- in_status = validate_insert_id(&insert_id_obj, obj);
- if (in_status == INSERTID_VALID) {
- insert_id_extracted = FLB_TRUE;
- entry_size += 1;
- }
- else if (in_status == INSERTID_NOT_PRESENT) {
- insert_id_extracted = FLB_FALSE;
- }
- else {
- if (log_name_extracted == FLB_TRUE) {
- flb_sds_destroy(log_name);
- }
- continue;
- }
-
- /* Extract operation */
- operation_id = flb_sds_create("");
- operation_producer = flb_sds_create("");
- operation_first = FLB_FALSE;
- operation_last = FLB_FALSE;
- operation_extra_size = 0;
- operation_extracted = extract_operation(&operation_id, &operation_producer,
- &operation_first, &operation_last, obj,
- &operation_extra_size);
-
- if (operation_extracted == FLB_TRUE) {
- entry_size += 1;
- }
-
- /* Extract sourceLocation */
- source_location_file = flb_sds_create("");
- source_location_line = 0;
- source_location_function = flb_sds_create("");
- source_location_extra_size = 0;
- source_location_extracted = extract_source_location(&source_location_file,
- &source_location_line,
- &source_location_function,
- obj,
- &source_location_extra_size);
-
- if (source_location_extracted == FLB_TRUE) {
- entry_size += 1;
- }
-
- /* Extract httpRequest */
- init_http_request(&http_request);
- http_request_extra_size = 0;
- http_request_extracted = extract_http_request(&http_request,
- ctx->http_request_key,
- ctx->http_request_key_size,
- obj, &http_request_extra_size);
- if (http_request_extracted == FLB_TRUE) {
- entry_size += 1;
- }
-
- /* Extract payload labels */
- payload_labels_ptr = get_payload_labels(ctx, obj);
- if (payload_labels_ptr != NULL &&
- payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
- flb_plg_error(ctx->ins, "the type of payload labels should be map");
- flb_sds_destroy(operation_id);
- flb_sds_destroy(operation_producer);
- flb_log_event_decoder_destroy(&log_decoder);
- msgpack_sbuffer_destroy(&mp_sbuf);
- return NULL;
- }
-
- /* Number of parsed labels */
- labels_size = mk_list_size(&ctx->config_labels);
- if (payload_labels_ptr != NULL &&
- payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
- labels_size += payload_labels_ptr->via.map.size;
- }
-
- if (labels_size > 0) {
- entry_size += 1;
- }
-
- msgpack_pack_map(&mp_pck, entry_size);
-
- /* Add severity into the log entry */
- if (severity_extracted == FLB_TRUE) {
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "severity", 8);
- msgpack_pack_int(&mp_pck, severity);
- }
-
- /* Add trace into the log entry */
- if (trace_extracted == FLB_TRUE) {
- msgpack_pack_str(&mp_pck, 5);
- msgpack_pack_str_body(&mp_pck, "trace", 5);
-
- if (ctx->autoformat_stackdriver_trace) {
- len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1,
- "projects/%s/traces/%s", ctx->project_id, trace);
- new_trace = stackdriver_trace;
- }
- else {
- len = flb_sds_len(trace);
- new_trace = trace;
- }
-
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, new_trace, len);
- flb_sds_destroy(trace);
- }
-
- /* Add spanId field into the log entry */
- if (span_id_extracted == FLB_TRUE) {
- msgpack_pack_str_with_body(&mp_pck, "spanId", 6);
- len = flb_sds_len(span_id);
- msgpack_pack_str_with_body(&mp_pck, span_id, len);
- flb_sds_destroy(span_id);
- }
-
- /* Add traceSampled field into the log entry */
- if (trace_sampled_extracted == FLB_TRUE) {
- msgpack_pack_str_with_body(&mp_pck, "traceSampled", 12);
-
- if (trace_sampled == FLB_TRUE) {
- msgpack_pack_true(&mp_pck);
- } else {
- msgpack_pack_false(&mp_pck);
- }
-
- }
-
- /* Add insertId field into the log entry */
- if (insert_id_extracted == FLB_TRUE) {
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "insertId", 8);
- msgpack_pack_object(&mp_pck, insert_id_obj);
- }
-
- /* Add operation field into the log entry */
- if (operation_extracted == FLB_TRUE) {
- add_operation_field(&operation_id, &operation_producer,
- &operation_first, &operation_last, &mp_pck);
- }
-
- /* Add sourceLocation field into the log entry */
- if (source_location_extracted == FLB_TRUE) {
- add_source_location_field(&source_location_file, source_location_line,
- &source_location_function, &mp_pck);
- }
-
- /* Add httpRequest field into the log entry */
- if (http_request_extracted == FLB_TRUE) {
- add_http_request_field(&http_request, &mp_pck);
- }
-
- /* labels */
- if (labels_size > 0) {
- msgpack_pack_str(&mp_pck, 6);
- msgpack_pack_str_body(&mp_pck, "labels", 6);
- pack_labels(ctx, &mp_pck, payload_labels_ptr);
- }
-
- /* Clean up id and producer if operation extracted */
- flb_sds_destroy(operation_id);
- flb_sds_destroy(operation_producer);
- flb_sds_destroy(source_location_file);
- flb_sds_destroy(source_location_function);
- destroy_http_request(&http_request);
-
- /* jsonPayload */
- msgpack_pack_str(&mp_pck, 11);
- msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
- pack_json_payload(insert_id_extracted,
- operation_extracted, operation_extra_size,
- source_location_extracted,
- source_location_extra_size,
- http_request_extracted,
- http_request_extra_size,
- tms_status,
- &mp_pck, obj, ctx);
-
- /* avoid modifying the original tag */
- newtag = tag;
- stream_key = flb_sds_create("stream");
- if (ctx->resource_type == RESOURCE_TYPE_K8S
- && get_string(&stream, obj, stream_key) == 0) {
- if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) {
- newtag = "stdout";
- }
- else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) {
- newtag = "stderr";
- }
- }
-
- if (log_name_extracted == FLB_FALSE) {
- new_log_name = newtag;
- }
- else {
- new_log_name = log_name;
- }
-
- /* logName */
- len = snprintf(path, sizeof(path) - 1,
- "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name);
-
- if (log_name_extracted == FLB_TRUE) {
- flb_sds_destroy(log_name);
- }
-
- msgpack_pack_str(&mp_pck, 7);
- msgpack_pack_str_body(&mp_pck, "logName", 7);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, path, len);
- flb_sds_destroy(stream_key);
- flb_sds_destroy(stream);
-
- /* timestamp */
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "timestamp", 9);
-
- /* Format the time */
- /*
- * If format is timestamp_object or timestamp_duo_fields,
- * tms has been updated.
- *
- * If timestamp is not presen,
- * use the default tms(current time).
- */
-
- gmtime_r(&log_event.timestamp.tm.tv_sec, &tm);
- s = strftime(time_formatted, sizeof(time_formatted) - 1,
- FLB_STD_TIME_FMT, &tm);
- len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
- ".%09" PRIu64 "Z",
- (uint64_t) log_event.timestamp.tm.tv_nsec);
- s += len;
-
- msgpack_pack_str(&mp_pck, s);
- msgpack_pack_str_body(&mp_pck, time_formatted, s);
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- /* Convert from msgpack to JSON */
- out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- if (!out_buf) {
- flb_plg_error(ctx->ins, "error formatting JSON payload");
- return NULL;
- }
-
- return out_buf;
-}
-
-static int stackdriver_format_test(struct flb_config *config,
- struct flb_input_instance *ins,
- void *plugin_context,
- void *flush_ctx,
- int event_type,
- const char *tag, int tag_len,
- const void *data, size_t bytes,
- void **out_data, size_t *out_size)
-{
- int total_records;
- flb_sds_t payload = NULL;
- struct flb_stackdriver *ctx = plugin_context;
-
- /* Count number of records */
- total_records = flb_mp_count(data, bytes);
-
- payload = stackdriver_format(ctx, total_records,
- (char *) tag, tag_len, data, bytes);
- if (payload == NULL) {
- return -1;
- }
-
- *out_data = payload;
- *out_size = flb_sds_len(payload);
-
- return 0;
-
-}
-
-#ifdef FLB_HAVE_METRICS
-static void update_http_metrics(struct flb_stackdriver *ctx,
- struct flb_event_chunk *event_chunk,
- uint64_t ts,
- int http_status)
-{
- char tmp[32];
-
- /* convert status to string format */
- snprintf(tmp, sizeof(tmp) - 1, "%i", http_status);
- char *name = (char *) flb_output_name(ctx->ins);
-
- /* processed records total */
- cmt_counter_add(ctx->cmt_proc_records_total, ts, event_chunk->total_events,
- 2, (char *[]) {tmp, name});
-
- /* HTTP status */
- if (http_status != STACKDRIVER_NET_ERROR) {
- cmt_counter_inc(ctx->cmt_requests_total, ts, 2, (char *[]) {tmp, name});
- }
-}
-
-static void update_retry_metric(struct flb_stackdriver *ctx,
- struct flb_event_chunk *event_chunk,
- uint64_t ts,
- int http_status, int ret_code)
-{
- char tmp[32];
- char *name = (char *) flb_output_name(ctx->ins);
-
- if (ret_code != FLB_RETRY) {
- return;
- }
-
- /* convert status to string format */
- snprintf(tmp, sizeof(tmp) - 1, "%i", http_status);
- cmt_counter_add(ctx->cmt_retried_records_total,
- ts, event_chunk->total_events, 2, (char *[]) {tmp, name});
-
-}
-#endif
-
-static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *i_ins,
- void *out_context,
- struct flb_config *config)
-{
- (void) i_ins;
- (void) config;
- int ret;
- int ret_code = FLB_RETRY;
- size_t b_sent;
- flb_sds_t token;
- flb_sds_t payload_buf;
- void *compressed_payload_buffer = NULL;
- size_t compressed_payload_size;
- struct flb_stackdriver *ctx = out_context;
- struct flb_connection *u_conn;
- struct flb_http_client *c;
- int compressed = FLB_FALSE;
-#ifdef FLB_HAVE_METRICS
- char *name = (char *) flb_output_name(ctx->ins);
- uint64_t ts = cfl_time_now();
-#endif
-
- /* Get upstream connection */
- u_conn = flb_upstream_conn_get(ctx->u);
- if (!u_conn) {
-#ifdef FLB_HAVE_METRICS
- cmt_counter_inc(ctx->cmt_failed_requests,
- ts, 1, (char *[]) {name});
-
- /* OLD api */
- flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
-
- update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR);
- update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR, FLB_RETRY);
-#endif
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Reformat msgpack to stackdriver JSON payload */
- payload_buf = stackdriver_format(ctx,
- event_chunk->total_events,
- event_chunk->tag, flb_sds_len(event_chunk->tag),
- event_chunk->data, event_chunk->size);
- if (!payload_buf) {
-#ifdef FLB_HAVE_METRICS
- cmt_counter_inc(ctx->cmt_failed_requests,
- ts, 1, (char *[]) {name});
-
- /* OLD api */
- flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
-#endif
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Get or renew Token */
- token = get_google_token(ctx);
- if (!token) {
- flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
- flb_upstream_conn_release(u_conn);
- flb_sds_destroy(payload_buf);
-#ifdef FLB_HAVE_METRICS
- cmt_counter_inc(ctx->cmt_failed_requests,
- ts, 1, (char *[]) {name});
-
- /* OLD api */
- flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
-#endif
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- compressed_payload_buffer = payload_buf;
- compressed_payload_size = flb_sds_len(payload_buf);
- if (ctx->compress_gzip == FLB_TRUE) {
- ret = flb_gzip_compress((void *) payload_buf, flb_sds_len(payload_buf),
- &compressed_payload_buffer, &compressed_payload_size);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
- } else {
- compressed = FLB_TRUE;
- flb_sds_destroy(payload_buf);
- }
- }
-
- /* Compose HTTP Client request */
- c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI,
- compressed_payload_buffer, compressed_payload_size, NULL, 0, NULL, 0);
-
- flb_http_buffer_size(c, 4192);
-
- if (ctx->stackdriver_agent) {
- flb_http_add_header(c, "User-Agent", 10,
- ctx->stackdriver_agent,
- flb_sds_len(ctx->stackdriver_agent));
- }
- else {
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
- }
-
- flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
- flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
- /* Content Encoding: gzip */
- if (compressed == FLB_TRUE) {
- flb_http_set_content_encoding_gzip(c);
- }
-
- /* Send HTTP request */
- ret = flb_http_do(c, &b_sent);
-
- /* validate response */
- if (ret != 0) {
- flb_plg_warn(ctx->ins, "http_do=%i", ret);
- ret_code = FLB_RETRY;
-#ifdef FLB_HAVE_METRICS
- update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR);
-#endif
- }
- else {
- /* The request was issued successfully, validate the 'error' field */
- flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
- if (c->resp.status == 200) {
- ret_code = FLB_OK;
- }
- else if (c->resp.status >= 400 && c->resp.status < 500) {
- ret_code = FLB_ERROR;
- flb_plg_warn(ctx->ins, "error\n%s",
- c->resp.payload);
- }
- else {
- if (c->resp.payload_size > 0) {
- /* we got an error */
- flb_plg_warn(ctx->ins, "error\n%s",
- c->resp.payload);
- }
- else {
- flb_plg_debug(ctx->ins, "response\n%s",
- c->resp.payload);
- }
- ret_code = FLB_RETRY;
- }
- }
-
- /* Update specific stackdriver metrics */
-#ifdef FLB_HAVE_METRICS
- if (ret_code == FLB_OK) {
- cmt_counter_inc(ctx->cmt_successful_requests,
- ts, 1, (char *[]) {name});
-
- /* OLD api */
- flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);
- }
- else {
- cmt_counter_inc(ctx->cmt_failed_requests,
- ts, 1, (char *[]) {name});
-
- /* OLD api */
- flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
- }
-
- /* Update metrics counter by using labels/http status code */
- if (ret == 0) {
- update_http_metrics(ctx, event_chunk, ts, c->resp.status);
- }
-
- /* Update retry count if necessary */
- update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code);
-#endif
-
-
- /* Cleanup */
- if (compressed == FLB_TRUE) {
- flb_free(compressed_payload_buffer);
- }
- else {
- flb_sds_destroy(payload_buf);
- }
- flb_sds_destroy(token);
- flb_http_client_destroy(c);
- flb_upstream_conn_release(u_conn);
-
- /* Done */
- FLB_OUTPUT_RETURN(ret_code);
-}
-
-static int cb_stackdriver_exit(void *data, struct flb_config *config)
-{
- struct flb_stackdriver *ctx = data;
-
- if (!ctx) {
- return -1;
- }
-
- flb_stackdriver_conf_destroy(ctx);
- return 0;
-}
-
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, credentials_file),
- "Set the path for the google service credentials file"
- },
- {
- FLB_CONFIG_MAP_STR, "metadata_server", (char *)NULL,
- 0, FLB_FALSE, 0,
- "Set the metadata server"
- },
- {
- FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, client_email),
- "Set the service account email"
- },
- // set in flb_bigquery_oauth_credentials
- {
- FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, private_key),
- "Set the service account secret"
- },
- {
- FLB_CONFIG_MAP_STR, "export_to_project_id", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, export_to_project_id),
- "Export to project id"
- },
- {
- FLB_CONFIG_MAP_STR, "resource", FLB_SDS_RESOURCE_TYPE,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource),
- "Set the resource"
- },
- {
- FLB_CONFIG_MAP_STR, "severity_key", DEFAULT_SEVERITY_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, severity_key),
- "Set the severity key"
- },
- {
- FLB_CONFIG_MAP_BOOL, "autoformat_stackdriver_trace", "false",
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, autoformat_stackdriver_trace),
- "Autoformat the stackdriver trace"
- },
- {
- FLB_CONFIG_MAP_STR, "trace_key", DEFAULT_TRACE_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_key),
- "Set the trace key"
- },
- {
- FLB_CONFIG_MAP_STR, "span_id_key", DEFAULT_SPAN_ID_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, span_id_key),
- "Set the span id key"
- },
- {
- FLB_CONFIG_MAP_STR, "trace_sampled_key", DEFAULT_TRACE_SAMPLED_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_sampled_key),
- "Set the trace sampled key"
- },
- {
- FLB_CONFIG_MAP_STR, "log_name_key", DEFAULT_LOG_NAME_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, log_name_key),
- "Set the logname key"
- },
- {
- FLB_CONFIG_MAP_STR, "http_request_key", HTTPREQUEST_FIELD_IN_JSON,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, http_request_key),
- "Set the http request key"
- },
- {
- FLB_CONFIG_MAP_STR, "k8s_cluster_name", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_name),
- "Set the kubernetes cluster name"
- },
- {
- FLB_CONFIG_MAP_STR, "k8s_cluster_location", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_location),
- "Set the kubernetes cluster location"
- },
- {
- FLB_CONFIG_MAP_STR, "location", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, location),
- "Set the resource location"
- },
- {
- FLB_CONFIG_MAP_STR, "namespace", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, namespace_id),
- "Set the resource namespace"
- },
- {
- FLB_CONFIG_MAP_STR, "node_id", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, node_id),
- "Set the resource node id"
- },
- {
- FLB_CONFIG_MAP_STR, "job", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, job),
- "Set the resource job"
- },
- {
- FLB_CONFIG_MAP_STR, "task_id", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, task_id),
- "Set the resource task id"
- },
- {
- FLB_CONFIG_MAP_STR, "compress", NULL,
- 0, FLB_FALSE, 0,
- "Set log payload compression method. Option available is 'gzip'"
- },
- {
- FLB_CONFIG_MAP_CLIST, "labels", NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels),
- "Set the labels"
- },
- {
- FLB_CONFIG_MAP_STR, "labels_key", DEFAULT_LABELS_KEY,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels_key),
- "Set the labels key"
- },
- {
- FLB_CONFIG_MAP_STR, "tag_prefix", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, tag_prefix),
- "Set the tag prefix"
- },
- {
- FLB_CONFIG_MAP_STR, "stackdriver_agent", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, stackdriver_agent),
- "Set the stackdriver agent"
- },
- /* Custom Regex */
- {
- FLB_CONFIG_MAP_STR, "custom_k8s_regex", DEFAULT_TAG_REGEX,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, custom_k8s_regex),
- "Set a custom kubernetes regex filter"
- },
- {
- FLB_CONFIG_MAP_CLIST, "resource_labels", NULL,
- 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels),
- "Set the resource labels"
- },
- /* EOF */
- {0}
-};
-
-struct flb_output_plugin out_stackdriver_plugin = {
- .name = "stackdriver",
- .description = "Send events to Google Stackdriver Logging",
- .cb_init = cb_stackdriver_init,
- .cb_flush = cb_stackdriver_flush,
- .cb_exit = cb_stackdriver_exit,
- .workers = 1,
- .config_map = config_map,
-
- /* Test */
- .test_formatter.callback = stackdriver_format_test,
-
- /* Plugin flags */
- .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
-};
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver.h b/fluent-bit/plugins/out_stackdriver/stackdriver.h
deleted file mode 100644
index 239a3ee31..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver.h
+++ /dev/null
@@ -1,241 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_OUT_STACKDRIVER_H
-#define FLB_OUT_STACKDRIVER_H
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_oauth2.h>
-#include <fluent-bit/flb_sds.h>
-#include <fluent-bit/flb_pthread.h>
-#include <fluent-bit/flb_regex.h>
-#include <fluent-bit/flb_metrics.h>
-
-/* refresh token every 50 minutes */
-#define FLB_STD_TOKEN_REFRESH 3000
-
-/* Stackdriver Logging write scope */
-#define FLB_STD_SCOPE "https://www.googleapis.com/auth/logging.write"
-
-/* Stackdriver authorization URL */
-#define FLB_STD_AUTH_URL "https://oauth2.googleapis.com/token"
-
-/* Stackdriver Logging 'write' end-point */
-#define FLB_STD_WRITE_URI "/v2/entries:write"
-#define FLB_STD_WRITE_URL "https://logging.googleapis.com" FLB_STD_WRITE_URI
-
-/* Timestamp format */
-#define FLB_STD_TIME_FMT "%Y-%m-%dT%H:%M:%S"
-
-/* Default Resource type */
-#define FLB_SDS_RESOURCE_TYPE "global"
-#define OPERATION_FIELD_IN_JSON "logging.googleapis.com/operation"
-#define MONITORED_RESOURCE_KEY "logging.googleapis.com/monitored_resource"
-#define LOCAL_RESOURCE_ID_KEY "logging.googleapis.com/local_resource_id"
-#define DEFAULT_LABELS_KEY "logging.googleapis.com/labels"
-#define DEFAULT_SEVERITY_KEY "logging.googleapis.com/severity"
-#define DEFAULT_TRACE_KEY "logging.googleapis.com/trace"
-#define DEFAULT_SPAN_ID_KEY "logging.googleapis.com/spanId"
-#define DEFAULT_TRACE_SAMPLED_KEY "logging.googleapis.com/traceSampled"
-#define DEFAULT_LOG_NAME_KEY "logging.googleapis.com/logName"
-#define DEFAULT_INSERT_ID_KEY "logging.googleapis.com/insertId"
-#define SOURCELOCATION_FIELD_IN_JSON "logging.googleapis.com/sourceLocation"
-#define HTTPREQUEST_FIELD_IN_JSON "logging.googleapis.com/http_request"
-#define INSERT_ID_SIZE 31
-#define LEN_LOCAL_RESOURCE_ID_KEY 40
-#define OPERATION_KEY_SIZE 32
-#define SOURCE_LOCATION_SIZE 37
-#define HTTP_REQUEST_KEY_SIZE 35
-
-/*
- * Stackdriver implements a specific HTTP status code that is used internally in clients to keep
- * track of networking errors that could happen before a successful HTTP request/response. For
- * metrics counting purposes, every failed networking connection will use a 502 HTTP status code
- * that can be used later to query the metrics by using labels with that value.
- */
-#define STACKDRIVER_NET_ERROR 502
-
-#define K8S_CONTAINER "k8s_container"
-#define K8S_NODE "k8s_node"
-#define K8S_POD "k8s_pod"
-
-#define STDOUT "stdout"
-#define STDERR "stderr"
-
-#define DEFAULT_TAG_REGEX "(?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\\.log$"
-
-/* Metrics */
-#ifdef FLB_HAVE_METRICS
-#define FLB_STACKDRIVER_SUCCESSFUL_REQUESTS 1000 /* successful requests */
-#define FLB_STACKDRIVER_FAILED_REQUESTS 1001 /* failed requests */
-#endif
-
-struct flb_stackdriver_oauth_credentials {
- /* parsed credentials file */
- flb_sds_t type;
- flb_sds_t private_key_id;
- flb_sds_t private_key;
- flb_sds_t client_email;
- flb_sds_t client_id;
- flb_sds_t auth_uri;
- flb_sds_t token_uri;
-};
-
-struct flb_stackdriver_env {
- flb_sds_t creds_file;
- flb_sds_t metadata_server;
-};
-
-struct flb_stackdriver {
- /* credentials */
- flb_sds_t credentials_file;
-
- /* parsed credentials file */
- flb_sds_t type;
- flb_sds_t project_id;
- flb_sds_t private_key_id;
- flb_sds_t private_key;
- flb_sds_t client_email;
- flb_sds_t client_id;
- flb_sds_t auth_uri;
- flb_sds_t token_uri;
- bool metadata_server_auth;
-
- /* metadata server (GCP specific, WIP) */
- flb_sds_t metadata_server;
- flb_sds_t zone;
- flb_sds_t instance_id;
- flb_sds_t instance_name;
-
- /* kubernetes specific */
- flb_sds_t cluster_name;
- flb_sds_t cluster_location;
- flb_sds_t namespace_name;
- flb_sds_t pod_name;
- flb_sds_t container_name;
- flb_sds_t node_name;
-
- flb_sds_t local_resource_id;
- flb_sds_t tag_prefix;
- /* shadow tag_prefix for safe deallocation */
- flb_sds_t tag_prefix_k8s;
-
- /* labels */
- flb_sds_t labels_key;
- struct mk_list *labels;
- struct mk_list config_labels;
-
- /* resource type flag */
- int resource_type;
-
- /* resource labels api */
- struct mk_list *resource_labels;
- struct mk_list resource_labels_kvs;
- int should_skip_resource_labels_api;
-
- /* generic resources */
- flb_sds_t location;
- flb_sds_t namespace_id;
-
- /* generic_node specific */
- flb_sds_t node_id;
-
- /* generic_task specific */
- flb_sds_t job;
- flb_sds_t task_id;
-
- /* Internal variable to reduce string comparisons */
- int compress_gzip;
-
- /* other */
- flb_sds_t export_to_project_id;
- flb_sds_t resource;
- flb_sds_t severity_key;
- flb_sds_t trace_key;
- flb_sds_t span_id_key;
- flb_sds_t trace_sampled_key;
- flb_sds_t log_name_key;
- flb_sds_t http_request_key;
- int http_request_key_size;
- bool autoformat_stackdriver_trace;
-
- flb_sds_t stackdriver_agent;
-
- /* Regex context to parse tags */
- flb_sds_t custom_k8s_regex;
- struct flb_regex *regex;
-
- /* oauth2 context */
- struct flb_oauth2 *o;
-
- /* parsed oauth2 credentials */
- struct flb_stackdriver_oauth_credentials *creds;
-
- /* environment variable settings */
- struct flb_stackdriver_env *env;
-
- /* mutex for acquiring oauth tokens */
- pthread_mutex_t token_mutex;
-
- /* upstream context for stackdriver write end-point */
- struct flb_upstream *u;
-
- /* upstream context for metadata end-point */
- struct flb_upstream *metadata_u;
-
-#ifdef FLB_HAVE_METRICS
- /* metrics */
- struct cmt_counter *cmt_successful_requests;
- struct cmt_counter *cmt_failed_requests;
- struct cmt_counter *cmt_requests_total;
- struct cmt_counter *cmt_proc_records_total;
- struct cmt_counter *cmt_retried_records_total;
-#endif
-
- /* plugin instance */
- struct flb_output_instance *ins;
-
- /* Fluent Bit context */
- struct flb_config *config;
-};
-
-typedef enum {
- FLB_STD_EMERGENCY = 800,
- FLB_STD_ALERT = 700,
- FLB_STD_CRITICAL = 600,
- FLB_STD_ERROR = 500,
- FLB_STD_WARNING = 400,
- FLB_STD_NOTICE = 300,
- FLB_STD_INFO = 200,
- FLB_STD_DEBUG = 100,
- FLB_STD_DEFAULT = 0
-} severity_t;
-
-struct local_resource_id_list {
- flb_sds_t val;
- struct mk_list _head;
-};
-
-typedef enum {
- INSERTID_VALID = 0,
- INSERTID_INVALID = 1,
- INSERTID_NOT_PRESENT = 2
-} insert_id_status;
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c
deleted file mode 100644
index 9f3f28a35..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.c
+++ /dev/null
@@ -1,667 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_unescape.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_jsmn.h>
-#include <fluent-bit/flb_sds.h>
-#include <fluent-bit/flb_kv.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-
-#include "gce_metadata.h"
-#include "stackdriver.h"
-#include "stackdriver_conf.h"
-#include "stackdriver_resource_types.h"
-
-static inline int key_cmp(const char *str, int len, const char *cmp) {
-
- if (strlen(cmp) != len) {
- return -1;
- }
-
- return strncasecmp(str, cmp, len);
-}
-
-static int read_credentials_file(const char *cred_file, struct flb_stackdriver *ctx)
-{
- int i;
- int ret;
- int key_len;
- int val_len;
- int tok_size = 32;
- char *buf;
- char *key;
- char *val;
- flb_sds_t tmp;
- struct stat st;
- jsmn_parser parser;
- jsmntok_t *t;
- jsmntok_t *tokens;
-
- /* Validate credentials path */
- ret = stat(cred_file, &st);
- if (ret == -1) {
- flb_errno();
- flb_plg_error(ctx->ins, "cannot open credentials file: %s",
- cred_file);
- return -1;
- }
-
- if (!S_ISREG(st.st_mode) && !S_ISLNK(st.st_mode)) {
- flb_plg_error(ctx->ins, "credentials file "
- "is not a valid file: %s", cred_file);
- return -1;
- }
-
- /* Read file content */
- buf = mk_file_to_buffer(cred_file);
- if (!buf) {
- flb_plg_error(ctx->ins, "error reading credentials file: %s",
- cred_file);
- return -1;
- }
-
- /* Parse content */
- jsmn_init(&parser);
- tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
- if (!tokens) {
- flb_errno();
- flb_free(buf);
- return -1;
- }
-
- ret = jsmn_parse(&parser, buf, st.st_size, tokens, tok_size);
- if (ret <= 0) {
- flb_plg_error(ctx->ins, "invalid JSON credentials file: %s",
- cred_file);
- flb_free(buf);
- flb_free(tokens);
- return -1;
- }
-
- t = &tokens[0];
- if (t->type != JSMN_OBJECT) {
- flb_plg_error(ctx->ins, "invalid JSON map on file: %s",
- cred_file);
- flb_free(buf);
- flb_free(tokens);
- return -1;
- }
-
- /* Parse JSON tokens */
- for (i = 1; i < ret; i++) {
- t = &tokens[i];
- if (t->type != JSMN_STRING) {
- continue;
- }
-
- if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)){
- break;
- }
-
- /* Key */
- key = buf + t->start;
- key_len = (t->end - t->start);
-
- /* Value */
- i++;
- t = &tokens[i];
- val = buf + t->start;
- val_len = (t->end - t->start);
-
- if (key_cmp(key, key_len, "type") == 0) {
- ctx->creds->type = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "project_id") == 0) {
- ctx->project_id = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "private_key_id") == 0) {
- ctx->creds->private_key_id = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "private_key") == 0) {
- tmp = flb_sds_create_len(val, val_len);
- if (tmp) {
- /* Unescape private key */
- ctx->creds->private_key = flb_sds_create_size(val_len);
- flb_unescape_string(tmp, flb_sds_len(tmp),
- &ctx->creds->private_key);
- flb_sds_destroy(tmp);
- }
- }
- else if (key_cmp(key, key_len, "client_email") == 0) {
- ctx->creds->client_email = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "client_id") == 0) {
- ctx->creds->client_id = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "auth_uri") == 0) {
- ctx->creds->auth_uri = flb_sds_create_len(val, val_len);
- }
- else if (key_cmp(key, key_len, "token_uri") == 0) {
- ctx->creds->token_uri = flb_sds_create_len(val, val_len);
- }
- }
-
- flb_free(buf);
- flb_free(tokens);
-
- return 0;
-}
-
-/*
- * parse_key_value_list():
- * - Parses an origin list of comma seperated string specifying key=value.
- * - Appends the parsed key value pairs into the destination list.
- * - Returns the length of the destination list.
- */
-static int parse_key_value_list(struct flb_stackdriver *ctx,
- struct mk_list *origin,
- struct mk_list *dest,
- int shouldTrim)
-{
- char *p;
- flb_sds_t key;
- flb_sds_t val;
- struct flb_kv *kv;
- struct mk_list *head;
- struct flb_slist_entry *entry;
-
- if (origin) {
- mk_list_foreach(head, origin) {
- entry = mk_list_entry(head, struct flb_slist_entry, _head);
-
- p = strchr(entry->str, '=');
- if (!p) {
- flb_plg_error(ctx->ins, "invalid key value pair on '%s'",
- entry->str);
- return -1;
- }
-
- key = flb_sds_create_size((p - entry->str) + 1);
- flb_sds_cat(key, entry->str, p - entry->str);
- val = flb_sds_create(p + 1);
- if (shouldTrim) {
- flb_sds_trim(key);
- flb_sds_trim(val);
- }
- if (!key || flb_sds_len(key) == 0) {
- flb_plg_error(ctx->ins,
- "invalid key value pair on '%s'",
- entry->str);
- return -1;
- }
- if (!val || flb_sds_len(val) == 0) {
- flb_plg_error(ctx->ins,
- "invalid key value pair on '%s'",
- entry->str);
- flb_sds_destroy(key);
- return -1;
- }
-
- kv = flb_kv_item_create(dest, key, val);
- flb_sds_destroy(key);
- flb_sds_destroy(val);
-
- if (!kv) {
- return -1;
- }
- }
- }
-
- return mk_list_size(dest);
-}
-
-/*
- * parse_configuration_labels
- * - Parse labels set in configuration
- * - Returns the number of configuration labels
- */
-static int parse_configuration_labels(struct flb_stackdriver *ctx)
-{
- return parse_key_value_list(ctx, ctx->labels,
- &ctx->config_labels, FLB_FALSE);
-}
-
-/*
- * parse_resource_labels():
- * - Parses resource labels set in configuration.
- * - Returns the number of resource label mappings.
- */
-static int parse_resource_labels(struct flb_stackdriver *ctx)
-{
- return parse_key_value_list(ctx, ctx->resource_labels,
- &ctx->resource_labels_kvs, FLB_TRUE);
-}
-
-struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *ins,
- struct flb_config *config)
-{
- int ret;
- const char *tmp;
- const char *backwards_compatible_env_var;
- struct flb_stackdriver *ctx;
- size_t http_request_key_size;
-
- /* Allocate config context */
- ctx = flb_calloc(1, sizeof(struct flb_stackdriver));
- if (!ctx) {
- flb_errno();
- return NULL;
- }
- ctx->ins = ins;
- ctx->config = config;
-
- ret = flb_output_config_map_set(ins, (void *)ctx);
- if (ret == -1) {
- flb_plg_error(ins, "unable to load configuration");
- flb_free(ctx);
- return NULL;
- }
-
- /* Compress (gzip) */
- tmp = flb_output_get_property("compress", ins);
- ctx->compress_gzip = FLB_FALSE;
- if (tmp && strcasecmp(tmp, "gzip") == 0) {
- ctx->compress_gzip = FLB_TRUE;
- }
-
- /* labels */
- flb_kv_init(&ctx->config_labels);
- ret = parse_configuration_labels((void *)ctx);
- if (ret == -1) {
- flb_plg_error(ins, "unable to parse configuration labels");
- flb_kv_release(&ctx->config_labels);
- flb_free(ctx);
- return NULL;
- }
-
- /* resource labels */
- flb_kv_init(&ctx->resource_labels_kvs);
- ret = parse_resource_labels((void *)ctx);
- if (ret == -1) {
- flb_plg_error(ins, "unable to parse resource label list");
- flb_kv_release(&ctx->resource_labels_kvs);
- flb_free(ctx);
- return NULL;
- }
-
- /* Lookup metadata server URL */
- ctx->metadata_server = NULL;
- tmp = flb_output_get_property("metadata_server", ins);
- if (tmp == NULL) {
- tmp = getenv("METADATA_SERVER");
- if(tmp) {
- if (ctx->env == NULL) {
- ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env));
- if (ctx->env == NULL) {
- flb_plg_error(ins, "unable to allocate env variables");
- flb_free(ctx);
- return NULL;
- }
- }
- ctx->env->metadata_server = flb_sds_create(tmp);
- ctx->metadata_server = ctx->env->metadata_server;
- }
- else {
- ctx->metadata_server = flb_sds_create(FLB_STD_METADATA_SERVER);
- }
- }
- else {
- ctx->metadata_server = flb_sds_create(tmp);
- }
- flb_plg_info(ctx->ins, "metadata_server set to %s", ctx->metadata_server);
-
- /* Lookup credentials file */
- if (ctx->credentials_file == NULL) {
- /*
- * Use GOOGLE_APPLICATION_CREDENTIALS to fetch the credentials.
- * GOOGLE_SERVICE_CREDENTIALS is checked for backwards compatibility.
- */
- tmp = getenv("GOOGLE_APPLICATION_CREDENTIALS");
- backwards_compatible_env_var = getenv("GOOGLE_SERVICE_CREDENTIALS");
- if (tmp && backwards_compatible_env_var) {
- flb_plg_warn(ctx->ins, "GOOGLE_APPLICATION_CREDENTIALS and "
- "GOOGLE_SERVICE_CREDENTIALS are both defined. "
- "Defaulting to GOOGLE_APPLICATION_CREDENTIALS");
- }
- if ((tmp || backwards_compatible_env_var) && (ctx->env == NULL)) {
- ctx->env = flb_calloc(1, sizeof(struct flb_stackdriver_env));
- if (ctx->env == NULL) {
- flb_plg_error(ins, "unable to allocate env variables");
- flb_free(ctx);
- return NULL;
- }
- }
- if (tmp) {
- ctx->env->creds_file = flb_sds_create(tmp);
- ctx->credentials_file = ctx->env->creds_file;
- }
- else if (backwards_compatible_env_var) {
- ctx->env->creds_file = flb_sds_create(backwards_compatible_env_var);
- ctx->credentials_file = ctx->env->creds_file;
- }
- }
-
- if (ctx->credentials_file) {
- ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials));
- if (ctx->creds == NULL) {
- flb_plg_error(ctx->ins, "unable to allocate credentials");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- ret = read_credentials_file(ctx->credentials_file, ctx);
- if (ret != 0) {
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- ctx->type = ctx->creds->type;
- ctx->private_key_id = ctx->creds->private_key_id;
- ctx->private_key = ctx->creds->private_key;
- ctx->client_email = ctx->creds->client_email;
- ctx->client_id = ctx->creds->client_id;
- ctx->auth_uri = ctx->creds->auth_uri;
- ctx->token_uri = ctx->creds->token_uri;
- }
- else {
- /*
- * If no credentials file has been defined, do manual lookup of the
- * client email and the private key
- */
- ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials));
- if (ctx->creds == NULL) {
- flb_plg_error(ctx->ins, "unable to allocate credentials");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
-
- /* Service Account Email */
- if (ctx->client_email == NULL) {
- tmp = getenv("SERVICE_ACCOUNT_EMAIL");
- if (tmp) {
- ctx->creds->client_email = flb_sds_create(tmp);
- }
- }
-
- /* Service Account Secret */
- if (ctx->private_key == NULL) {
- tmp = getenv("SERVICE_ACCOUNT_SECRET");
- if (tmp) {
- ctx->creds->private_key = flb_sds_create(tmp);
- }
- }
-
- ctx->private_key = ctx->creds->private_key;
- ctx->client_email = ctx->creds->client_email;
- }
-
- /*
- * If only client email has been provided, fetch token from
- * the GCE metadata server.
- *
- * If no credentials have been provided, fetch token from the GCE
- * metadata server for default account.
- */
- if (!ctx->client_email && ctx->private_key) {
- flb_plg_error(ctx->ins, "client_email is not defined");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
-
- if (!ctx->client_email) {
- flb_plg_warn(ctx->ins, "client_email is not defined, using "
- "a default one");
- if (ctx->creds == NULL) {
- ctx->creds = flb_calloc(1, sizeof(struct flb_stackdriver_oauth_credentials));
- if (ctx->creds == NULL) {
- flb_plg_error(ctx->ins, "unable to allocate credentials");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- }
- ctx->creds->client_email = flb_sds_create("default");
- ctx->client_email = ctx->creds->client_email;
- }
- if (!ctx->private_key) {
- flb_plg_warn(ctx->ins, "private_key is not defined, fetching "
- "it from metadata server");
- ctx->metadata_server_auth = true;
- }
-
- if (ctx->http_request_key) {
- http_request_key_size = flb_sds_len(ctx->http_request_key);
- if (http_request_key_size >= INT_MAX) {
- flb_plg_error(ctx->ins, "http_request_key is too long");
- flb_sds_destroy(ctx->http_request_key);
- ctx->http_request_key = NULL;
- ctx->http_request_key_size = 0;
- } else {
- ctx->http_request_key_size = http_request_key_size;
- }
- }
-
- set_resource_type(ctx);
-
- if (resource_api_has_required_labels(ctx) == FLB_FALSE) {
-
- if (ctx->resource_type == RESOURCE_TYPE_K8S) {
- if (!ctx->cluster_name || !ctx->cluster_location) {
- flb_plg_error(ctx->ins, "missing k8s_cluster_name "
- "or k8s_cluster_location in configuration");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- }
-
- else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE
- || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) {
-
- if (ctx->location == NULL) {
- flb_plg_error(ctx->ins, "missing generic resource's location");
- }
-
- if (ctx->namespace_id == NULL) {
- flb_plg_error(ctx->ins, "missing generic resource's namespace");
- }
-
- if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE) {
- if (ctx->node_id == NULL) {
- flb_plg_error(ctx->ins, "missing generic_node's node_id");
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- }
- else {
- if (ctx->job == NULL) {
- flb_plg_error(ctx->ins, "missing generic_task's job");
- }
-
- if (ctx->task_id == NULL) {
- flb_plg_error(ctx->ins, "missing generic_task's task_id");
- }
-
- if (!ctx->job || !ctx->task_id) {
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- }
-
- if (!ctx->location || !ctx->namespace_id) {
- flb_stackdriver_conf_destroy(ctx);
- return NULL;
- }
- }
- }
-
-
- if (ctx->tag_prefix == NULL && ctx->resource_type == RESOURCE_TYPE_K8S) {
- /* allocate the flb_sds_t to tag_prefix_k8s so we can safely deallocate it */
- ctx->tag_prefix_k8s = flb_sds_create(ctx->resource);
- ctx->tag_prefix_k8s = flb_sds_cat(ctx->tag_prefix_k8s, ".", 1);
- ctx->tag_prefix = ctx->tag_prefix_k8s;
- }
-
- /* Register metrics */
-#ifdef FLB_HAVE_METRICS
- ctx->cmt_successful_requests = cmt_counter_create(ins->cmt,
- "fluentbit",
- "stackdriver",
- "successful_requests",
- "Total number of successful "
- "requests.",
- 1, (char *[]) {"name"});
-
- ctx->cmt_failed_requests = cmt_counter_create(ins->cmt,
- "fluentbit",
- "stackdriver",
- "failed_requests",
- "Total number of failed "
- "requests.",
- 1, (char *[]) {"name"});
-
- ctx->cmt_requests_total = cmt_counter_create(ins->cmt,
- "fluentbit",
- "stackdriver",
- "requests_total",
- "Total number of requests.",
- 2, (char *[]) {"status", "name"});
-
- ctx->cmt_proc_records_total = cmt_counter_create(ins->cmt,
- "fluentbit",
- "stackdriver",
- "proc_records_total",
- "Total number of processed records.",
- 2, (char *[]) {"status", "name"});
-
- ctx->cmt_retried_records_total = cmt_counter_create(ins->cmt,
- "fluentbit",
- "stackdriver",
- "retried_records_total",
- "Total number of retried records.",
- 2, (char *[]) {"status", "name"});
-
- /* OLD api */
- flb_metrics_add(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS,
- "stackdriver_successful_requests", ctx->ins->metrics);
- flb_metrics_add(FLB_STACKDRIVER_FAILED_REQUESTS,
- "stackdriver_failed_requests", ctx->ins->metrics);
-#endif
-
- return ctx;
-}
-
-int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx)
-{
- if (!ctx) {
- return -1;
- }
-
- if (ctx->creds) {
- if (ctx->creds->type) {
- flb_sds_destroy(ctx->creds->type);
- }
- if (ctx->creds->private_key_id) {
- flb_sds_destroy(ctx->creds->private_key_id);
- }
- if (ctx->creds->private_key) {
- flb_sds_destroy(ctx->creds->private_key);
- }
- if (ctx->creds->client_email) {
- flb_sds_destroy(ctx->creds->client_email);
- }
- if (ctx->creds->client_id) {
- flb_sds_destroy(ctx->creds->client_id);
- }
- if (ctx->creds->auth_uri) {
- flb_sds_destroy(ctx->creds->auth_uri);
- }
- if (ctx->creds->token_uri) {
- flb_sds_destroy(ctx->creds->token_uri);
- }
- flb_free(ctx->creds);
- }
-
- if (ctx->env) {
- if (ctx->env->creds_file) {
- flb_sds_destroy(ctx->env->creds_file);
- }
- if (ctx->env->metadata_server) {
- flb_sds_destroy(ctx->env->metadata_server);
- /*
- * If ctx->env is not NULL,
- * ctx->metadata_server points ctx->env->metadata_server.
- *
- * We set ctx->metadata_server to NULL to prevent double free.
- */
- ctx->metadata_server = NULL;
- }
- flb_free(ctx->env);
- }
-
- if (ctx->metadata_server) {
- flb_sds_destroy(ctx->metadata_server);
- }
-
- if (ctx->resource_type == RESOURCE_TYPE_K8S){
- flb_sds_destroy(ctx->namespace_name);
- flb_sds_destroy(ctx->pod_name);
- flb_sds_destroy(ctx->container_name);
- flb_sds_destroy(ctx->node_name);
- flb_sds_destroy(ctx->local_resource_id);
- }
-
- if (ctx->metadata_server_auth) {
- flb_sds_destroy(ctx->zone);
- flb_sds_destroy(ctx->instance_id);
- }
-
- if (ctx->metadata_u) {
- flb_upstream_destroy(ctx->metadata_u);
- }
-
- if (ctx->u) {
- flb_upstream_destroy(ctx->u);
- }
-
- if (ctx->o) {
- flb_oauth2_destroy(ctx->o);
- }
-
- if (ctx->regex) {
- flb_regex_destroy(ctx->regex);
- }
-
- if (ctx->project_id) {
- flb_sds_destroy(ctx->project_id);
- }
-
- if (ctx->tag_prefix_k8s) {
- flb_sds_destroy(ctx->tag_prefix_k8s);
- }
-
- flb_kv_release(&ctx->config_labels);
- flb_kv_release(&ctx->resource_labels_kvs);
- flb_free(ctx);
-
- return 0;
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h b/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h
deleted file mode 100644
index 6244e6851..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_conf.h
+++ /dev/null
@@ -1,29 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_OUT_STACKDRIVER_CONF_H
-#define FLB_OUT_STACKDRIVER_CONF_H
-
-#include "stackdriver.h"
-
-struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance *ins,
- struct flb_config *config);
-int flb_stackdriver_conf_destroy(struct flb_stackdriver *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c b/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c
deleted file mode 100644
index e5b94c481..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.c
+++ /dev/null
@@ -1,63 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include "stackdriver.h"
-
-int equal_obj_str(msgpack_object obj, const char *str, const int size) {
- if (obj.type != MSGPACK_OBJECT_STR) {
- return FLB_FALSE;
- }
- if (size != obj.via.str.size
- || strncmp(str, obj.via.str.ptr, obj.via.str.size) != 0) {
- return FLB_FALSE;
- }
- return FLB_TRUE;
-}
-
-int validate_key(msgpack_object obj, const char *str, const int size) {
- return equal_obj_str(obj, str, size);
-}
-
-void try_assign_subfield_str(msgpack_object obj, flb_sds_t *subfield) {
- if (obj.type == MSGPACK_OBJECT_STR) {
- *subfield = flb_sds_copy(*subfield, obj.via.str.ptr,
- obj.via.str.size);
- }
-}
-
-void try_assign_subfield_bool(msgpack_object obj, int *subfield) {
- if (obj.type == MSGPACK_OBJECT_BOOLEAN) {
- if (obj.via.boolean) {
- *subfield = FLB_TRUE;
- }
- else {
- *subfield = FLB_FALSE;
- }
- }
-}
-
-void try_assign_subfield_int(msgpack_object obj, int64_t *subfield) {
- if (obj.type == MSGPACK_OBJECT_STR) {
- *subfield = atoll(obj.via.str.ptr);
- }
- else if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
- *subfield = obj.via.i64;
- }
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h b/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h
deleted file mode 100644
index ab8ac30a8..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_helper.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef FLB_STD_HELPER_H
-#define FLB_STD_HELPER_H
-
-#include "stackdriver.h"
-
-/*
- * Compare obj->via.str and str.
- * Return FLB_TRUE if they are equal.
- * Return FLB_FALSE if obj->type is not string or they are not equal
- */
-int equal_obj_str(msgpack_object obj, const char *str, const int size);
-
-int validate_key(msgpack_object obj, const char *str, const int size);
-
-/*
- * if obj->type is string, assign obj->val to subfield
- * Otherwise leave the subfield untouched
- */
-void try_assign_subfield_str(msgpack_object obj, flb_sds_t *subfield);
-
-/*
- * if obj->type is boolean, assign obj->val to subfield
- * Otherwise leave the subfield untouched
- */
-void try_assign_subfield_bool(msgpack_object obj, int *subfield);
-
-/*
- * if obj->type is valid, assign obj->val to subfield
- * Otherwise leave the subfield untouched
- */
-void try_assign_subfield_int(msgpack_object obj, int64_t *subfield);
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c b/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c
deleted file mode 100644
index 9a1c814c0..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.c
+++ /dev/null
@@ -1,393 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <fluent-bit/flb_regex.h>
-#include "stackdriver.h"
-#include "stackdriver_helper.h"
-#include "stackdriver_http_request.h"
-
-#include <ctype.h>
-
-typedef enum {
- NO_HTTPREQUEST = 1,
- HTTPREQUEST_EXISTS = 2
-} http_request_status;
-
-void init_http_request(struct http_request_field *http_request)
-{
- http_request->latency = flb_sds_create("");
- http_request->protocol = flb_sds_create("");
- http_request->referer = flb_sds_create("");
- http_request->remoteIp = flb_sds_create("");
- http_request->requestMethod = flb_sds_create("");
- http_request->requestUrl = flb_sds_create("");
- http_request->serverIp = flb_sds_create("");
- http_request->userAgent = flb_sds_create("");
-
- http_request->cacheFillBytes = 0;
- http_request->requestSize = 0;
- http_request->responseSize = 0;
- http_request->status = 0;
-
- http_request->cacheHit = FLB_FALSE;
- http_request->cacheLookup = FLB_FALSE;
- http_request->cacheValidatedWithOriginServer = FLB_FALSE;
-}
-
-void destroy_http_request(struct http_request_field *http_request)
-{
- flb_sds_destroy(http_request->latency);
- flb_sds_destroy(http_request->protocol);
- flb_sds_destroy(http_request->referer);
- flb_sds_destroy(http_request->remoteIp);
- flb_sds_destroy(http_request->requestMethod);
- flb_sds_destroy(http_request->requestUrl);
- flb_sds_destroy(http_request->serverIp);
- flb_sds_destroy(http_request->userAgent);
-}
-
-void add_http_request_field(struct http_request_field *http_request,
- msgpack_packer *mp_pck)
-{
- msgpack_pack_str(mp_pck, 11);
- msgpack_pack_str_body(mp_pck, "httpRequest", 11);
-
- if (flb_sds_is_empty(http_request->latency) == FLB_TRUE) {
- msgpack_pack_map(mp_pck, 14);
- }
- else {
- msgpack_pack_map(mp_pck, 15);
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_LATENCY_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_LATENCY,
- HTTP_REQUEST_LATENCY_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->latency));
- msgpack_pack_str_body(mp_pck, http_request->latency,
- flb_sds_len(http_request->latency));
- }
-
- /* String sub-fields */
- msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUEST_METHOD_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUEST_METHOD,
- HTTP_REQUEST_REQUEST_METHOD_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->requestMethod));
- msgpack_pack_str_body(mp_pck, http_request->requestMethod,
- flb_sds_len(http_request->requestMethod));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUEST_URL_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUEST_URL,
- HTTP_REQUEST_REQUEST_URL_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->requestUrl));
- msgpack_pack_str_body(mp_pck, http_request->requestUrl,
- flb_sds_len(http_request->requestUrl));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_USER_AGENT_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_USER_AGENT,
- HTTP_REQUEST_USER_AGENT_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->userAgent));
- msgpack_pack_str_body(mp_pck, http_request->userAgent,
- flb_sds_len(http_request->userAgent));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_REMOTE_IP_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REMOTE_IP,
- HTTP_REQUEST_REMOTE_IP_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->remoteIp));
- msgpack_pack_str_body(mp_pck, http_request->remoteIp,
- flb_sds_len(http_request->remoteIp));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_SERVER_IP_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_SERVER_IP,
- HTTP_REQUEST_SERVER_IP_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->serverIp));
- msgpack_pack_str_body(mp_pck, http_request->serverIp,
- flb_sds_len(http_request->serverIp));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_REFERER_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REFERER,
- HTTP_REQUEST_REFERER_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->referer));
- msgpack_pack_str_body(mp_pck, http_request->referer,
- flb_sds_len(http_request->referer));
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_PROTOCOL_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_PROTOCOL,
- HTTP_REQUEST_PROTOCOL_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(http_request->protocol));
- msgpack_pack_str_body(mp_pck, http_request->protocol,
- flb_sds_len(http_request->protocol));
-
- /* Integer sub-fields */
- msgpack_pack_str(mp_pck, HTTP_REQUEST_REQUESTSIZE_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_REQUESTSIZE,
- HTTP_REQUEST_REQUESTSIZE_SIZE);
- msgpack_pack_int64(mp_pck, http_request->requestSize);
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_RESPONSESIZE_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_RESPONSESIZE,
- HTTP_REQUEST_RESPONSESIZE_SIZE);
- msgpack_pack_int64(mp_pck, http_request->responseSize);
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_STATUS_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_STATUS, HTTP_REQUEST_STATUS_SIZE);
- msgpack_pack_int64(mp_pck, http_request->status);
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_FILL_BYTES_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_FILL_BYTES,
- HTTP_REQUEST_CACHE_FILL_BYTES_SIZE);
- msgpack_pack_int64(mp_pck, http_request->cacheFillBytes);
-
- /* Boolean sub-fields */
- msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_LOOKUP_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_LOOKUP,
- HTTP_REQUEST_CACHE_LOOKUP_SIZE);
- if (http_request->cacheLookup == FLB_TRUE) {
- msgpack_pack_true(mp_pck);
- }
- else {
- msgpack_pack_false(mp_pck);
- }
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_HIT_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_HIT,
- HTTP_REQUEST_CACHE_HIT_SIZE);
- if (http_request->cacheLookup == FLB_TRUE) {
- msgpack_pack_true(mp_pck);
- }
- else {
- msgpack_pack_false(mp_pck);
- }
-
- msgpack_pack_str(mp_pck, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE);
- msgpack_pack_str_body(mp_pck, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER,
- HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE);
- if (http_request->cacheValidatedWithOriginServer == FLB_TRUE) {
- msgpack_pack_true(mp_pck);
- }
- else {
- msgpack_pack_false(mp_pck);
- }
-}
-
-/* latency should be in the format:
- * whitespace (opt.) + integer + point & decimal (opt.)
- * + whitespace (opt.) + "s" + whitespace (opt.)
- *
- * latency is Duration, so the maximum value is "315576000000.999999999s".
- * (23 characters in length)
- */
-static void validate_latency(msgpack_object_str latency_in_payload,
- struct http_request_field *http_request) {
- int i = 0;
- int j = 0;
- int status = 0;
- char extract_latency[32];
- flb_sds_t pattern;
- struct flb_regex *regex;
-
- pattern = flb_sds_create("^\\s*\\d+(.\\d+)?\\s*s\\s*$");
- if (!pattern) {
- return;
- }
-
- if (latency_in_payload.size > sizeof(extract_latency)) {
- flb_sds_destroy(pattern);
- return;
- }
-
- regex = flb_regex_create(pattern);
- status = flb_regex_match(regex,
- (unsigned char *) latency_in_payload.ptr,
- latency_in_payload.size);
- flb_regex_destroy(regex);
- flb_sds_destroy(pattern);
-
- if (status == 1) {
- for (; i < latency_in_payload.size; ++ i) {
- if (latency_in_payload.ptr[i] == '.' || latency_in_payload.ptr[i] == 's'
- || isdigit(latency_in_payload.ptr[i])) {
- extract_latency[j] = latency_in_payload.ptr[i];
- ++ j;
- }
- }
- http_request->latency = flb_sds_copy(http_request->latency, extract_latency, j);
- }
-}
-
-/* Return true if httpRequest extracted */
-int extract_http_request(struct http_request_field *http_request,
- flb_sds_t http_request_key,
- int http_request_key_size,
- msgpack_object *obj, int *extra_subfields)
-{
- http_request_status op_status = NO_HTTPREQUEST;
- msgpack_object_kv *p;
- msgpack_object_kv *pend;
- msgpack_object_kv *tmp_p;
- msgpack_object_kv *tmp_pend;
-
- if (obj->via.map.size == 0) {
- return FLB_FALSE;
- }
-
- p = obj->via.map.ptr;
- pend = obj->via.map.ptr + obj->via.map.size;
-
- for (; p < pend && op_status == NO_HTTPREQUEST; ++p) {
-
- if (p->val.type != MSGPACK_OBJECT_MAP
- || !validate_key(p->key, http_request_key,
- http_request_key_size)) {
-
- continue;
- }
-
- op_status = HTTPREQUEST_EXISTS;
- msgpack_object sub_field = p->val;
-
- tmp_p = sub_field.via.map.ptr;
- tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size;
-
- /* Validate the subfields of httpRequest */
- for (; tmp_p < tmp_pend; ++tmp_p) {
- if (tmp_p->key.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (validate_key(tmp_p->key, HTTP_REQUEST_LATENCY,
- HTTP_REQUEST_LATENCY_SIZE)) {
- if (tmp_p->val.type != MSGPACK_OBJECT_STR) {
- continue;
- }
- validate_latency(tmp_p->val.via.str, http_request);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_PROTOCOL,
- HTTP_REQUEST_PROTOCOL_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->protocol);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_REFERER,
- HTTP_REQUEST_REFERER_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->referer);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_REMOTE_IP,
- HTTP_REQUEST_REMOTE_IP_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->remoteIp);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUEST_METHOD,
- HTTP_REQUEST_REQUEST_METHOD_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->requestMethod);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUEST_URL,
- HTTP_REQUEST_REQUEST_URL_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->requestUrl);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_SERVER_IP,
- HTTP_REQUEST_SERVER_IP_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->serverIp);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_USER_AGENT,
- HTTP_REQUEST_USER_AGENT_SIZE)) {
- try_assign_subfield_str(tmp_p->val, &http_request->userAgent);
- }
-
- else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_FILL_BYTES,
- HTTP_REQUEST_CACHE_FILL_BYTES_SIZE)) {
- try_assign_subfield_int(tmp_p->val, &http_request->cacheFillBytes);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_REQUESTSIZE,
- HTTP_REQUEST_REQUESTSIZE_SIZE)) {
- try_assign_subfield_int(tmp_p->val, &http_request->requestSize);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_RESPONSESIZE,
- HTTP_REQUEST_RESPONSESIZE_SIZE)) {
- try_assign_subfield_int(tmp_p->val, &http_request->responseSize);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_STATUS,
- HTTP_REQUEST_STATUS_SIZE)) {
- try_assign_subfield_int(tmp_p->val, &http_request->status);
- }
-
- else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_HIT,
- HTTP_REQUEST_CACHE_HIT_SIZE)) {
- try_assign_subfield_bool(tmp_p->val, &http_request->cacheHit);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_LOOKUP,
- HTTP_REQUEST_CACHE_LOOKUP_SIZE)) {
- try_assign_subfield_bool(tmp_p->val, &http_request->cacheLookup);
- }
- else if (validate_key(tmp_p->key, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER,
- HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE)) {
- try_assign_subfield_bool(tmp_p->val,
- &http_request->cacheValidatedWithOriginServer);
- }
-
- else {
- *extra_subfields += 1;
- }
- }
- }
-
- return op_status == HTTPREQUEST_EXISTS;
-}
-
-void pack_extra_http_request_subfields(msgpack_packer *mp_pck,
- msgpack_object *http_request,
- int extra_subfields) {
- msgpack_object_kv *p = http_request->via.map.ptr;
- msgpack_object_kv *const pend = http_request->via.map.ptr + http_request->via.map.size;
-
- msgpack_pack_map(mp_pck, extra_subfields);
-
- for (; p < pend; ++p) {
- if (validate_key(p->key, HTTP_REQUEST_LATENCY,
- HTTP_REQUEST_LATENCY_SIZE)
- || validate_key(p->key, HTTP_REQUEST_PROTOCOL,
- HTTP_REQUEST_PROTOCOL_SIZE)
- || validate_key(p->key, HTTP_REQUEST_REFERER,
- HTTP_REQUEST_REFERER_SIZE)
- || validate_key(p->key, HTTP_REQUEST_REMOTE_IP,
- HTTP_REQUEST_REMOTE_IP_SIZE)
- || validate_key(p->key, HTTP_REQUEST_REQUEST_METHOD,
- HTTP_REQUEST_REQUEST_METHOD_SIZE)
- || validate_key(p->key, HTTP_REQUEST_REQUEST_URL,
- HTTP_REQUEST_REQUEST_URL_SIZE)
- || validate_key(p->key, HTTP_REQUEST_SERVER_IP,
- HTTP_REQUEST_SERVER_IP_SIZE)
- || validate_key(p->key, HTTP_REQUEST_USER_AGENT,
- HTTP_REQUEST_USER_AGENT_SIZE)
- || validate_key(p->key, HTTP_REQUEST_CACHE_FILL_BYTES,
- HTTP_REQUEST_CACHE_FILL_BYTES_SIZE)
- || validate_key(p->key, HTTP_REQUEST_REQUESTSIZE,
- HTTP_REQUEST_REQUESTSIZE_SIZE)
- || validate_key(p->key, HTTP_REQUEST_RESPONSESIZE,
- HTTP_REQUEST_RESPONSESIZE_SIZE)
- || validate_key(p->key, HTTP_REQUEST_STATUS,
- HTTP_REQUEST_STATUS_SIZE)
- || validate_key(p->key, HTTP_REQUEST_CACHE_HIT,
- HTTP_REQUEST_CACHE_HIT_SIZE)
- || validate_key(p->key, HTTP_REQUEST_CACHE_LOOKUP,
- HTTP_REQUEST_CACHE_LOOKUP_SIZE)
- || validate_key(p->key, HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER,
- HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE)) {
-
- continue;
- }
-
- msgpack_pack_object(mp_pck, p->key);
- msgpack_pack_object(mp_pck, p->val);
- }
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h b/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h
deleted file mode 100644
index 8b935c3f7..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_http_request.h
+++ /dev/null
@@ -1,120 +0,0 @@
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef FLB_STD_HTTPREQUEST_H
-#define FLB_STD_HTTPREQUEST_H
-
-#include "stackdriver.h"
-
-/* subfield name and size */
-#define HTTP_REQUEST_LATENCY "latency"
-#define HTTP_REQUEST_PROTOCOL "protocol"
-#define HTTP_REQUEST_REFERER "referer"
-#define HTTP_REQUEST_REMOTE_IP "remoteIp"
-#define HTTP_REQUEST_REQUEST_METHOD "requestMethod"
-#define HTTP_REQUEST_REQUEST_URL "requestUrl"
-#define HTTP_REQUEST_SERVER_IP "serverIp"
-#define HTTP_REQUEST_USER_AGENT "userAgent"
-#define HTTP_REQUEST_CACHE_FILL_BYTES "cacheFillBytes"
-#define HTTP_REQUEST_REQUESTSIZE "requestSize"
-#define HTTP_REQUEST_RESPONSESIZE "responseSize"
-#define HTTP_REQUEST_STATUS "status"
-#define HTTP_REQUEST_CACHE_HIT "cacheHit"
-#define HTTP_REQUEST_CACHE_LOOKUP "cacheLookup"
-#define HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER "cacheValidatedWithOriginServer"
-
-#define HTTP_REQUEST_LATENCY_SIZE 7
-#define HTTP_REQUEST_PROTOCOL_SIZE 8
-#define HTTP_REQUEST_REFERER_SIZE 7
-#define HTTP_REQUEST_REMOTE_IP_SIZE 8
-#define HTTP_REQUEST_REQUEST_METHOD_SIZE 13
-#define HTTP_REQUEST_REQUEST_URL_SIZE 10
-#define HTTP_REQUEST_SERVER_IP_SIZE 8
-#define HTTP_REQUEST_USER_AGENT_SIZE 9
-#define HTTP_REQUEST_CACHE_FILL_BYTES_SIZE 14
-#define HTTP_REQUEST_REQUESTSIZE_SIZE 11
-#define HTTP_REQUEST_RESPONSESIZE_SIZE 12
-#define HTTP_REQUEST_STATUS_SIZE 6
-#define HTTP_REQUEST_CACHE_HIT_SIZE 8
-#define HTTP_REQUEST_CACHE_LOOKUP_SIZE 11
-#define HTTP_REQUEST_CACHE_VALIDATE_WITH_ORIGIN_SERVER_SIZE 30
-
-
-struct http_request_field {
- flb_sds_t latency;
- flb_sds_t protocol;
- flb_sds_t referer;
- flb_sds_t remoteIp;
- flb_sds_t requestMethod;
- flb_sds_t requestUrl;
- flb_sds_t serverIp;
- flb_sds_t userAgent;
-
- int64_t cacheFillBytes;
- int64_t requestSize;
- int64_t responseSize;
- int64_t status;
-
- int cacheHit;
- int cacheLookup;
- int cacheValidatedWithOriginServer;
-};
-
-void init_http_request(struct http_request_field *http_request);
-void destroy_http_request(struct http_request_field *http_request);
-
-/*
- * Add httpRequest field to the entries.
- * The structure of httpRequest is as shown in struct http_request_field
- */
-void add_http_request_field(struct http_request_field *http_request,
- msgpack_packer *mp_pck);
-
-/*
- * Extract the httpRequest field from the jsonPayload.
- * If the httpRequest field exists, return TRUE and store the subfields.
- * If there are extra subfields, count the number.
- */
-int extract_http_request(struct http_request_field *http_request,
- flb_sds_t http_request_key,
- int http_request_key_size,
- msgpack_object *obj, int *extra_subfields);
-
-/*
- * When there are extra subfields, we will preserve the extra subfields inside jsonPayload
- * For example, if the jsonPayload is as followed:
- * jsonPayload {
- * "logging.googleapis.com/http_request": {
- * "requestMethod": "GET",
- * "latency": "1s",
- * "cacheLookup": true,
- * "extra": "some string" #extra subfield
- * }
- * }
- * We will preserve the extra subfields. The jsonPayload after extracting is:
- * jsonPayload {
- * "logging.googleapis.com/http_request": {
- * "extra": "some string"
- * }
- * }
- */
-void pack_extra_http_request_subfields(msgpack_packer *mp_pck,
- msgpack_object *http_request,
- int extra_subfields);
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c b/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c
deleted file mode 100644
index 548e8b473..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.c
+++ /dev/null
@@ -1,147 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-
-#include "stackdriver.h"
-#include "stackdriver_helper.h"
-#include "stackdriver_operation.h"
-
-typedef enum {
- NO_OPERATION = 1,
- OPERATION_EXISTED = 2
-} operation_status;
-
-void add_operation_field(flb_sds_t *operation_id, flb_sds_t *operation_producer,
- int *operation_first, int *operation_last,
- msgpack_packer *mp_pck)
-{
- msgpack_pack_str(mp_pck, 9);
- msgpack_pack_str_body(mp_pck, "operation", 9);
-
- msgpack_pack_map(mp_pck, 4);
-
- msgpack_pack_str(mp_pck, OPERATION_ID_SIZE);
- msgpack_pack_str_body(mp_pck, OPERATION_ID, OPERATION_ID_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(*operation_id));
- msgpack_pack_str_body(mp_pck, *operation_id, flb_sds_len(*operation_id));
-
- msgpack_pack_str(mp_pck, OPERATION_PRODUCER_SIZE);
- msgpack_pack_str_body(mp_pck, OPERATION_PRODUCER, OPERATION_PRODUCER_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(*operation_producer));
- msgpack_pack_str_body(mp_pck, *operation_producer,
- flb_sds_len(*operation_producer));
-
- msgpack_pack_str(mp_pck, OPERATION_FIRST_SIZE);
- msgpack_pack_str_body(mp_pck, OPERATION_FIRST, OPERATION_FIRST_SIZE);
- if (*operation_first == FLB_TRUE) {
- msgpack_pack_true(mp_pck);
- }
- else {
- msgpack_pack_false(mp_pck);
- }
-
- msgpack_pack_str(mp_pck, OPERATION_LAST_SIZE);
- msgpack_pack_str_body(mp_pck, OPERATION_LAST, OPERATION_LAST_SIZE);
- if (*operation_last == FLB_TRUE) {
- msgpack_pack_true(mp_pck);
- }
- else {
- msgpack_pack_false(mp_pck);
- }
-}
-
-/* Return true if operation extracted */
-int extract_operation(flb_sds_t *operation_id, flb_sds_t *operation_producer,
- int *operation_first, int *operation_last,
- msgpack_object *obj, int *extra_subfields)
-{
- operation_status op_status = NO_OPERATION;
- msgpack_object_kv *p;
- msgpack_object_kv *pend;
- msgpack_object_kv *tmp_p;
- msgpack_object_kv *tmp_pend;
-
- if (obj->via.map.size == 0) {
- return FLB_FALSE;
- }
- p = obj->via.map.ptr;
- pend = obj->via.map.ptr + obj->via.map.size;
-
- for (; p < pend && op_status == NO_OPERATION; ++p) {
-
- if (p->val.type != MSGPACK_OBJECT_MAP
- || !validate_key(p->key, OPERATION_FIELD_IN_JSON,
- OPERATION_KEY_SIZE)) {
- continue;
- }
-
- op_status = OPERATION_EXISTED;
- msgpack_object sub_field = p->val;
-
- tmp_p = sub_field.via.map.ptr;
- tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size;
-
- /* Validate the subfields of operation */
- for (; tmp_p < tmp_pend; ++tmp_p) {
- if (tmp_p->key.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (validate_key(tmp_p->key, OPERATION_ID, OPERATION_ID_SIZE)) {
- try_assign_subfield_str(tmp_p->val, operation_id);
- }
- else if (validate_key(tmp_p->key, OPERATION_PRODUCER,
- OPERATION_PRODUCER_SIZE)) {
- try_assign_subfield_str(tmp_p->val, operation_producer);
- }
- else if (validate_key(tmp_p->key, OPERATION_FIRST, OPERATION_FIRST_SIZE)) {
- try_assign_subfield_bool(tmp_p->val, operation_first);
- }
- else if (validate_key(tmp_p->key, OPERATION_LAST, OPERATION_LAST_SIZE)) {
- try_assign_subfield_bool(tmp_p->val, operation_last);
- }
- else {
- *extra_subfields += 1;
- }
- }
- }
-
- return op_status == OPERATION_EXISTED;
-}
-
-void pack_extra_operation_subfields(msgpack_packer *mp_pck,
- msgpack_object *operation, int extra_subfields) {
- msgpack_object_kv *p = operation->via.map.ptr;
- msgpack_object_kv *const pend = operation->via.map.ptr + operation->via.map.size;
-
- msgpack_pack_map(mp_pck, extra_subfields);
-
- for (; p < pend; ++p) {
- if (validate_key(p->key, OPERATION_ID, OPERATION_ID_SIZE)
- || validate_key(p->key, OPERATION_PRODUCER, OPERATION_PRODUCER_SIZE)
- || validate_key(p->key, OPERATION_FIRST, OPERATION_FIRST_SIZE)
- || validate_key(p->key, OPERATION_LAST, OPERATION_LAST_SIZE)) {
- continue;
- }
-
- msgpack_pack_object(mp_pck, p->key);
- msgpack_pack_object(mp_pck, p->val);
- }
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h b/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h
deleted file mode 100644
index ded886c3b..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_operation.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef FLB_STD_OPERATION_H
-#define FLB_STD_OPERATION_H
-
-#include "stackdriver.h"
-
-/* subfield name and size */
-#define OPERATION_ID "id"
-#define OPERATION_PRODUCER "producer"
-#define OPERATION_FIRST "first"
-#define OPERATION_LAST "last"
-
-#define OPERATION_ID_SIZE 2
-#define OPERATION_PRODUCER_SIZE 8
-#define OPERATION_FIRST_SIZE 5
-#define OPERATION_LAST_SIZE 4
-
-/*
- * Add operation field to the entries.
- * The structure of operation is:
- * {
- * "id": string,
- * "producer": string,
- * "first": boolean,
- * "last": boolean
- * }
- *
- */
-void add_operation_field(flb_sds_t *operation_id, flb_sds_t *operation_producer,
- int *operation_first, int *operation_last,
- msgpack_packer *mp_pck);
-
-/*
- * Extract the operation field from the jsonPayload.
- * If the operation field exists, return TRUE and store the subfields.
- * If there are extra subfields, count the number.
- */
-int extract_operation(flb_sds_t *operation_id, flb_sds_t *operation_producer,
- int *operation_first, int *operation_last,
- msgpack_object *obj, int *extra_subfields);
-
-/*
- * When there are extra subfields, we will preserve the extra subfields inside jsonPayload
- * For example, if the jsonPayload is as followed:
- * jsonPayload {
- * "logging.googleapis.com/operation": {
- * "id": "id1",
- * "producer": "id2",
- * "first": true,
- * "last": true,
- * "extra": "some string" #extra subfield
- * }
- * }
- * We will preserve the extra subfields. The jsonPayload after extracting is:
- * jsonPayload {
- * "logging.googleapis.com/operation": {
- * "extra": "some string"
- * }
- * }
- */
-void pack_extra_operation_subfields(msgpack_packer *mp_pck, msgpack_object *operation,
- int extra_subfields);
-
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c b/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c
deleted file mode 100644
index b114e4922..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.c
+++ /dev/null
@@ -1,143 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_sds.h>
-#include <fluent-bit/flb_kv.h>
-
-#include "stackdriver.h"
-#include "stackdriver_resource_types.h"
-
-static const struct resource_type resource_types[] = {
- {
- .id = RESOURCE_TYPE_K8S,
- .resources = {"k8s_container", "k8s_node", "k8s_pod"},
- .required_labels = {"cluster_name", "location"}
- },
- {
- .id = RESOURCE_TYPE_GENERIC_NODE,
- .resources = {"generic_node"},
- .required_labels = {"location", "namespace", "node_id"}
- },
- {
- .id = RESOURCE_TYPE_GENERIC_TASK,
- .resources = {"generic_task"},
- .required_labels = {"location", "namespace", "job", "task_id"}
- }
-};
-
-static char **get_required_labels(int resource_type)
-{
- int i;
- int len;
-
- len = sizeof(resource_types) / sizeof(resource_types[0]);
- for(i = 0; i < len; i++) {
- if (resource_types[i].id == resource_type) {
- return (char **) resource_types[i].required_labels;
- }
- }
- return NULL;
-}
-
-/*
- * set_resource_type():
- * - Iterates through resource_types that are set up for validation and sets the
- * resource_type if it matches one of them.
- * - A resource may not be in the resource types list but still be accepted
- * and processed (e.g. global) if it does not require / is not set up for validation.
- */
-void set_resource_type(struct flb_stackdriver *ctx)
-{
- int i;
- int j;
- int len;
- char *resource;
- struct resource_type resource_type;
-
- len = sizeof(resource_types) / sizeof(resource_types[0]);
- for(i = 0; i < len; i++) {
- resource_type = resource_types[i];
- for(j = 0; j < MAX_RESOURCE_ENTRIES; j++) {
- if (resource_type.resources[j] != NULL) {
- resource = resource_type.resources[j];
- if (flb_sds_cmp(ctx->resource, resource, strlen(resource)) == 0) {
- ctx->resource_type = resource_type.id;
- return;
- }
- }
- }
- }
-}
-
-/*
- * resource_api_has_required_labels():
- * - Determines if all required labels for the set resource type are present as
- * keys on the resource labels key-value pairs.
- */
-int resource_api_has_required_labels(struct flb_stackdriver *ctx)
-{
- struct mk_list *head;
- struct flb_hash_table *ht;
- struct flb_kv *label_kv;
- char** required_labels;
- int i;
- int found;
- void *tmp_buf;
- size_t tmp_size;
-
- if (mk_list_size(&ctx->resource_labels_kvs) == 0) {
- return FLB_FALSE;
- }
-
- required_labels = get_required_labels(ctx->resource_type);
- if (required_labels == NULL) {
- flb_plg_warn(ctx->ins, "no validation applied to resource_labels "
- "for set resource type");
- return FLB_FALSE;
- }
-
- ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, MAX_REQUIRED_LABEL_ENTRIES, 0);
- mk_list_foreach(head, &ctx->resource_labels_kvs) {
- label_kv = mk_list_entry(head, struct flb_kv, _head);
- for (i = 0; i < MAX_REQUIRED_LABEL_ENTRIES; i++) {
- if (required_labels[i] != NULL && flb_sds_cmp(label_kv->key,
- required_labels[i], strlen(required_labels[i])) == 0) {
- flb_hash_table_add(ht, required_labels[i], strlen(required_labels[i]),
- NULL, 0);
- }
- }
- }
-
- for (i = 0; i < MAX_REQUIRED_LABEL_ENTRIES; i++) {
- if (required_labels[i] != NULL) {
- found = flb_hash_table_get(ht, required_labels[i], strlen(required_labels[i]),
- &tmp_buf, &tmp_size);
- if (found == -1) {
- flb_plg_warn(ctx->ins, "labels set in resource_labels will not be applied"
- ", as the required resource label [%s] is missing", required_labels[i]);
- ctx->should_skip_resource_labels_api = FLB_TRUE;
- flb_hash_table_destroy(ht);
- return FLB_FALSE;
- }
- }
- }
- flb_hash_table_destroy(ht);
- return FLB_TRUE;
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h b/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h
deleted file mode 100644
index 5e45c8745..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_resource_types.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_OUT_STACKDRIVER_RESOURCE_TYPES_H
-#define FLB_OUT_STACKDRIVER_RESOURCE_TYPES_H
-
-#include "stackdriver.h"
-
-#define MAX_RESOURCE_ENTRIES 10
-#define MAX_REQUIRED_LABEL_ENTRIES 10
-
-#define RESOURCE_TYPE_K8S 1
-#define RESOURCE_TYPE_GENERIC_NODE 2
-#define RESOURCE_TYPE_GENERIC_TASK 3
-
-struct resource_type {
- int id;
- char* resources[MAX_RESOURCE_ENTRIES];
- char* required_labels[MAX_REQUIRED_LABEL_ENTRIES];
-};
-
-void set_resource_type(struct flb_stackdriver *ctx);
-int resource_api_has_required_labels(struct flb_stackdriver *ctx);
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c b/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c
deleted file mode 100644
index 58102c91e..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.c
+++ /dev/null
@@ -1,139 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "stackdriver.h"
-#include "stackdriver_helper.h"
-#include "stackdriver_source_location.h"
-
-typedef enum {
- NO_SOURCELOCATION = 1,
- SOURCELOCATION_EXISTED = 2
-} source_location_status;
-
-
-void add_source_location_field(flb_sds_t *source_location_file,
- int64_t source_location_line,
- flb_sds_t *source_location_function,
- msgpack_packer *mp_pck)
-{
- msgpack_pack_str(mp_pck, 14);
- msgpack_pack_str_body(mp_pck, "sourceLocation", 14);
- msgpack_pack_map(mp_pck, 3);
-
- msgpack_pack_str(mp_pck, SOURCE_LOCATION_FILE_SIZE);
- msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_FILE, SOURCE_LOCATION_FILE_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(*source_location_file));
- msgpack_pack_str_body(mp_pck, *source_location_file,
- flb_sds_len(*source_location_file));
-
- msgpack_pack_str(mp_pck, SOURCE_LOCATION_LINE_SIZE);
- msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_LINE, SOURCE_LOCATION_LINE_SIZE);
- msgpack_pack_int64(mp_pck, source_location_line);
-
- msgpack_pack_str(mp_pck, SOURCE_LOCATION_FUNCTION_SIZE);
- msgpack_pack_str_body(mp_pck, SOURCE_LOCATION_FUNCTION,
- SOURCE_LOCATION_FUNCTION_SIZE);
- msgpack_pack_str(mp_pck, flb_sds_len(*source_location_function));
- msgpack_pack_str_body(mp_pck, *source_location_function,
- flb_sds_len(*source_location_function));
-}
-
-/* Return FLB_TRUE if sourceLocation extracted */
-int extract_source_location(flb_sds_t *source_location_file,
- int64_t *source_location_line,
- flb_sds_t *source_location_function,
- msgpack_object *obj, int *extra_subfields)
-{
- source_location_status op_status = NO_SOURCELOCATION;
- msgpack_object_kv *p;
- msgpack_object_kv *pend;
- msgpack_object_kv *tmp_p;
- msgpack_object_kv *tmp_pend;
-
- if (obj->via.map.size == 0) {
- return FLB_FALSE;
- }
- p = obj->via.map.ptr;
- pend = obj->via.map.ptr + obj->via.map.size;
-
- for (; p < pend && op_status == NO_SOURCELOCATION; ++p) {
-
- if (p->val.type != MSGPACK_OBJECT_MAP
- || p->key.type != MSGPACK_OBJECT_STR
- || !validate_key(p->key, SOURCELOCATION_FIELD_IN_JSON,
- SOURCE_LOCATION_SIZE)) {
-
- continue;
- }
-
- op_status = SOURCELOCATION_EXISTED;
- msgpack_object sub_field = p->val;
-
- tmp_p = sub_field.via.map.ptr;
- tmp_pend = sub_field.via.map.ptr + sub_field.via.map.size;
-
- /* Validate the subfields of sourceLocation */
- for (; tmp_p < tmp_pend; ++tmp_p) {
- if (tmp_p->key.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (validate_key(tmp_p->key,
- SOURCE_LOCATION_FILE,
- SOURCE_LOCATION_FILE_SIZE)) {
- try_assign_subfield_str(tmp_p->val, source_location_file);
- }
- else if (validate_key(tmp_p->key,
- SOURCE_LOCATION_FUNCTION,
- SOURCE_LOCATION_FUNCTION_SIZE)) {
- try_assign_subfield_str(tmp_p->val, source_location_function);
- }
- else if (validate_key(tmp_p->key,
- SOURCE_LOCATION_LINE,
- SOURCE_LOCATION_LINE_SIZE)) {
- try_assign_subfield_int(tmp_p->val, source_location_line);
- }
- else {
- *extra_subfields += 1;
- }
- }
- }
-
- return op_status == SOURCELOCATION_EXISTED;
-}
-
-void pack_extra_source_location_subfields(msgpack_packer *mp_pck,
- msgpack_object *source_location,
- int extra_subfields) {
- msgpack_object_kv *p = source_location->via.map.ptr;
- msgpack_object_kv *const pend = source_location->via.map.ptr + source_location->via.map.size;
-
- msgpack_pack_map(mp_pck, extra_subfields);
-
- for (; p < pend; ++p) {
- if (validate_key(p->key, SOURCE_LOCATION_FILE, SOURCE_LOCATION_FILE_SIZE)
- || validate_key(p->key, SOURCE_LOCATION_LINE, SOURCE_LOCATION_LINE_SIZE)
- || validate_key(p->key, SOURCE_LOCATION_FUNCTION,
- SOURCE_LOCATION_FUNCTION_SIZE)) {
- continue;
- }
-
- msgpack_pack_object(mp_pck, p->key);
- msgpack_pack_object(mp_pck, p->val);
- }
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h b/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h
deleted file mode 100644
index 4f703d330..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_source_location.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef FLB_STD_SOURCELOCATION_H
-#define FLB_STD_SOURCELOCATION_H
-
-#include "stackdriver.h"
-
-/* subfield name and size */
-#define SOURCE_LOCATION_FILE "file"
-#define SOURCE_LOCATION_LINE "line"
-#define SOURCE_LOCATION_FUNCTION "function"
-
-#define SOURCE_LOCATION_FILE_SIZE 4
-#define SOURCE_LOCATION_LINE_SIZE 4
-#define SOURCE_LOCATION_FUNCTION_SIZE 8
-
-/*
- * Add sourceLocation field to the entries.
- * The structure of sourceLocation is:
- * {
- * "file": string,
- * "line": int,
- * "function": string
- * }
- */
-void add_source_location_field(flb_sds_t *source_location_file,
- int64_t source_location_line,
- flb_sds_t *source_location_function,
- msgpack_packer *mp_pck);
-
-/*
- * Extract the sourceLocation field from the jsonPayload.
- * If the sourceLocation field exists, return TRUE and store the subfields.
- * If there are extra subfields, count the number.
- */
-int extract_source_location(flb_sds_t *source_location_file,
- int64_t *source_location_line,
- flb_sds_t *source_location_function,
- msgpack_object *obj, int *extra_subfields);
-
-/*
- * When there are extra subfields, we will preserve the extra subfields inside jsonPayload
- * For example, if the jsonPayload is as followed:
- * jsonPayload {
- * "logging.googleapis.com/sourceLocation": {
- * "file": "file1",
- * "line": 1,
- * "function": "func1",
- * "extra": "some string" #extra subfield
- * }
- * }
- * We will preserve the extra subfields. The jsonPayload after extracting is:
- * jsonPayload {
- * "logging.googleapis.com/sourceLocation": {
- * "extra": "some string"
- * }
- * }
- */
-void pack_extra_source_location_subfields(msgpack_packer *mp_pck,
- msgpack_object *source_location,
- int extra_subfields);
-
-
-#endif
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c b/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c
deleted file mode 100644
index a9b350d22..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.c
+++ /dev/null
@@ -1,180 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-
-#include "stackdriver.h"
-#include "stackdriver_helper.h"
-#include "stackdriver_timestamp.h"
-#include <fluent-bit/flb_regex.h>
-
-#include <ctype.h>
-
-static int is_integer(char *str, int size) {
- int i;
- for (i = 0; i < size; ++ i) {
- if (!isdigit(str[i])) {
- return FLB_FALSE;
- }
- }
- return FLB_TRUE;
-}
-
-static void try_assign_time(long long seconds, long long nanos,
- struct flb_time *tms)
-{
- if (seconds != 0) {
- tms->tm.tv_sec = seconds;
- tms->tm.tv_nsec = nanos;
- }
-}
-
-static long long get_integer(msgpack_object obj)
-{
- char tmp[32];
-
- if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
- return obj.via.i64;
- }
- else if (obj.type == MSGPACK_OBJECT_STR
- && is_integer((char *) obj.via.str.ptr,
- obj.via.str.size)) {
-
- /*
- * use an intermediary buffer to perform the conversion to avoid any
- * overflow by atoll. LLONG_MAX value is +9,223,372,036,854,775,807,
- * so using a 32 bytes buffer is enough.
- */
- if (obj.via.str.size > sizeof(tmp) - 1) {
- return 0;
- }
-
- memcpy(tmp, obj.via.str.ptr, obj.via.str.size);
- tmp[obj.via.str.size] = '\0';
-
- return atoll(tmp);
- }
-
- return 0;
-}
-
-static int extract_format_timestamp_object(msgpack_object *obj,
- struct flb_time *tms)
-{
- int seconds_found = FLB_FALSE;
- int nanos_found = FLB_FALSE;
- long long seconds = 0;
- long long nanos = 0;
-
- msgpack_object_kv *p;
- msgpack_object_kv *pend;
- msgpack_object_kv *tmp_p;
- msgpack_object_kv *tmp_pend;
-
- if (obj->via.map.size == 0) {
- return FLB_FALSE;
- }
- p = obj->via.map.ptr;
- pend = obj->via.map.ptr + obj->via.map.size;
-
- for (; p < pend; ++p) {
- if (!validate_key(p->key, "timestamp", 9)
- || p->val.type != MSGPACK_OBJECT_MAP) {
- continue;
- }
-
- tmp_p = p->val.via.map.ptr;
- tmp_pend = p->val.via.map.ptr + p->val.via.map.size;
-
- for (; tmp_p < tmp_pend; ++tmp_p) {
- if (validate_key(tmp_p->key, "seconds", 7)) {
- seconds_found = FLB_TRUE;
- seconds = get_integer(tmp_p->val);
-
- if (nanos_found == FLB_TRUE) {
- try_assign_time(seconds, nanos, tms);
- return FLB_TRUE;
- }
- }
- else if (validate_key(tmp_p->key, "nanos", 5)) {
- nanos_found = FLB_TRUE;
- nanos = get_integer(tmp_p->val);
-
- if (seconds_found == FLB_TRUE) {
- try_assign_time(seconds, nanos, tms);
- return FLB_TRUE;
- }
- }
- }
- }
- return FLB_FALSE;
-}
-
-static int extract_format_timestamp_duo_fields(msgpack_object *obj,
- struct flb_time *tms)
-{
- int seconds_found = FLB_FALSE;
- int nanos_found = FLB_FALSE;
- long long seconds = 0;
- long long nanos = 0;
-
- msgpack_object_kv *p;
- msgpack_object_kv *pend;
-
- if (obj->via.map.size == 0) {
- return FLB_FALSE;
- }
- p = obj->via.map.ptr;
- pend = obj->via.map.ptr + obj->via.map.size;
-
- for (; p < pend; ++p) {
- if (validate_key(p->key, "timestampSeconds", 16)) {
- seconds_found = FLB_TRUE;
- seconds = get_integer(p->val);
-
- if (nanos_found == FLB_TRUE) {
- try_assign_time(seconds, nanos, tms);
- return FLB_TRUE;
- }
- }
- else if (validate_key(p->key, "timestampNanos", 14)) {
- nanos_found = FLB_TRUE;
- nanos = get_integer(p->val);
-
- if (seconds_found == FLB_TRUE) {
- try_assign_time(seconds, nanos, tms);
- return FLB_TRUE;
- }
- }
- }
-
- return FLB_FALSE;
-}
-
-timestamp_status extract_timestamp(msgpack_object *obj,
- struct flb_time *tms)
-{
- if (extract_format_timestamp_object(obj, tms)) {
- return FORMAT_TIMESTAMP_OBJECT;
- }
- if (extract_format_timestamp_duo_fields(obj, tms)) {
- return FORMAT_TIMESTAMP_DUO_FIELDS;
- }
- return TIMESTAMP_NOT_PRESENT;
-}
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h b/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h
deleted file mode 100644
index f3c025864..000000000
--- a/fluent-bit/plugins/out_stackdriver/stackdriver_timestamp.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef FLB_STD_TIMESTAMP_H
-#define FLB_STD_TIMESTAMP_H
-
-#include "stackdriver.h"
-#include <fluent-bit/flb_time.h>
-
-typedef enum {
- TIMESTAMP_NOT_PRESENT = 0,
- FORMAT_TIMESTAMP_OBJECT = 1,
- FORMAT_TIMESTAMP_DUO_FIELDS = 2
-} timestamp_status;
-
-/*
- * Currently support two formats of time-related fields
- * - "timestamp":{"seconds", "nanos"}
- * - "timestampSeconds"/"timestampNanos"
- *
- * If timestamp field is not existed, return TIMESTAMP_NOT_PRESENT
- * If timestamp format is "timestamp":{"seconds", "nanos"},
- * set the time and return FORMAT_TIMESTAMP
- *
- * If timestamp format is "timestampSeconds"/"timestampNanos",
- * set the time and return FORMAT_TIMESTAMPSECONDS
- */
-timestamp_status extract_timestamp(msgpack_object *obj,
- struct flb_time *tms);
-
-
-#endif