summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_stackdriver/stackdriver.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_stackdriver/stackdriver.c')
-rw-r--r--fluent-bit/plugins/out_stackdriver/stackdriver.c2867
1 files changed, 2867 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_stackdriver/stackdriver.c b/fluent-bit/plugins/out_stackdriver/stackdriver.c
new file mode 100644
index 00000000..5c48a338
--- /dev/null
+++ b/fluent-bit/plugins/out_stackdriver/stackdriver.c
@@ -0,0 +1,2867 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_oauth2.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_pthread.h>
+#include <fluent-bit/flb_crypto.h>
+#include <fluent-bit/flb_hash.h>
+#include <fluent-bit/flb_base64.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_gzip.h>
+
+#include <msgpack.h>
+
+#include "gce_metadata.h"
+#include "stackdriver.h"
+#include "stackdriver_conf.h"
+#include "stackdriver_operation.h"
+#include "stackdriver_source_location.h"
+#include "stackdriver_http_request.h"
+#include "stackdriver_timestamp.h"
+#include "stackdriver_helper.h"
+#include "stackdriver_resource_types.h"
+
+pthread_key_t oauth2_type;
+pthread_key_t oauth2_token;
+pthread_key_t oauth2_token_expires;
+
+static void oauth2_cache_exit(void *ptr)
+{
+ if (ptr) {
+ flb_sds_destroy(ptr);
+ }
+}
+
+static void oauth2_cache_free_expiration(void *ptr)
+{
+ if (ptr) {
+ flb_free(ptr);
+ }
+}
+
+static void oauth2_cache_init()
+{
+ /* oauth2 pthread key */
+ pthread_key_create(&oauth2_type, oauth2_cache_exit);
+ pthread_key_create(&oauth2_token, oauth2_cache_exit);
+ pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration);
+}
+
+/* Set oauth2 type and token in pthread keys */
+static void oauth2_cache_set(char *type, char *token, time_t expires)
+{
+ flb_sds_t tmp;
+ time_t *tmp_expires;
+
+ /* oauth2 type */
+ tmp = pthread_getspecific(oauth2_type);
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ tmp = flb_sds_create(type);
+ pthread_setspecific(oauth2_type, tmp);
+
+ /* oauth2 access token */
+ tmp = pthread_getspecific(oauth2_token);
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ tmp = flb_sds_create(token);
+ pthread_setspecific(oauth2_token, tmp);
+
+ /* oauth2 access token expiration */
+ tmp_expires = pthread_getspecific(oauth2_token_expires);
+ if (tmp_expires) {
+ flb_free(tmp_expires);
+ }
+ tmp_expires = flb_calloc(1, sizeof(time_t));
+ if (!tmp_expires) {
+ flb_errno();
+ return;
+ }
+ *tmp_expires = expires;
+ pthread_setspecific(oauth2_token_expires, tmp_expires);
+}
+
+/* By using pthread keys cached values, compose the authorizatoin token */
+static time_t oauth2_cache_get_expiration()
+{
+ time_t *expires = pthread_getspecific(oauth2_token_expires);
+ if (expires) {
+ return *expires;
+ }
+ return 0;
+}
+
+/* By using pthread keys cached values, compose the authorizatoin token */
+static flb_sds_t oauth2_cache_to_token()
+{
+ flb_sds_t type;
+ flb_sds_t token;
+ flb_sds_t output;
+
+ type = pthread_getspecific(oauth2_type);
+ if (!type) {
+ return NULL;
+ }
+
+ output = flb_sds_create(type);
+ if (!output) {
+ return NULL;
+ }
+
+ token = pthread_getspecific(oauth2_token);
+ flb_sds_printf(&output, " %s", token);
+ return output;
+}
+
+/*
+ * Base64 Encoding in JWT must:
+ *
+ * - remove any trailing padding '=' character
+ * - replace '+' with '-'
+ * - replace '/' with '_'
+ *
+ * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C
+ */
+int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size,
+ unsigned char *in_buf, size_t in_size,
+ size_t *olen)
+
+{
+ int i;
+ size_t len;
+ int result;
+
+
+ /* do normal base64 encoding */
+ result = flb_base64_encode((unsigned char *) out_buf, out_size - 1,
+ &len, in_buf, in_size);
+ if (result != 0) {
+ return -1;
+ }
+
+ /* Replace '+' and '/' characters */
+ for (i = 0; i < len && out_buf[i] != '='; i++) {
+ if (out_buf[i] == '+') {
+ out_buf[i] = '-';
+ }
+ else if (out_buf[i] == '/') {
+ out_buf[i] = '_';
+ }
+ }
+
+ /* Now 'i' becomes the new length */
+ *olen = i;
+ return 0;
+}
+
+static int jwt_encode(char *payload, char *secret,
+ char **out_signature, size_t *out_size,
+ struct flb_stackdriver *ctx)
+{
+ int ret;
+ int len;
+ int buf_size;
+ size_t olen;
+ char *buf;
+ char *sigd;
+ char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}";
+ unsigned char sha256_buf[32] = {0};
+ flb_sds_t out;
+ unsigned char sig[256] = {0};
+ size_t sig_len;
+
+ buf_size = (strlen(payload) + strlen(secret)) * 2;
+ buf = flb_malloc(buf_size);
+ if (!buf) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Encode header */
+ len = strlen(headers);
+ ret = flb_base64_encode((unsigned char *) buf, buf_size - 1,
+ &olen, (unsigned char *) headers, len);
+ if (ret != 0) {
+ flb_free(buf);
+
+ return ret;
+ }
+
+ /* Create buffer to store JWT */
+ out = flb_sds_create_size(2048);
+ if (!out) {
+ flb_errno();
+ flb_free(buf);
+ return -1;
+ }
+
+ /* Append header */
+ flb_sds_cat(out, buf, olen);
+ flb_sds_cat(out, ".", 1);
+
+ /* Encode Payload */
+ len = strlen(payload);
+ jwt_base64_url_encode((unsigned char *) buf, buf_size,
+ (unsigned char *) payload, len, &olen);
+
+ /* Append Payload */
+ flb_sds_cat(out, buf, olen);
+
+ /* do sha256() of base64(header).base64(payload) */
+ ret = flb_hash_simple(FLB_HASH_SHA256,
+ (unsigned char *) out, flb_sds_len(out),
+ sha256_buf, sizeof(sha256_buf));
+
+ if (ret != FLB_CRYPTO_SUCCESS) {
+ flb_plg_error(ctx->ins, "error hashing token");
+ flb_free(buf);
+ flb_sds_destroy(out);
+ return -1;
+ }
+
+ len = strlen(secret);
+ sig_len = sizeof(sig);
+
+ ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY,
+ FLB_CRYPTO_PADDING_PKCS1,
+ FLB_HASH_SHA256,
+ (unsigned char *) secret, len,
+ sha256_buf, sizeof(sha256_buf),
+ sig, &sig_len);
+
+ if (ret != FLB_CRYPTO_SUCCESS) {
+ flb_plg_error(ctx->ins, "error creating RSA context");
+ flb_free(buf);
+ flb_sds_destroy(out);
+ return -1;
+ }
+
+ sigd = flb_malloc(2048);
+ if (!sigd) {
+ flb_errno();
+ flb_free(buf);
+ flb_sds_destroy(out);
+ return -1;
+ }
+
+ jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen);
+
+ flb_sds_cat(out, ".", 1);
+ flb_sds_cat(out, sigd, olen);
+
+ *out_signature = out;
+ *out_size = flb_sds_len(out);
+
+ flb_free(buf);
+ flb_free(sigd);
+
+ return 0;
+}
+
+/* Create a new oauth2 context and get a oauth2 token */
+static int get_oauth2_token(struct flb_stackdriver *ctx)
+{
+ int ret;
+ char *token;
+ char *sig_data;
+ size_t sig_size;
+ time_t issued;
+ time_t expires;
+ char payload[1024];
+
+ flb_oauth2_payload_clear(ctx->o);
+
+ /* In case of using metadata server, fetch token from there */
+ if (ctx->metadata_server_auth) {
+ return gce_metadata_read_token(ctx);
+ }
+
+ /* JWT encode for oauth2 */
+ issued = time(NULL);
+ expires = issued + FLB_STD_TOKEN_REFRESH;
+
+ snprintf(payload, sizeof(payload) - 1,
+ "{\"iss\": \"%s\", \"scope\": \"%s\", "
+ "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}",
+ ctx->client_email, FLB_STD_SCOPE,
+ FLB_STD_AUTH_URL,
+ expires, issued);
+
+ /* Compose JWT signature */
+ ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "JWT signature generation failed");
+ return -1;
+ }
+ flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data);
+
+ ret = flb_oauth2_payload_append(ctx->o,
+ "grant_type", -1,
+ "urn%3Aietf%3Aparams%3Aoauth%3A"
+ "grant-type%3Ajwt-bearer", -1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ flb_sds_destroy(sig_data);
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o,
+ "assertion", -1,
+ sig_data, sig_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ flb_sds_destroy(sig_data);
+ return -1;
+ }
+ flb_sds_destroy(sig_data);
+
+ /* Retrieve access token */
+ token = flb_oauth2_token_get(ctx->o);
+ if (!token) {
+ flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
+ return -1;
+ }
+
+ return 0;
+}
+
+static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
+{
+ int ret = 0;
+ flb_sds_t output = NULL;
+ time_t cached_expiration = 0;
+
+ ret = pthread_mutex_trylock(&ctx->token_mutex);
+ if (ret == EBUSY) {
+ /*
+ * If the routine is locked we just use our pre-cached values and
+ * compose the expected authorization value.
+ *
+ * If the routine fails it will return NULL and the caller will just
+ * issue a FLB_RETRY.
+ */
+ output = oauth2_cache_to_token();
+ cached_expiration = oauth2_cache_get_expiration();
+ if (time(NULL) >= cached_expiration) {
+ return output;
+ } else {
+ /*
+ * Cached token is expired. Wait on lock to use up-to-date token
+ * by either waiting for it to be refreshed or refresh it ourselves.
+ */
+ flb_plg_info(ctx->ins, "Cached token is expired. Waiting on lock.");
+ ret = pthread_mutex_lock(&ctx->token_mutex);
+ }
+ }
+
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error locking mutex");
+ return NULL;
+ }
+
+ if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
+ ret = get_oauth2_token(ctx);
+ }
+
+ /* Copy string to prevent race conditions (get_oauth2 can free the string) */
+ if (ret == 0) {
+ /* Update pthread keys cached values */
+ oauth2_cache_set(ctx->o->token_type, ctx->o->access_token, ctx->o->expires);
+
+ /* Compose outgoing buffer using cached values */
+ output = oauth2_cache_to_token();
+ }
+
+ if (pthread_mutex_unlock(&ctx->token_mutex)){
+ flb_plg_error(ctx->ins, "error unlocking mutex");
+ if (output) {
+ flb_sds_destroy(output);
+ }
+ return NULL;
+ }
+
+
+ return output;
+}
+
+void replace_prefix_dot(flb_sds_t s, int tag_prefix_len)
+{
+ int i;
+ int str_len;
+ char c;
+
+ if (!s) {
+ return;
+ }
+
+ str_len = flb_sds_len(s);
+ if (tag_prefix_len > str_len) {
+ flb_error("[output] tag_prefix shouldn't be longer than local_resource_id");
+ return;
+ }
+
+ for (i = 0; i < tag_prefix_len; i++) {
+ c = s[i];
+
+ if (c == '.') {
+ s[i] = '_';
+ }
+ }
+}
+
+static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map,
+ const char *key, int key_size)
+{
+ int i;
+ msgpack_object k;
+ msgpack_object v;
+ flb_sds_t ptr = NULL;
+
+ for (i = 0; i < map.size; i++) {
+ k = map.ptr[i].key;
+ v = map.ptr[i].val;
+
+ if (k.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (k.via.str.size == key_size &&
+ strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) {
+ /* make sure to free it after use */
+ ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size);
+ break;
+ }
+ }
+
+ return ptr;
+}
+
+/* parse_monitored_resource is to extract the monitoired resource labels
+ * from "logging.googleapis.com/monitored_resource" in log data
+ * and append to 'resource'/'labels' in log entry.
+ * Monitored resource type is already read from resource field in stackdriver
+ * output plugin configuration parameters.
+ *
+ * The structure of monitored_resource is:
+ * {
+ * "logging.googleapis.com/monitored_resource": {
+ * "labels": {
+ * "resource_label": <label_value>,
+ * }
+ * }
+ * }
+ * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types
+ * for required labels for each monitored resource.
+ */
+
+static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck)
+{
+ int ret = -1;
+ msgpack_object *obj;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ obj = log_event.body;
+
+ msgpack_object_kv *kv = obj->via.map.ptr;
+ msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
+ for (; kv < kvend; ++kv) {
+ if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR
+ && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) {
+ msgpack_object subobj = kv->val;
+ msgpack_object_kv *p = subobj.via.map.ptr;
+ msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size;
+ for (; p < pend; ++p) {
+ if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) {
+ continue;
+ }
+ if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) {
+ msgpack_object labels = p->val;
+ msgpack_object_kv *q = labels.via.map.ptr;
+ msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size;
+ int fields = 0;
+ for (; q < qend; ++q) {
+ if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY);
+ }
+ ++fields;
+ }
+ if (fields > 0) {
+ msgpack_pack_map(mp_pck, fields);
+ q = labels.via.map.ptr;
+ for (; q < qend; ++q) {
+ if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+ flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY);
+ msgpack_pack_str(mp_pck, q->key.via.str.size);
+ msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size);
+ msgpack_pack_str(mp_pck, q->val.via.str.size);
+ msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return 0;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY);
+
+ return ret;
+}
+
+/*
+ * Given a local_resource_id, split the content using the proper separator generating
+ * a linked list to store the spliited string
+ */
+static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type)
+{
+ int ret = -1;
+ int max_split = -1;
+ int len_k8s_container;
+ int len_k8s_node;
+ int len_k8s_pod;
+ struct mk_list *list;
+
+ len_k8s_container = sizeof(K8S_CONTAINER) - 1;
+ len_k8s_node = sizeof(K8S_NODE) - 1;
+ len_k8s_pod = sizeof(K8S_POD) - 1;
+
+ /* Allocate list head */
+ list = flb_malloc(sizeof(struct mk_list));
+ if (!list) {
+ flb_errno();
+ return NULL;
+ }
+ mk_list_init(list);
+
+ /* Determinate the max split value based on type */
+ if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
+ /* including the prefix of tag */
+ max_split = 4;
+ }
+ else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
+ max_split = 2;
+ }
+ else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
+ max_split = 3;
+ }
+
+ /* The local_resource_id is splitted by '.' */
+ ret = flb_slist_split_string(list, local_resource_id, '.', max_split);
+
+ if (ret == -1 || mk_list_size(list) != max_split) {
+ flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type);
+ flb_slist_destroy(list);
+ flb_free(list);
+ return NULL;
+ }
+
+ return list;
+}
+
+/*
+ * extract_local_resource_id():
+ * - extract the value from "logging.googleapis.com/local_resource_id" field
+ * - if local_resource_id is missing from the payLoad, use the tag of the log
+ */
+static int extract_local_resource_id(const void *data, size_t bytes,
+ struct flb_stackdriver *ctx, const char *tag) {
+ msgpack_object_map map;
+ flb_sds_t local_resource_id;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ if ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ map = log_event.body->via.map;
+ local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY,
+ LEN_LOCAL_RESOURCE_ID_KEY);
+
+ if (local_resource_id == NULL) {
+ /* if local_resource_id is not found, use the tag of the log */
+ flb_plg_debug(ctx->ins, "local_resource_id not found, "
+ "tag [%s] is assigned for local_resource_id", tag);
+ local_resource_id = flb_sds_create(tag);
+ }
+
+ /* we need to create up the local_resource_id from previous log */
+ if (ctx->local_resource_id) {
+ flb_sds_destroy(ctx->local_resource_id);
+ }
+
+ ctx->local_resource_id = flb_sds_create(local_resource_id);
+
+ flb_sds_destroy(local_resource_id);
+
+ ret = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins, "failed to unpack data");
+
+ ret = -1;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return ret;
+}
+
+/*
+ * set_monitored_resource_labels():
+ * - use the extracted local_resource_id to assign the label keys for different
+ * resource types that are specified in the configuration of stackdriver_out plugin
+ */
+static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type)
+{
+ int ret = -1;
+ int first = FLB_TRUE;
+ int counter = 0;
+ int len_k8s_container;
+ int len_k8s_node;
+ int len_k8s_pod;
+ size_t prefix_len = 0;
+ struct local_resource_id_list *ptr;
+ struct mk_list *list = NULL;
+ struct mk_list *head;
+ flb_sds_t new_local_resource_id;
+
+ if (!ctx->local_resource_id) {
+ flb_plg_error(ctx->ins, "local_resource_is is not assigned");
+ return -1;
+ }
+
+ len_k8s_container = sizeof(K8S_CONTAINER) - 1;
+ len_k8s_node = sizeof(K8S_NODE) - 1;
+ len_k8s_pod = sizeof(K8S_POD) - 1;
+
+ prefix_len = flb_sds_len(ctx->tag_prefix);
+ if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) {
+ flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of"
+ " local_resource_id [%s]", ctx->tag_prefix,
+ ctx->local_resource_id);
+ return -1;
+ }
+
+ new_local_resource_id = flb_sds_create_len(ctx->local_resource_id,
+ flb_sds_len(ctx->local_resource_id));
+ replace_prefix_dot(new_local_resource_id, prefix_len - 1);
+
+ if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
+ list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER);
+ if (!list) {
+ goto error;
+ }
+
+ /* iterate through the list */
+ mk_list_foreach(head, list) {
+ ptr = mk_list_entry(head, struct local_resource_id_list, _head);
+ if (first) {
+ first = FLB_FALSE;
+ continue;
+ }
+
+ /* Follow the order of fields in local_resource_id */
+ if (counter == 0) {
+ if (ctx->namespace_name) {
+ flb_sds_destroy(ctx->namespace_name);
+ }
+ ctx->namespace_name = flb_sds_create(ptr->val);
+ }
+ else if (counter == 1) {
+ if (ctx->pod_name) {
+ flb_sds_destroy(ctx->pod_name);
+ }
+ ctx->pod_name = flb_sds_create(ptr->val);
+ }
+ else if (counter == 2) {
+ if (ctx->container_name) {
+ flb_sds_destroy(ctx->container_name);
+ }
+ ctx->container_name = flb_sds_create(ptr->val);
+ }
+
+ counter++;
+ }
+
+ if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) {
+ goto error;
+ }
+ }
+ else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
+ list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE);
+ if (!list) {
+ goto error;
+ }
+
+ mk_list_foreach(head, list) {
+ ptr = mk_list_entry(head, struct local_resource_id_list, _head);
+ if (first) {
+ first = FLB_FALSE;
+ continue;
+ }
+
+ if (ptr != NULL) {
+ if (ctx->node_name) {
+ flb_sds_destroy(ctx->node_name);
+ }
+ ctx->node_name = flb_sds_create(ptr->val);
+ }
+ }
+
+ if (!ctx->node_name) {
+ goto error;
+ }
+ }
+ else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
+ list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD);
+ if (!list) {
+ goto error;
+ }
+
+ mk_list_foreach(head, list) {
+ ptr = mk_list_entry(head, struct local_resource_id_list, _head);
+ if (first) {
+ first = FLB_FALSE;
+ continue;
+ }
+
+ /* Follow the order of fields in local_resource_id */
+ if (counter == 0) {
+ if (ctx->namespace_name) {
+ flb_sds_destroy(ctx->namespace_name);
+ }
+ ctx->namespace_name = flb_sds_create(ptr->val);
+ }
+ else if (counter == 1) {
+ if (ctx->pod_name) {
+ flb_sds_destroy(ctx->pod_name);
+ }
+ ctx->pod_name = flb_sds_create(ptr->val);
+ }
+
+ counter++;
+ }
+
+ if (!ctx->namespace_name || !ctx->pod_name) {
+ goto error;
+ }
+ }
+
+ ret = 0;
+
+ if (list) {
+ flb_slist_destroy(list);
+ flb_free(list);
+ }
+ flb_sds_destroy(new_local_resource_id);
+
+ return ret;
+
+ error:
+ if (list) {
+ flb_slist_destroy(list);
+ flb_free(list);
+ }
+
+ if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) {
+ if (ctx->namespace_name) {
+ flb_sds_destroy(ctx->namespace_name);
+ }
+
+ if (ctx->pod_name) {
+ flb_sds_destroy(ctx->pod_name);
+ }
+
+ if (ctx->container_name) {
+ flb_sds_destroy(ctx->container_name);
+ }
+ }
+ else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) {
+ if (ctx->node_name) {
+ flb_sds_destroy(ctx->node_name);
+ }
+ }
+ else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) {
+ if (ctx->namespace_name) {
+ flb_sds_destroy(ctx->namespace_name);
+ }
+
+ if (ctx->pod_name) {
+ flb_sds_destroy(ctx->pod_name);
+ }
+ }
+
+ flb_sds_destroy(new_local_resource_id);
+ return -1;
+}
+
+static int is_tag_match_regex(struct flb_stackdriver *ctx,
+ const char *tag, int tag_len)
+{
+ int ret;
+ int tag_prefix_len;
+ int len_to_be_matched;
+ const char *tag_str_to_be_matcheds;
+
+ tag_prefix_len = flb_sds_len(ctx->tag_prefix);
+ if (tag_len > tag_prefix_len &&
+ flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) {
+ return 0;
+ }
+
+ tag_str_to_be_matcheds = tag + tag_prefix_len;
+ len_to_be_matched = tag_len - tag_prefix_len;
+ ret = flb_regex_match(ctx->regex,
+ (unsigned char *) tag_str_to_be_matcheds,
+ len_to_be_matched);
+
+ /* 1 -> match; 0 -> doesn't match; < 0 -> error */
+ return ret;
+}
+
+static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx)
+{
+ int ret;
+ int prefix_len;
+ int len_to_be_matched;
+ const char *str_to_be_matcheds;
+
+ if (!ctx->local_resource_id) {
+ flb_plg_warn(ctx->ins, "local_resource_id not found in the payload");
+ return -1;
+ }
+
+ prefix_len = flb_sds_len(ctx->tag_prefix);
+ str_to_be_matcheds = ctx->local_resource_id + prefix_len;
+ len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len;
+
+ ret = flb_regex_match(ctx->regex,
+ (unsigned char *) str_to_be_matcheds,
+ len_to_be_matched);
+
+ /* 1 -> match; 0 -> doesn't match; < 0 -> error */
+ return ret;
+}
+
+static void cb_results(const char *name, const char *value,
+ size_t vlen, void *data);
+/*
+ * extract_resource_labels_from_regex(4) will only be called if the
+ * tag or local_resource_id field matches the regex rule
+ */
+static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx,
+ const char *tag, int tag_len,
+ int from_tag)
+{
+ int ret = 1;
+ int prefix_len;
+ int len_to_be_matched;
+ int local_resource_id_len;
+ const char *str_to_be_matcheds;
+ struct flb_regex_search result;
+
+ prefix_len = flb_sds_len(ctx->tag_prefix);
+ if (from_tag == FLB_TRUE) {
+ local_resource_id_len = tag_len;
+ str_to_be_matcheds = tag + prefix_len;
+ }
+ else {
+ // this will be called only if the payload contains local_resource_id
+ local_resource_id_len = flb_sds_len(ctx->local_resource_id);
+ str_to_be_matcheds = ctx->local_resource_id + prefix_len;
+ }
+
+ len_to_be_matched = local_resource_id_len - prefix_len;
+ ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result);
+ if (ret <= 0) {
+ flb_plg_warn(ctx->ins, "invalid pattern for given value %s when"
+ " extracting resource labels", str_to_be_matcheds);
+ return -1;
+ }
+
+ flb_regex_parse(ctx->regex, &result, cb_results, ctx);
+
+ return ret;
+}
+
+static int process_local_resource_id(struct flb_stackdriver *ctx,
+ const char *tag, int tag_len, char *type)
+{
+ int ret;
+
+ // parsing local_resource_id from tag takes higher priority
+ if (is_tag_match_regex(ctx, tag, tag_len) > 0) {
+ ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE);
+ }
+ else if (is_local_resource_id_match_regex(ctx) > 0) {
+ ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE);
+ }
+ else {
+ ret = set_monitored_resource_labels(ctx, type);
+ }
+
+ return ret;
+}
+
+/*
+ * get_payload_labels
+ * - Iterate throught the original payload (obj) and find out the entry that matches
+ * the labels_key
+ * - Used to convert all labels under labels_key to root-level `labels` field
+ */
+static msgpack_object *get_payload_labels(struct flb_stackdriver *ctx, msgpack_object *obj)
+{
+ int i;
+ int len;
+ msgpack_object_kv *kv = NULL;
+
+ if (!obj || obj->type != MSGPACK_OBJECT_MAP) {
+ return NULL;
+ }
+
+ len = flb_sds_len(ctx->labels_key);
+ for (i = 0; i < obj->via.map.size; i++) {
+ kv = &obj->via.map.ptr[i];
+ if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) {
+ /* only the first matching entry will be returned */
+ return &kv->val;
+ }
+ }
+
+ //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload",
+ // ctx->labels_key);
+ return NULL;
+}
+
+/*
+ * pack_resource_labels():
+ * - Looks through the resource_labels parameter and appends new key value
+ * pair to the log entry.
+ * - Supports field access, plaintext assignment and environment variables.
+ */
+static int pack_resource_labels(struct flb_stackdriver *ctx,
+ struct flb_mp_map_header *mh,
+ msgpack_packer *mp_pck,
+ const void *data,
+ size_t bytes)
+{
+ struct mk_list *head;
+ struct flb_kv *label_kv;
+ struct flb_record_accessor *ra;
+ struct flb_ra_value *rval;
+ int len;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ if (ctx->should_skip_resource_labels_api == FLB_TRUE) {
+ return -1;
+ }
+
+ len = mk_list_size(&ctx->resource_labels_kvs);
+ if (len == 0) {
+ return -1;
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ if ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+ flb_mp_map_header_init(mh, mp_pck);
+ mk_list_foreach(head, &ctx->resource_labels_kvs) {
+ label_kv = mk_list_entry(head, struct flb_kv, _head);
+ /*
+ * KVs have the form destination=original, so the original key is the value.
+ * If the value starts with '$', it will be processed using record accessor.
+ * Otherwise, it will be treated as a plaintext assignment.
+ */
+ if (label_kv->val[0] == '$') {
+ ra = flb_ra_create(label_kv->val, FLB_TRUE);
+ rval = flb_ra_get_value_object(ra, *log_event.body);
+
+ if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) {
+ flb_mp_map_header_append(mh);
+ msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
+ msgpack_pack_str_body(mp_pck, label_kv->key,
+ flb_sds_len(label_kv->key));
+ msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string));
+ msgpack_pack_str_body(mp_pck, rval->val.string,
+ flb_sds_len(rval->val.string));
+ flb_ra_key_value_destroy(rval);
+ } else {
+ flb_plg_warn(ctx->ins, "failed to find a corresponding entry for "
+ "resource label entry [%s=%s]", label_kv->key, label_kv->val);
+ }
+ flb_ra_destroy(ra);
+ } else {
+ flb_mp_map_header_append(mh);
+ msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
+ msgpack_pack_str_body(mp_pck, label_kv->key,
+ flb_sds_len(label_kv->key));
+ msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val));
+ msgpack_pack_str_body(mp_pck, label_kv->val,
+ flb_sds_len(label_kv->val));
+ }
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "failed to unpack data");
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return -1;
+ }
+
+ /* project_id should always be packed from config parameter */
+ flb_mp_map_header_append(mh);
+ msgpack_pack_str(mp_pck, 10);
+ msgpack_pack_str_body(mp_pck, "project_id", 10);
+ msgpack_pack_str(mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_mp_map_header_end(mh);
+
+ return 0;
+}
+
+static void pack_labels(struct flb_stackdriver *ctx,
+ msgpack_packer *mp_pck,
+ msgpack_object *payload_labels_ptr)
+{
+ int i;
+ int labels_size = 0;
+ struct mk_list *head;
+ struct flb_kv *list_kv;
+ msgpack_object_kv *obj_kv = NULL;
+
+ /* Determine size of labels map */
+ labels_size = mk_list_size(&ctx->config_labels);
+ if (payload_labels_ptr != NULL &&
+ payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
+ labels_size += payload_labels_ptr->via.map.size;
+ }
+
+ msgpack_pack_map(mp_pck, labels_size);
+
+ /* pack labels from the payload */
+ if (payload_labels_ptr != NULL &&
+ payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
+
+ for (i = 0; i < payload_labels_ptr->via.map.size; i++) {
+ obj_kv = &payload_labels_ptr->via.map.ptr[i];
+ msgpack_pack_object(mp_pck, obj_kv->key);
+ msgpack_pack_object(mp_pck, obj_kv->val);
+ }
+ }
+
+ /* pack labels set in configuration */
+ /* in msgpack duplicate keys are overriden by the last set */
+ /* static label keys override payload labels */
+ mk_list_foreach(head, &ctx->config_labels){
+ list_kv = mk_list_entry(head, struct flb_kv, _head);
+ msgpack_pack_str(mp_pck, flb_sds_len(list_kv->key));
+ msgpack_pack_str_body(mp_pck, list_kv->key, flb_sds_len(list_kv->key));
+ msgpack_pack_str(mp_pck, flb_sds_len(list_kv->val));
+ msgpack_pack_str_body(mp_pck, list_kv->val, flb_sds_len(list_kv->val));
+ }
+}
+
+static void cb_results(const char *name, const char *value,
+ size_t vlen, void *data)
+{
+ struct flb_stackdriver *ctx = data;
+
+ if (vlen == 0) {
+ return;
+ }
+
+ if (strcmp(name, "pod_name") == 0) {
+ if (ctx->pod_name != NULL) {
+ flb_sds_destroy(ctx->pod_name);
+ }
+ ctx->pod_name = flb_sds_create_len(value, vlen);
+ }
+ else if (strcmp(name, "namespace_name") == 0) {
+ if (ctx->namespace_name != NULL) {
+ flb_sds_destroy(ctx->namespace_name);
+ }
+ ctx->namespace_name = flb_sds_create_len(value, vlen);
+ }
+ else if (strcmp(name, "container_name") == 0) {
+ if (ctx->container_name != NULL) {
+ flb_sds_destroy(ctx->container_name);
+ }
+ ctx->container_name = flb_sds_create_len(value, vlen);
+ }
+ else if (strcmp(name, "node_name") == 0) {
+ if (ctx->node_name != NULL) {
+ flb_sds_destroy(ctx->node_name);
+ }
+ ctx->node_name = flb_sds_create_len(value, vlen);
+ }
+
+ return;
+}
+
+int flb_stackdriver_regex_init(struct flb_stackdriver *ctx)
+{
+ /* If a custom regex is not set, use the defaults */
+ ctx->regex = flb_regex_create(ctx->custom_k8s_regex);
+ if (!ctx->regex) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int cb_stackdriver_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ int io_flags = FLB_IO_TLS;
+ char *token;
+ struct flb_stackdriver *ctx;
+
+ /* Create config context */
+ ctx = flb_stackdriver_conf_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "configuration failed");
+ return -1;
+ }
+
+ /* Load config map */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Set context */
+ flb_output_set_context(ins, ctx);
+
+ /* Network mode IPv6 */
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ /* Initialize oauth2 cache pthread keys */
+ oauth2_cache_init();
+
+ /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
+ pthread_mutex_init(&ctx->token_mutex, NULL);
+
+ /* Create Upstream context for Stackdriver Logging (no oauth2 service) */
+ ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL,
+ io_flags, ins->tls);
+ ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server,
+ FLB_IO_TCP, NULL);
+
+ /* Create oauth2 context */
+ ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000);
+
+ if (!ctx->u) {
+ flb_plg_error(ctx->ins, "upstream creation failed");
+ return -1;
+ }
+ if (!ctx->metadata_u) {
+ flb_plg_error(ctx->ins, "metadata upstream creation failed");
+ return -1;
+ }
+ if (!ctx->o) {
+ flb_plg_error(ctx->ins, "cannot create oauth2 context");
+ return -1;
+ }
+ flb_output_upstream_set(ctx->u, ins);
+
+ /* Metadata Upstream Sync flags */
+ flb_stream_disable_async_mode(&ctx->metadata_u->base);
+
+ if (ins->test_mode == FLB_FALSE) {
+ /* Retrieve oauth2 token */
+ token = get_google_token(ctx);
+ if (!token) {
+ flb_plg_warn(ctx->ins, "token retrieval failed");
+ }
+ else {
+ flb_sds_destroy(token);
+ }
+ }
+
+ if (ctx->metadata_server_auth) {
+ ret = gce_metadata_read_project_id(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
+ && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
+ ret = gce_metadata_read_zone(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ ret = gce_metadata_read_instance_id(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+ }
+
+ /* Validate project_id */
+ if (!ctx->project_id) {
+ flb_plg_error(ctx->ins, "property 'project_id' is not set");
+ return -1;
+ }
+
+ if (!ctx->export_to_project_id) {
+ ctx->export_to_project_id = ctx->project_id;
+ }
+
+ ret = flb_stackdriver_regex_init(ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "failed to init stackdriver custom regex");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int validate_severity_level(severity_t * s,
+ const char * str,
+ const unsigned int str_size)
+{
+ int i = 0;
+
+ const static struct {
+ severity_t s;
+ const unsigned int str_size;
+ const char * str;
+ } enum_mapping[] = {
+ {FLB_STD_EMERGENCY, 9, "EMERGENCY"},
+ {FLB_STD_EMERGENCY, 5, "EMERG" },
+
+ {FLB_STD_ALERT , 1, "A" },
+ {FLB_STD_ALERT , 5, "ALERT" },
+
+ {FLB_STD_CRITICAL , 1, "C" },
+ {FLB_STD_CRITICAL , 1, "F" },
+ {FLB_STD_CRITICAL , 4, "CRIT" },
+ {FLB_STD_CRITICAL , 5, "FATAL" },
+ {FLB_STD_CRITICAL , 8, "CRITICAL" },
+
+ {FLB_STD_ERROR , 1, "E" },
+ {FLB_STD_ERROR , 3, "ERR" },
+ {FLB_STD_ERROR , 5, "ERROR" },
+ {FLB_STD_ERROR , 6, "SEVERE" },
+
+ {FLB_STD_WARNING , 1, "W" },
+ {FLB_STD_WARNING , 4, "WARN" },
+ {FLB_STD_WARNING , 7, "WARNING" },
+
+ {FLB_STD_NOTICE , 1, "N" },
+ {FLB_STD_NOTICE , 6, "NOTICE" },
+
+ {FLB_STD_INFO , 1, "I" },
+ {FLB_STD_INFO , 4, "INFO" },
+
+ {FLB_STD_DEBUG , 1, "D" },
+ {FLB_STD_DEBUG , 5, "DEBUG" },
+ {FLB_STD_DEBUG , 5, "TRACE" },
+ {FLB_STD_DEBUG , 9, "TRACE_INT"},
+ {FLB_STD_DEBUG , 4, "FINE" },
+ {FLB_STD_DEBUG , 5, "FINER" },
+ {FLB_STD_DEBUG , 6, "FINEST" },
+ {FLB_STD_DEBUG , 6, "CONFIG" },
+
+ {FLB_STD_DEFAULT , 7, "DEFAULT" }
+ };
+
+ for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) {
+ if (enum_mapping[i].str_size != str_size) {
+ continue;
+ }
+
+ if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) {
+ *s = enum_mapping[i].s;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o,
+ const flb_sds_t key, const int key_size,
+ msgpack_object_type type)
+{
+ int i = 0;
+ msgpack_object_kv * p = NULL;
+
+ if (o == NULL || subobj == NULL) {
+ return -1;
+ }
+
+ for (i = 0; i < o->via.map.size; i++) {
+ p = &o->via.map.ptr[i];
+ if (p->val.type != type) {
+ continue;
+ }
+
+ if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) {
+ *subobj = p->val;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key)
+{
+ msgpack_object tmp;
+ if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) {
+ *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size);
+ return 0;
+ }
+
+ *s = 0;
+ return -1;
+}
+
+static int get_severity_level(severity_t * s, const msgpack_object * o,
+ const flb_sds_t key)
+{
+ msgpack_object tmp;
+ if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0
+ && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) {
+ return 0;
+ }
+ *s = 0;
+ return -1;
+}
+
+static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * src_obj,
+ const flb_sds_t key)
+{
+ msgpack_object tmp;
+ int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN);
+
+ if (ret == 0 && tmp.via.boolean == true) {
+ *trace_sampled_value = FLB_TRUE;
+ return 0;
+ } else if (ret == 0 && tmp.via.boolean == false) {
+ *trace_sampled_value = FLB_FALSE;
+ return 0;
+ }
+
+ return -1;
+}
+
+static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
+ const msgpack_object * obj)
+{
+ int i = 0;
+ msgpack_object_kv * p = NULL;
+ insert_id_status ret = INSERTID_NOT_PRESENT;
+
+ if (obj == NULL) {
+ return ret;
+ }
+
+ for (i = 0; i < obj->via.map.size; i++) {
+ p = &obj->via.map.ptr[i];
+ if (p->key.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+ if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
+ if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) {
+ *insert_id_value = p->val;
+ ret = INSERTID_VALID;
+ }
+ else {
+ ret = INSERTID_INVALID;
+ }
+ break;
+ }
+ }
+ return ret;
+}
+
+static int pack_json_payload(int insert_id_extracted,
+ int operation_extracted, int operation_extra_size,
+ int source_location_extracted,
+ int source_location_extra_size,
+ int http_request_extracted,
+ int http_request_extra_size,
+ timestamp_status tms_status,
+ msgpack_packer *mp_pck, msgpack_object *obj,
+ struct flb_stackdriver *ctx)
+{
+ /* Specified fields include local_resource_id, operation, sourceLocation ... */
+ int i, j;
+ int to_remove = 0;
+ int ret;
+ int map_size;
+ int new_map_size;
+ int len;
+ int len_to_be_removed;
+ int key_not_found;
+ flb_sds_t removed;
+ flb_sds_t monitored_resource_key;
+ flb_sds_t local_resource_id_key;
+ flb_sds_t stream;
+ msgpack_object_kv *kv = obj->via.map.ptr;
+ msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;
+
+ monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY);
+ local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY);
+ stream = flb_sds_create("stream");
+ /*
+ * array of elements that need to be removed from payload,
+ * special field 'operation' will be processed individually
+ */
+ flb_sds_t to_be_removed[] =
+ {
+ monitored_resource_key,
+ local_resource_id_key,
+ ctx->labels_key,
+ ctx->severity_key,
+ ctx->trace_key,
+ ctx->span_id_key,
+ ctx->trace_sampled_key,
+ ctx->log_name_key,
+ stream
+ /* more special fields are required to be added, but, if this grows with more
+ than a few records, it might need to be converted to flb_hash
+ */
+ };
+
+ if (insert_id_extracted == FLB_TRUE) {
+ to_remove += 1;
+ }
+ if (operation_extracted == FLB_TRUE && operation_extra_size == 0) {
+ to_remove += 1;
+ }
+ if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) {
+ to_remove += 1;
+ }
+ if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) {
+ to_remove += 1;
+ }
+ if (tms_status == FORMAT_TIMESTAMP_OBJECT) {
+ to_remove += 1;
+ }
+ if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
+ to_remove += 2;
+ }
+
+ map_size = obj->via.map.size;
+ len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]);
+ for (i = 0; i < map_size; i++) {
+ kv = &obj->via.map.ptr[i];
+ len = kv->key.via.str.size;
+ for (j = 0; j < len_to_be_removed; j++) {
+ removed = to_be_removed[j];
+ /*
+ * check length of key to avoid partial matching
+ * e.g. labels key = labels && kv->key = labelss
+ */
+ if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
+ to_remove += 1;
+ break;
+ }
+ }
+ }
+
+ new_map_size = map_size - to_remove;
+
+ ret = msgpack_pack_map(mp_pck, new_map_size);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* points back to the beginning of map */
+ kv = obj->via.map.ptr;
+ for(; kv != kvend; ++kv ) {
+ key_not_found = 1;
+
+ /* processing logging.googleapis.com/insertId */
+ if (insert_id_extracted == FLB_TRUE
+ && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) {
+ continue;
+ }
+
+ /* processing logging.googleapis.com/operation */
+ if (validate_key(kv->key, OPERATION_FIELD_IN_JSON,
+ OPERATION_KEY_SIZE)
+ && kv->val.type == MSGPACK_OBJECT_MAP) {
+ if (operation_extra_size > 0) {
+ msgpack_pack_object(mp_pck, kv->key);
+ pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size);
+ }
+ continue;
+ }
+
+ if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON,
+ SOURCE_LOCATION_SIZE)
+ && kv->val.type == MSGPACK_OBJECT_MAP) {
+
+ if (source_location_extra_size > 0) {
+ msgpack_pack_object(mp_pck, kv->key);
+ pack_extra_source_location_subfields(mp_pck, &kv->val,
+ source_location_extra_size);
+ }
+ continue;
+ }
+
+ if (validate_key(kv->key, ctx->http_request_key,
+ ctx->http_request_key_size)
+ && kv->val.type == MSGPACK_OBJECT_MAP) {
+
+ if(http_request_extra_size > 0) {
+ msgpack_pack_object(mp_pck, kv->key);
+ pack_extra_http_request_subfields(mp_pck, &kv->val,
+ http_request_extra_size);
+ }
+ continue;
+ }
+
+ if (validate_key(kv->key, "timestamp", 9)
+ && tms_status == FORMAT_TIMESTAMP_OBJECT) {
+ continue;
+ }
+
+ if (validate_key(kv->key, "timestampSeconds", 16)
+ && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
+ continue;
+ }
+ if (validate_key(kv->key, "timestampNanos", 14)
+ && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) {
+ continue;
+ }
+
+ len = kv->key.via.str.size;
+ for (j = 0; j < len_to_be_removed; j++) {
+ removed = to_be_removed[j];
+ if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) {
+ key_not_found = 0;
+ break;
+ }
+ }
+
+ if (key_not_found) {
+ ret = msgpack_pack_object(mp_pck, kv->key);
+ if (ret < 0) {
+ goto error;
+ }
+ ret = msgpack_pack_object(mp_pck, kv->val);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+ }
+
+ flb_sds_destroy(monitored_resource_key);
+ flb_sds_destroy(local_resource_id_key);
+ flb_sds_destroy(stream);
+ return 0;
+
+ error:
+ flb_sds_destroy(monitored_resource_key);
+ flb_sds_destroy(local_resource_id_key);
+ flb_sds_destroy(stream);
+ return ret;
+}
+
+static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
+ int total_records,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes)
+{
+ int len;
+ int ret;
+ int array_size = 0;
+ /* The default value is 3: timestamp, jsonPayload, logName. */
+ int entry_size = 3;
+ size_t s;
+ // size_t off = 0;
+ char path[PATH_MAX];
+ char time_formatted[255];
+ const char *newtag;
+ const char *new_log_name;
+ msgpack_object *obj;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ flb_sds_t out_buf;
+ struct flb_mp_map_header mh;
+
+ /* Parameters for severity */
+ int severity_extracted = FLB_FALSE;
+ severity_t severity;
+
+ /* Parameters for trace */
+ int trace_extracted = FLB_FALSE;
+ flb_sds_t trace;
+ char stackdriver_trace[PATH_MAX];
+ const char *new_trace;
+
+ /* Parameters for span id */
+ int span_id_extracted = FLB_FALSE;
+ flb_sds_t span_id;
+
+ /* Parameters for trace sampled */
+ int trace_sampled_extracted = FLB_FALSE;
+ int trace_sampled = FLB_FALSE;
+
+ /* Parameters for log name */
+ int log_name_extracted = FLB_FALSE;
+ flb_sds_t log_name = NULL;
+ flb_sds_t stream = NULL;
+ flb_sds_t stream_key;
+
+ /* Parameters for insertId */
+ msgpack_object insert_id_obj;
+ insert_id_status in_status;
+ int insert_id_extracted;
+
+ /* Parameters in Operation */
+ flb_sds_t operation_id;
+ flb_sds_t operation_producer;
+ int operation_first = FLB_FALSE;
+ int operation_last = FLB_FALSE;
+ int operation_extracted = FLB_FALSE;
+ int operation_extra_size = 0;
+
+ /* Parameters for sourceLocation */
+ flb_sds_t source_location_file;
+ int64_t source_location_line = 0;
+ flb_sds_t source_location_function;
+ int source_location_extracted = FLB_FALSE;
+ int source_location_extra_size = 0;
+
+ /* Parameters for httpRequest */
+ struct http_request_field http_request;
+ int http_request_extracted = FLB_FALSE;
+ int http_request_extra_size = 0;
+
+ /* Parameters for Timestamp */
+ struct tm tm;
+ // struct flb_time tms;
+ timestamp_status tms_status;
+ /* Count number of records */
+ array_size = total_records;
+
+ /* Parameters for labels */
+ msgpack_object *payload_labels_ptr;
+ int labels_size = 0;
+
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return NULL;
+ }
+
+ /*
+ * Search each entry and validate insertId.
+ * Reject the entry if insertId is invalid.
+ * If all the entries are rejected, stop formatting.
+ *
+ */
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ /* Extract insertId */
+ in_status = validate_insert_id(&insert_id_obj, log_event.body);
+
+ if (in_status == INSERTID_INVALID) {
+ flb_plg_error(ctx->ins,
+ "Incorrect insertId received. InsertId should be non-empty string.");
+ array_size -= 1;
+ }
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ /* Sounds like this should compare to -1 instead of zero */
+ if (array_size == 0) {
+ return NULL;
+ }
+
+ /* Create temporal msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /*
+ * Pack root map (resource & entries):
+ *
+ * {"resource": {"type": "...", "labels": {...},
+ * "entries": []
+ */
+ msgpack_pack_map(&mp_pck, 2);
+
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "resource", 8);
+
+ /* type & labels */
+ msgpack_pack_map(&mp_pck, 2);
+
+ /* type */
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "type", 4);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource));
+ msgpack_pack_str_body(&mp_pck, ctx->resource,
+ flb_sds_len(ctx->resource));
+
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "labels", 6);
+
+ ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes);
+ if (ret != 0) {
+ if (ctx->resource_type == RESOURCE_TYPE_K8S) {
+ ret = extract_local_resource_id(data, bytes, ctx, tag);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "fail to construct local_resource_id");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+ }
+ ret = parse_monitored_resource(ctx, data, bytes, &mp_pck);
+ if (ret != 0) {
+ if (strcmp(ctx->resource, "global") == 0) {
+ /* global resource has field project_id */
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+ else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE
+ || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) {
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE && ctx->node_id) {
+ /* generic_node has fields project_id, location, namespace, node_id */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "node_id", 7);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->node_id, flb_sds_len(ctx->node_id));
+ }
+ else {
+ /* generic_task has fields project_id, location, namespace, job, task_id */
+ if (ctx->job) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "job", 3);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->job, flb_sds_len(ctx->job));
+ }
+
+ if (ctx->task_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "task_id", 7);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->task_id, flb_sds_len(ctx->task_id));
+ }
+ }
+
+ if (ctx->project_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+
+ if (ctx->location) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "location", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->location, flb_sds_len(ctx->location));
+ }
+
+ if (ctx->namespace_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "namespace", 9);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->namespace_id, flb_sds_len(ctx->namespace_id));
+ }
+
+ flb_mp_map_header_end(&mh);
+ }
+ else if (strcmp(ctx->resource, "gce_instance") == 0) {
+ /* gce_instance resource has fields project_id, zone, instance_id */
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (ctx->project_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+
+ if (ctx->zone) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "zone", 4);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone));
+ msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone));
+ }
+
+ if (ctx->instance_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 11);
+ msgpack_pack_str_body(&mp_pck, "instance_id", 11);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->instance_id, flb_sds_len(ctx->instance_id));
+ }
+ flb_mp_map_header_end(&mh);
+ }
+ else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) {
+ /* k8s_container resource has fields project_id, location, cluster_name,
+ * namespace_name, pod_name, container_name
+ *
+ * The local_resource_id for k8s_container is in format:
+ * k8s_container.<namespace_name>.<pod_name>.<container_name>
+ */
+
+ ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "fail to extract resource labels "
+ "for k8s_container resource type");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (ctx->project_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+
+ if (ctx->cluster_location) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "location", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_location,
+ flb_sds_len(ctx->cluster_location));
+ }
+
+ if (ctx->cluster_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 12);
+ msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_name, flb_sds_len(ctx->cluster_name));
+ }
+
+ if (ctx->namespace_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->namespace_name,
+ flb_sds_len(ctx->namespace_name));
+ }
+
+ if (ctx->pod_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "pod_name", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->pod_name, flb_sds_len(ctx->pod_name));
+ }
+
+ if (ctx->container_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "container_name", 14);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->container_name,
+ flb_sds_len(ctx->container_name));
+ }
+
+ flb_mp_map_header_end(&mh);
+ }
+ else if (strcmp(ctx->resource, K8S_NODE) == 0) {
+ /* k8s_node resource has fields project_id, location, cluster_name, node_name
+ *
+ * The local_resource_id for k8s_node is in format:
+ * k8s_node.<node_name>
+ */
+
+ ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "fail to process local_resource_id from "
+ "log entry for k8s_node");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (ctx->project_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+
+ if (ctx->cluster_location) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "location", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_location,
+ flb_sds_len(ctx->cluster_location));
+ }
+
+ if (ctx->cluster_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 12);
+ msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_name, flb_sds_len(ctx->cluster_name));
+ }
+
+ if (ctx->node_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "node_name", 9);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->node_name, flb_sds_len(ctx->node_name));
+ }
+
+ flb_mp_map_header_end(&mh);
+ }
+ else if (strcmp(ctx->resource, K8S_POD) == 0) {
+ /* k8s_pod resource has fields project_id, location, cluster_name,
+ * namespace_name, pod_name.
+ *
+ * The local_resource_id for k8s_pod is in format:
+ * k8s_pod.<namespace_name>.<pod_name>
+ */
+
+ ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "fail to process local_resource_id from "
+ "log entry for k8s_pod");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (ctx->project_id) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 10);
+ msgpack_pack_str_body(&mp_pck, "project_id", 10);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->project_id, flb_sds_len(ctx->project_id));
+ }
+
+ if (ctx->cluster_location) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "location", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_location,
+ flb_sds_len(ctx->cluster_location));
+ }
+
+ if (ctx->cluster_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 12);
+ msgpack_pack_str_body(&mp_pck, "cluster_name", 12);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->cluster_name, flb_sds_len(ctx->cluster_name));
+ }
+
+ if (ctx->namespace_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->namespace_name,
+ flb_sds_len(ctx->namespace_name));
+ }
+
+ if (ctx->pod_name) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "pod_name", 8);
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->pod_name, flb_sds_len(ctx->pod_name));
+ }
+
+ flb_mp_map_header_end(&mh);
+ }
+ else {
+ flb_plg_error(ctx->ins, "unsupported resource type '%s'",
+ ctx->resource);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+ }
+ }
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "entries", 7);
+
+ /* Append entries */
+ msgpack_pack_array(&mp_pck, array_size);
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ return NULL;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ obj = log_event.body;
+ tms_status = extract_timestamp(obj, &log_event.timestamp);
+
+ /*
+ * Pack entry
+ *
+ * {
+ * "severity": "...",
+ * "labels": "...",
+ * "logName": "...",
+ * "jsonPayload": {...},
+ * "timestamp": "...",
+ * "spanId": "...",
+ * "traceSampled": <true or false>,
+ * "trace": "..."
+ * }
+ */
+ entry_size = 3;
+
+ /* Extract severity */
+ severity_extracted = FLB_FALSE;
+ if (ctx->severity_key
+ && get_severity_level(&severity, obj, ctx->severity_key) == 0) {
+ severity_extracted = FLB_TRUE;
+ entry_size += 1;
+ }
+
+ /* Extract trace */
+ trace_extracted = FLB_FALSE;
+ if (ctx->trace_key
+ && get_string(&trace, obj, ctx->trace_key) == 0) {
+ trace_extracted = FLB_TRUE;
+ entry_size += 1;
+ }
+
+ /* Extract span id */
+ span_id_extracted = FLB_FALSE;
+ if (ctx->span_id_key
+ && get_string(&span_id, obj, ctx->span_id_key) == 0) {
+ span_id_extracted = FLB_TRUE;
+ entry_size += 1;
+ }
+
+ /* Extract trace sampled */
+ trace_sampled_extracted = FLB_FALSE;
+ if (ctx->trace_sampled_key
+ && get_trace_sampled(&trace_sampled, obj, ctx->trace_sampled_key) == 0) {
+ trace_sampled_extracted = FLB_TRUE;
+ entry_size += 1;
+ }
+
+ /* Extract log name */
+ log_name_extracted = FLB_FALSE;
+ if (ctx->log_name_key
+ && get_string(&log_name, obj, ctx->log_name_key) == 0) {
+ log_name_extracted = FLB_TRUE;
+ }
+
+ /* Extract insertId */
+ in_status = validate_insert_id(&insert_id_obj, obj);
+ if (in_status == INSERTID_VALID) {
+ insert_id_extracted = FLB_TRUE;
+ entry_size += 1;
+ }
+ else if (in_status == INSERTID_NOT_PRESENT) {
+ insert_id_extracted = FLB_FALSE;
+ }
+ else {
+ if (log_name_extracted == FLB_TRUE) {
+ flb_sds_destroy(log_name);
+ }
+ continue;
+ }
+
+ /* Extract operation */
+ operation_id = flb_sds_create("");
+ operation_producer = flb_sds_create("");
+ operation_first = FLB_FALSE;
+ operation_last = FLB_FALSE;
+ operation_extra_size = 0;
+ operation_extracted = extract_operation(&operation_id, &operation_producer,
+ &operation_first, &operation_last, obj,
+ &operation_extra_size);
+
+ if (operation_extracted == FLB_TRUE) {
+ entry_size += 1;
+ }
+
+ /* Extract sourceLocation */
+ source_location_file = flb_sds_create("");
+ source_location_line = 0;
+ source_location_function = flb_sds_create("");
+ source_location_extra_size = 0;
+ source_location_extracted = extract_source_location(&source_location_file,
+ &source_location_line,
+ &source_location_function,
+ obj,
+ &source_location_extra_size);
+
+ if (source_location_extracted == FLB_TRUE) {
+ entry_size += 1;
+ }
+
+ /* Extract httpRequest */
+ init_http_request(&http_request);
+ http_request_extra_size = 0;
+ http_request_extracted = extract_http_request(&http_request,
+ ctx->http_request_key,
+ ctx->http_request_key_size,
+ obj, &http_request_extra_size);
+ if (http_request_extracted == FLB_TRUE) {
+ entry_size += 1;
+ }
+
+ /* Extract payload labels */
+ payload_labels_ptr = get_payload_labels(ctx, obj);
+ if (payload_labels_ptr != NULL &&
+ payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "the type of payload labels should be map");
+ flb_sds_destroy(operation_id);
+ flb_sds_destroy(operation_producer);
+ flb_log_event_decoder_destroy(&log_decoder);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return NULL;
+ }
+
+ /* Number of parsed labels */
+ labels_size = mk_list_size(&ctx->config_labels);
+ if (payload_labels_ptr != NULL &&
+ payload_labels_ptr->type == MSGPACK_OBJECT_MAP) {
+ labels_size += payload_labels_ptr->via.map.size;
+ }
+
+ if (labels_size > 0) {
+ entry_size += 1;
+ }
+
+ msgpack_pack_map(&mp_pck, entry_size);
+
+ /* Add severity into the log entry */
+ if (severity_extracted == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "severity", 8);
+ msgpack_pack_int(&mp_pck, severity);
+ }
+
+ /* Add trace into the log entry */
+ if (trace_extracted == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, 5);
+ msgpack_pack_str_body(&mp_pck, "trace", 5);
+
+ if (ctx->autoformat_stackdriver_trace) {
+ len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1,
+ "projects/%s/traces/%s", ctx->project_id, trace);
+ new_trace = stackdriver_trace;
+ }
+ else {
+ len = flb_sds_len(trace);
+ new_trace = trace;
+ }
+
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, new_trace, len);
+ flb_sds_destroy(trace);
+ }
+
+ /* Add spanId field into the log entry */
+ if (span_id_extracted == FLB_TRUE) {
+ msgpack_pack_str_with_body(&mp_pck, "spanId", 6);
+ len = flb_sds_len(span_id);
+ msgpack_pack_str_with_body(&mp_pck, span_id, len);
+ flb_sds_destroy(span_id);
+ }
+
+ /* Add traceSampled field into the log entry */
+ if (trace_sampled_extracted == FLB_TRUE) {
+ msgpack_pack_str_with_body(&mp_pck, "traceSampled", 12);
+
+ if (trace_sampled == FLB_TRUE) {
+ msgpack_pack_true(&mp_pck);
+ } else {
+ msgpack_pack_false(&mp_pck);
+ }
+
+ }
+
+ /* Add insertId field into the log entry */
+ if (insert_id_extracted == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "insertId", 8);
+ msgpack_pack_object(&mp_pck, insert_id_obj);
+ }
+
+ /* Add operation field into the log entry */
+ if (operation_extracted == FLB_TRUE) {
+ add_operation_field(&operation_id, &operation_producer,
+ &operation_first, &operation_last, &mp_pck);
+ }
+
+ /* Add sourceLocation field into the log entry */
+ if (source_location_extracted == FLB_TRUE) {
+ add_source_location_field(&source_location_file, source_location_line,
+ &source_location_function, &mp_pck);
+ }
+
+ /* Add httpRequest field into the log entry */
+ if (http_request_extracted == FLB_TRUE) {
+ add_http_request_field(&http_request, &mp_pck);
+ }
+
+ /* labels */
+ if (labels_size > 0) {
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "labels", 6);
+ pack_labels(ctx, &mp_pck, payload_labels_ptr);
+ }
+
+ /* Clean up id and producer if operation extracted */
+ flb_sds_destroy(operation_id);
+ flb_sds_destroy(operation_producer);
+ flb_sds_destroy(source_location_file);
+ flb_sds_destroy(source_location_function);
+ destroy_http_request(&http_request);
+
+ /* jsonPayload */
+ msgpack_pack_str(&mp_pck, 11);
+ msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
+ pack_json_payload(insert_id_extracted,
+ operation_extracted, operation_extra_size,
+ source_location_extracted,
+ source_location_extra_size,
+ http_request_extracted,
+ http_request_extra_size,
+ tms_status,
+ &mp_pck, obj, ctx);
+
+ /* avoid modifying the original tag */
+ newtag = tag;
+ stream_key = flb_sds_create("stream");
+ if (ctx->resource_type == RESOURCE_TYPE_K8S
+ && get_string(&stream, obj, stream_key) == 0) {
+ if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) {
+ newtag = "stdout";
+ }
+ else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) {
+ newtag = "stderr";
+ }
+ }
+
+ if (log_name_extracted == FLB_FALSE) {
+ new_log_name = newtag;
+ }
+ else {
+ new_log_name = log_name;
+ }
+
+ /* logName */
+ len = snprintf(path, sizeof(path) - 1,
+ "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name);
+
+ if (log_name_extracted == FLB_TRUE) {
+ flb_sds_destroy(log_name);
+ }
+
+ msgpack_pack_str(&mp_pck, 7);
+ msgpack_pack_str_body(&mp_pck, "logName", 7);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, path, len);
+ flb_sds_destroy(stream_key);
+ flb_sds_destroy(stream);
+
+ /* timestamp */
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "timestamp", 9);
+
+ /* Format the time */
+ /*
+ * If format is timestamp_object or timestamp_duo_fields,
+ * tms has been updated.
+ *
+ * If timestamp is not presen,
+ * use the default tms(current time).
+ */
+
+ gmtime_r(&log_event.timestamp.tm.tv_sec, &tm);
+ s = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_STD_TIME_FMT, &tm);
+ len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+ ".%09" PRIu64 "Z",
+ (uint64_t) log_event.timestamp.tm.tv_nsec);
+ s += len;
+
+ msgpack_pack_str(&mp_pck, s);
+ msgpack_pack_str_body(&mp_pck, time_formatted, s);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ /* Convert from msgpack to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (!out_buf) {
+ flb_plg_error(ctx->ins, "error formatting JSON payload");
+ return NULL;
+ }
+
+ return out_buf;
+}
+
+static int stackdriver_format_test(struct flb_config *config,
+ struct flb_input_instance *ins,
+ void *plugin_context,
+ void *flush_ctx,
+ int event_type,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_data, size_t *out_size)
+{
+ int total_records;
+ flb_sds_t payload = NULL;
+ struct flb_stackdriver *ctx = plugin_context;
+
+ /* Count number of records */
+ total_records = flb_mp_count(data, bytes);
+
+ payload = stackdriver_format(ctx, total_records,
+ (char *) tag, tag_len, data, bytes);
+ if (payload == NULL) {
+ return -1;
+ }
+
+ *out_data = payload;
+ *out_size = flb_sds_len(payload);
+
+ return 0;
+
+}
+
+#ifdef FLB_HAVE_METRICS
+static void update_http_metrics(struct flb_stackdriver *ctx,
+ struct flb_event_chunk *event_chunk,
+ uint64_t ts,
+ int http_status)
+{
+ char tmp[32];
+
+ /* convert status to string format */
+ snprintf(tmp, sizeof(tmp) - 1, "%i", http_status);
+ char *name = (char *) flb_output_name(ctx->ins);
+
+ /* processed records total */
+ cmt_counter_add(ctx->cmt_proc_records_total, ts, event_chunk->total_events,
+ 2, (char *[]) {tmp, name});
+
+ /* HTTP status */
+ if (http_status != STACKDRIVER_NET_ERROR) {
+ cmt_counter_inc(ctx->cmt_requests_total, ts, 2, (char *[]) {tmp, name});
+ }
+}
+
+static void update_retry_metric(struct flb_stackdriver *ctx,
+ struct flb_event_chunk *event_chunk,
+ uint64_t ts,
+ int http_status, int ret_code)
+{
+ char tmp[32];
+ char *name = (char *) flb_output_name(ctx->ins);
+
+ if (ret_code != FLB_RETRY) {
+ return;
+ }
+
+ /* convert status to string format */
+ snprintf(tmp, sizeof(tmp) - 1, "%i", http_status);
+ cmt_counter_add(ctx->cmt_retried_records_total,
+ ts, event_chunk->total_events, 2, (char *[]) {tmp, name});
+
+}
+#endif
+
+static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ (void) i_ins;
+ (void) config;
+ int ret;
+ int ret_code = FLB_RETRY;
+ size_t b_sent;
+ flb_sds_t token;
+ flb_sds_t payload_buf;
+ void *compressed_payload_buffer = NULL;
+ size_t compressed_payload_size;
+ struct flb_stackdriver *ctx = out_context;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ int compressed = FLB_FALSE;
+#ifdef FLB_HAVE_METRICS
+ char *name = (char *) flb_output_name(ctx->ins);
+ uint64_t ts = cfl_time_now();
+#endif
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+#ifdef FLB_HAVE_METRICS
+ cmt_counter_inc(ctx->cmt_failed_requests,
+ ts, 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
+
+ update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR);
+ update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR, FLB_RETRY);
+#endif
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Reformat msgpack to stackdriver JSON payload */
+ payload_buf = stackdriver_format(ctx,
+ event_chunk->total_events,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ event_chunk->data, event_chunk->size);
+ if (!payload_buf) {
+#ifdef FLB_HAVE_METRICS
+ cmt_counter_inc(ctx->cmt_failed_requests,
+ ts, 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
+#endif
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Get or renew Token */
+ token = get_google_token(ctx);
+ if (!token) {
+ flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
+ flb_upstream_conn_release(u_conn);
+ flb_sds_destroy(payload_buf);
+#ifdef FLB_HAVE_METRICS
+ cmt_counter_inc(ctx->cmt_failed_requests,
+ ts, 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
+#endif
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ compressed_payload_buffer = payload_buf;
+ compressed_payload_size = flb_sds_len(payload_buf);
+ if (ctx->compress_gzip == FLB_TRUE) {
+ ret = flb_gzip_compress((void *) payload_buf, flb_sds_len(payload_buf),
+ &compressed_payload_buffer, &compressed_payload_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
+ } else {
+ compressed = FLB_TRUE;
+ flb_sds_destroy(payload_buf);
+ }
+ }
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI,
+ compressed_payload_buffer, compressed_payload_size, NULL, 0, NULL, 0);
+
+ flb_http_buffer_size(c, 4192);
+
+ if (ctx->stackdriver_agent) {
+ flb_http_add_header(c, "User-Agent", 10,
+ ctx->stackdriver_agent,
+ flb_sds_len(ctx->stackdriver_agent));
+ }
+ else {
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ }
+
+ flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
+ flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
+ /* Content Encoding: gzip */
+ if (compressed == FLB_TRUE) {
+ flb_http_set_content_encoding_gzip(c);
+ }
+
+ /* Send HTTP request */
+ ret = flb_http_do(c, &b_sent);
+
+ /* validate response */
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "http_do=%i", ret);
+ ret_code = FLB_RETRY;
+#ifdef FLB_HAVE_METRICS
+ update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR);
+#endif
+ }
+ else {
+ /* The request was issued successfully, validate the 'error' field */
+ flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
+ if (c->resp.status == 200) {
+ ret_code = FLB_OK;
+ }
+ else if (c->resp.status >= 400 && c->resp.status < 500) {
+ ret_code = FLB_ERROR;
+ flb_plg_warn(ctx->ins, "error\n%s",
+ c->resp.payload);
+ }
+ else {
+ if (c->resp.payload_size > 0) {
+ /* we got an error */
+ flb_plg_warn(ctx->ins, "error\n%s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "response\n%s",
+ c->resp.payload);
+ }
+ ret_code = FLB_RETRY;
+ }
+ }
+
+ /* Update specific stackdriver metrics */
+#ifdef FLB_HAVE_METRICS
+ if (ret_code == FLB_OK) {
+ cmt_counter_inc(ctx->cmt_successful_requests,
+ ts, 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);
+ }
+ else {
+ cmt_counter_inc(ctx->cmt_failed_requests,
+ ts, 1, (char *[]) {name});
+
+ /* OLD api */
+ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics);
+ }
+
+ /* Update metrics counter by using labels/http status code */
+ if (ret == 0) {
+ update_http_metrics(ctx, event_chunk, ts, c->resp.status);
+ }
+
+ /* Update retry count if necessary */
+ update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code);
+#endif
+
+
+ /* Cleanup */
+ if (compressed == FLB_TRUE) {
+ flb_free(compressed_payload_buffer);
+ }
+ else {
+ flb_sds_destroy(payload_buf);
+ }
+ flb_sds_destroy(token);
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+
+ /* Done */
+ FLB_OUTPUT_RETURN(ret_code);
+}
+
+static int cb_stackdriver_exit(void *data, struct flb_config *config)
+{
+ struct flb_stackdriver *ctx = data;
+
+ if (!ctx) {
+ return -1;
+ }
+
+ flb_stackdriver_conf_destroy(ctx);
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, credentials_file),
+ "Set the path for the google service credentials file"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "metadata_server", (char *)NULL,
+ 0, FLB_FALSE, 0,
+ "Set the metadata server"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, client_email),
+ "Set the service account email"
+ },
+ // set in flb_bigquery_oauth_credentials
+ {
+ FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, private_key),
+ "Set the service account secret"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "export_to_project_id", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, export_to_project_id),
+ "Export to project id"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "resource", FLB_SDS_RESOURCE_TYPE,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource),
+ "Set the resource"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "severity_key", DEFAULT_SEVERITY_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, severity_key),
+ "Set the severity key"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "autoformat_stackdriver_trace", "false",
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, autoformat_stackdriver_trace),
+ "Autoformat the stackdriver trace"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "trace_key", DEFAULT_TRACE_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_key),
+ "Set the trace key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "span_id_key", DEFAULT_SPAN_ID_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, span_id_key),
+ "Set the span id key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "trace_sampled_key", DEFAULT_TRACE_SAMPLED_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_sampled_key),
+ "Set the trace sampled key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "log_name_key", DEFAULT_LOG_NAME_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, log_name_key),
+ "Set the logname key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "http_request_key", HTTPREQUEST_FIELD_IN_JSON,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, http_request_key),
+ "Set the http request key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "k8s_cluster_name", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_name),
+ "Set the kubernetes cluster name"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "k8s_cluster_location", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_location),
+ "Set the kubernetes cluster location"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "location", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, location),
+ "Set the resource location"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "namespace", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, namespace_id),
+ "Set the resource namespace"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "node_id", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, node_id),
+ "Set the resource node id"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "job", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, job),
+ "Set the resource job"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "task_id", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, task_id),
+ "Set the resource task id"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "compress", NULL,
+ 0, FLB_FALSE, 0,
+ "Set log payload compression method. Option available is 'gzip'"
+ },
+ {
+ FLB_CONFIG_MAP_CLIST, "labels", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels),
+ "Set the labels"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "labels_key", DEFAULT_LABELS_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels_key),
+ "Set the labels key"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tag_prefix", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, tag_prefix),
+ "Set the tag prefix"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "stackdriver_agent", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, stackdriver_agent),
+ "Set the stackdriver agent"
+ },
+ /* Custom Regex */
+ {
+ FLB_CONFIG_MAP_STR, "custom_k8s_regex", DEFAULT_TAG_REGEX,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, custom_k8s_regex),
+ "Set a custom kubernetes regex filter"
+ },
+ {
+ FLB_CONFIG_MAP_CLIST, "resource_labels", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels),
+ "Set the resource labels"
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_output_plugin out_stackdriver_plugin = {
+ .name = "stackdriver",
+ .description = "Send events to Google Stackdriver Logging",
+ .cb_init = cb_stackdriver_init,
+ .cb_flush = cb_stackdriver_flush,
+ .cb_exit = cb_stackdriver_exit,
+ .workers = 1,
+ .config_map = config_map,
+
+ /* Test */
+ .test_formatter.callback = stackdriver_format_test,
+
+ /* Plugin flags */
+ .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
+};