/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ /* Fluent Bit * ========== * Copyright (C) 2015-2022 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "gce_metadata.h" #include "stackdriver.h" #include "stackdriver_conf.h" #include "stackdriver_operation.h" #include "stackdriver_source_location.h" #include "stackdriver_http_request.h" #include "stackdriver_timestamp.h" #include "stackdriver_helper.h" #include "stackdriver_resource_types.h" pthread_key_t oauth2_type; pthread_key_t oauth2_token; pthread_key_t oauth2_token_expires; static void oauth2_cache_exit(void *ptr) { if (ptr) { flb_sds_destroy(ptr); } } static void oauth2_cache_free_expiration(void *ptr) { if (ptr) { flb_free(ptr); } } static void oauth2_cache_init() { /* oauth2 pthread key */ pthread_key_create(&oauth2_type, oauth2_cache_exit); pthread_key_create(&oauth2_token, oauth2_cache_exit); pthread_key_create(&oauth2_token_expires, oauth2_cache_free_expiration); } /* Set oauth2 type and token in pthread keys */ static void oauth2_cache_set(char *type, char *token, time_t expires) { flb_sds_t tmp; time_t *tmp_expires; /* oauth2 type */ tmp = pthread_getspecific(oauth2_type); if (tmp) { flb_sds_destroy(tmp); } tmp = flb_sds_create(type); pthread_setspecific(oauth2_type, tmp); /* oauth2 access token */ tmp = pthread_getspecific(oauth2_token); if (tmp) { flb_sds_destroy(tmp); } tmp = flb_sds_create(token); pthread_setspecific(oauth2_token, tmp); /* oauth2 access token expiration */ tmp_expires = pthread_getspecific(oauth2_token_expires); if (tmp_expires) { flb_free(tmp_expires); } tmp_expires = flb_calloc(1, sizeof(time_t)); if (!tmp_expires) { flb_errno(); return; } *tmp_expires = expires; pthread_setspecific(oauth2_token_expires, tmp_expires); } /* By using pthread keys cached values, compose the authorizatoin token */ static time_t oauth2_cache_get_expiration() { time_t *expires = pthread_getspecific(oauth2_token_expires); if (expires) { return *expires; } return 0; } /* By using pthread keys cached values, compose the authorizatoin token */ static flb_sds_t oauth2_cache_to_token() { flb_sds_t type; flb_sds_t token; flb_sds_t output; type = pthread_getspecific(oauth2_type); if (!type) { return NULL; } output = flb_sds_create(type); if (!output) { return NULL; } token = pthread_getspecific(oauth2_token); flb_sds_printf(&output, " %s", token); return output; } /* * Base64 Encoding in JWT must: * * - remove any trailing padding '=' character * - replace '+' with '-' * - replace '/' with '_' * * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C */ int jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, unsigned char *in_buf, size_t in_size, size_t *olen) { int i; size_t len; int result; /* do normal base64 encoding */ result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, &len, in_buf, in_size); if (result != 0) { return -1; } /* Replace '+' and '/' characters */ for (i = 0; i < len && out_buf[i] != '='; i++) { if (out_buf[i] == '+') { out_buf[i] = '-'; } else if (out_buf[i] == '/') { out_buf[i] = '_'; } } /* Now 'i' becomes the new length */ *olen = i; return 0; } static int jwt_encode(char *payload, char *secret, char **out_signature, size_t *out_size, struct flb_stackdriver *ctx) { int ret; int len; int buf_size; size_t olen; char *buf; char *sigd; char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; unsigned char sha256_buf[32] = {0}; flb_sds_t out; unsigned char sig[256] = {0}; size_t sig_len; buf_size = (strlen(payload) + strlen(secret)) * 2; buf = flb_malloc(buf_size); if (!buf) { flb_errno(); return -1; } /* Encode header */ len = strlen(headers); ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, &olen, (unsigned char *) headers, len); if (ret != 0) { flb_free(buf); return ret; } /* Create buffer to store JWT */ out = flb_sds_create_size(2048); if (!out) { flb_errno(); flb_free(buf); return -1; } /* Append header */ flb_sds_cat(out, buf, olen); flb_sds_cat(out, ".", 1); /* Encode Payload */ len = strlen(payload); jwt_base64_url_encode((unsigned char *) buf, buf_size, (unsigned char *) payload, len, &olen); /* Append Payload */ flb_sds_cat(out, buf, olen); /* do sha256() of base64(header).base64(payload) */ ret = flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) out, flb_sds_len(out), sha256_buf, sizeof(sha256_buf)); if (ret != FLB_CRYPTO_SUCCESS) { flb_plg_error(ctx->ins, "error hashing token"); flb_free(buf); flb_sds_destroy(out); return -1; } len = strlen(secret); sig_len = sizeof(sig); ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, FLB_CRYPTO_PADDING_PKCS1, FLB_HASH_SHA256, (unsigned char *) secret, len, sha256_buf, sizeof(sha256_buf), sig, &sig_len); if (ret != FLB_CRYPTO_SUCCESS) { flb_plg_error(ctx->ins, "error creating RSA context"); flb_free(buf); flb_sds_destroy(out); return -1; } sigd = flb_malloc(2048); if (!sigd) { flb_errno(); flb_free(buf); flb_sds_destroy(out); return -1; } jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); flb_sds_cat(out, ".", 1); flb_sds_cat(out, sigd, olen); *out_signature = out; *out_size = flb_sds_len(out); flb_free(buf); flb_free(sigd); return 0; } /* Create a new oauth2 context and get a oauth2 token */ static int get_oauth2_token(struct flb_stackdriver *ctx) { int ret; char *token; char *sig_data; size_t sig_size; time_t issued; time_t expires; char payload[1024]; flb_oauth2_payload_clear(ctx->o); /* In case of using metadata server, fetch token from there */ if (ctx->metadata_server_auth) { return gce_metadata_read_token(ctx); } /* JWT encode for oauth2 */ issued = time(NULL); expires = issued + FLB_STD_TOKEN_REFRESH; snprintf(payload, sizeof(payload) - 1, "{\"iss\": \"%s\", \"scope\": \"%s\", " "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", ctx->client_email, FLB_STD_SCOPE, FLB_STD_AUTH_URL, expires, issued); /* Compose JWT signature */ ret = jwt_encode(payload, ctx->private_key, &sig_data, &sig_size, ctx); if (ret != 0) { flb_plg_error(ctx->ins, "JWT signature generation failed"); return -1; } flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); ret = flb_oauth2_payload_append(ctx->o, "grant_type", -1, "urn%3Aietf%3Aparams%3Aoauth%3A" "grant-type%3Ajwt-bearer", -1); if (ret == -1) { flb_plg_error(ctx->ins, "error appending oauth2 params"); flb_sds_destroy(sig_data); return -1; } ret = flb_oauth2_payload_append(ctx->o, "assertion", -1, sig_data, sig_size); if (ret == -1) { flb_plg_error(ctx->ins, "error appending oauth2 params"); flb_sds_destroy(sig_data); return -1; } flb_sds_destroy(sig_data); /* Retrieve access token */ token = flb_oauth2_token_get(ctx->o); if (!token) { flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); return -1; } return 0; } static flb_sds_t get_google_token(struct flb_stackdriver *ctx) { int ret = 0; flb_sds_t output = NULL; time_t cached_expiration = 0; ret = pthread_mutex_trylock(&ctx->token_mutex); if (ret == EBUSY) { /* * If the routine is locked we just use our pre-cached values and * compose the expected authorization value. * * If the routine fails it will return NULL and the caller will just * issue a FLB_RETRY. */ output = oauth2_cache_to_token(); cached_expiration = oauth2_cache_get_expiration(); if (time(NULL) >= cached_expiration) { return output; } else { /* * Cached token is expired. Wait on lock to use up-to-date token * by either waiting for it to be refreshed or refresh it ourselves. */ flb_plg_info(ctx->ins, "Cached token is expired. Waiting on lock."); ret = pthread_mutex_lock(&ctx->token_mutex); } } if (ret != 0) { flb_plg_error(ctx->ins, "error locking mutex"); return NULL; } if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { ret = get_oauth2_token(ctx); } /* Copy string to prevent race conditions (get_oauth2 can free the string) */ if (ret == 0) { /* Update pthread keys cached values */ oauth2_cache_set(ctx->o->token_type, ctx->o->access_token, ctx->o->expires); /* Compose outgoing buffer using cached values */ output = oauth2_cache_to_token(); } if (pthread_mutex_unlock(&ctx->token_mutex)){ flb_plg_error(ctx->ins, "error unlocking mutex"); if (output) { flb_sds_destroy(output); } return NULL; } return output; } void replace_prefix_dot(flb_sds_t s, int tag_prefix_len) { int i; int str_len; char c; if (!s) { return; } str_len = flb_sds_len(s); if (tag_prefix_len > str_len) { flb_error("[output] tag_prefix shouldn't be longer than local_resource_id"); return; } for (i = 0; i < tag_prefix_len; i++) { c = s[i]; if (c == '.') { s[i] = '_'; } } } static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map, const char *key, int key_size) { int i; msgpack_object k; msgpack_object v; flb_sds_t ptr = NULL; for (i = 0; i < map.size; i++) { k = map.ptr[i].key; v = map.ptr[i].val; if (k.type != MSGPACK_OBJECT_STR) { continue; } if (k.via.str.size == key_size && strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) { /* make sure to free it after use */ ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size); break; } } return ptr; } /* parse_monitored_resource is to extract the monitoired resource labels * from "logging.googleapis.com/monitored_resource" in log data * and append to 'resource'/'labels' in log entry. * Monitored resource type is already read from resource field in stackdriver * output plugin configuration parameters. * * The structure of monitored_resource is: * { * "logging.googleapis.com/monitored_resource": { * "labels": { * "resource_label": , * } * } * } * See https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types * for required labels for each monitored resource. */ static int parse_monitored_resource(struct flb_stackdriver *ctx, const void *data, size_t bytes, msgpack_packer *mp_pck) { int ret = -1; msgpack_object *obj; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return -1; } while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { obj = log_event.body; msgpack_object_kv *kv = obj->via.map.ptr; msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; for (; kv < kvend; ++kv) { if (kv->val.type == MSGPACK_OBJECT_MAP && kv->key.type == MSGPACK_OBJECT_STR && strncmp (MONITORED_RESOURCE_KEY, kv->key.via.str.ptr, kv->key.via.str.size) == 0) { msgpack_object subobj = kv->val; msgpack_object_kv *p = subobj.via.map.ptr; msgpack_object_kv *pend = subobj.via.map.ptr + subobj.via.map.size; for (; p < pend; ++p) { if (p->key.type != MSGPACK_OBJECT_STR || p->val.type != MSGPACK_OBJECT_MAP) { continue; } if (strncmp("labels", p->key.via.str.ptr, p->key.via.str.size) == 0) { msgpack_object labels = p->val; msgpack_object_kv *q = labels.via.map.ptr; msgpack_object_kv *qend = labels.via.map.ptr + labels.via.map.size; int fields = 0; for (; q < qend; ++q) { if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "Key and value should be string in the %s/labels", MONITORED_RESOURCE_KEY); } ++fields; } if (fields > 0) { msgpack_pack_map(mp_pck, fields); q = labels.via.map.ptr; for (; q < qend; ++q) { if (q->key.type != MSGPACK_OBJECT_STR || q->val.type != MSGPACK_OBJECT_STR) { continue; } flb_plg_debug(ctx->ins, "[%s] found in the payload", MONITORED_RESOURCE_KEY); msgpack_pack_str(mp_pck, q->key.via.str.size); msgpack_pack_str_body(mp_pck, q->key.via.str.ptr, q->key.via.str.size); msgpack_pack_str(mp_pck, q->val.via.str.size); msgpack_pack_str_body(mp_pck, q->val.via.str.ptr, q->val.via.str.size); } flb_log_event_decoder_destroy(&log_decoder); return 0; } } } } } } flb_log_event_decoder_destroy(&log_decoder); flb_plg_debug(ctx->ins, "[%s] not found in the payload", MONITORED_RESOURCE_KEY); return ret; } /* * Given a local_resource_id, split the content using the proper separator generating * a linked list to store the spliited string */ static struct mk_list *parse_local_resource_id_to_list(char *local_resource_id, char *type) { int ret = -1; int max_split = -1; int len_k8s_container; int len_k8s_node; int len_k8s_pod; struct mk_list *list; len_k8s_container = sizeof(K8S_CONTAINER) - 1; len_k8s_node = sizeof(K8S_NODE) - 1; len_k8s_pod = sizeof(K8S_POD) - 1; /* Allocate list head */ list = flb_malloc(sizeof(struct mk_list)); if (!list) { flb_errno(); return NULL; } mk_list_init(list); /* Determinate the max split value based on type */ if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { /* including the prefix of tag */ max_split = 4; } else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { max_split = 2; } else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { max_split = 3; } /* The local_resource_id is splitted by '.' */ ret = flb_slist_split_string(list, local_resource_id, '.', max_split); if (ret == -1 || mk_list_size(list) != max_split) { flb_error("error parsing local_resource_id [%s] for type %s", local_resource_id, type); flb_slist_destroy(list); flb_free(list); return NULL; } return list; } /* * extract_local_resource_id(): * - extract the value from "logging.googleapis.com/local_resource_id" field * - if local_resource_id is missing from the payLoad, use the tag of the log */ static int extract_local_resource_id(const void *data, size_t bytes, struct flb_stackdriver *ctx, const char *tag) { msgpack_object_map map; flb_sds_t local_resource_id; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; int ret; ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return -1; } if ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { map = log_event.body->via.map; local_resource_id = get_str_value_from_msgpack_map(map, LOCAL_RESOURCE_ID_KEY, LEN_LOCAL_RESOURCE_ID_KEY); if (local_resource_id == NULL) { /* if local_resource_id is not found, use the tag of the log */ flb_plg_debug(ctx->ins, "local_resource_id not found, " "tag [%s] is assigned for local_resource_id", tag); local_resource_id = flb_sds_create(tag); } /* we need to create up the local_resource_id from previous log */ if (ctx->local_resource_id) { flb_sds_destroy(ctx->local_resource_id); } ctx->local_resource_id = flb_sds_create(local_resource_id); flb_sds_destroy(local_resource_id); ret = 0; } else { flb_plg_error(ctx->ins, "failed to unpack data"); ret = -1; } flb_log_event_decoder_destroy(&log_decoder); return ret; } /* * set_monitored_resource_labels(): * - use the extracted local_resource_id to assign the label keys for different * resource types that are specified in the configuration of stackdriver_out plugin */ static int set_monitored_resource_labels(struct flb_stackdriver *ctx, char *type) { int ret = -1; int first = FLB_TRUE; int counter = 0; int len_k8s_container; int len_k8s_node; int len_k8s_pod; size_t prefix_len = 0; struct local_resource_id_list *ptr; struct mk_list *list = NULL; struct mk_list *head; flb_sds_t new_local_resource_id; if (!ctx->local_resource_id) { flb_plg_error(ctx->ins, "local_resource_is is not assigned"); return -1; } len_k8s_container = sizeof(K8S_CONTAINER) - 1; len_k8s_node = sizeof(K8S_NODE) - 1; len_k8s_pod = sizeof(K8S_POD) - 1; prefix_len = flb_sds_len(ctx->tag_prefix); if (flb_sds_casecmp(ctx->tag_prefix, ctx->local_resource_id, prefix_len) != 0) { flb_plg_error(ctx->ins, "tag_prefix [%s] doesn't match the prefix of" " local_resource_id [%s]", ctx->tag_prefix, ctx->local_resource_id); return -1; } new_local_resource_id = flb_sds_create_len(ctx->local_resource_id, flb_sds_len(ctx->local_resource_id)); replace_prefix_dot(new_local_resource_id, prefix_len - 1); if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { list = parse_local_resource_id_to_list(new_local_resource_id, K8S_CONTAINER); if (!list) { goto error; } /* iterate through the list */ mk_list_foreach(head, list) { ptr = mk_list_entry(head, struct local_resource_id_list, _head); if (first) { first = FLB_FALSE; continue; } /* Follow the order of fields in local_resource_id */ if (counter == 0) { if (ctx->namespace_name) { flb_sds_destroy(ctx->namespace_name); } ctx->namespace_name = flb_sds_create(ptr->val); } else if (counter == 1) { if (ctx->pod_name) { flb_sds_destroy(ctx->pod_name); } ctx->pod_name = flb_sds_create(ptr->val); } else if (counter == 2) { if (ctx->container_name) { flb_sds_destroy(ctx->container_name); } ctx->container_name = flb_sds_create(ptr->val); } counter++; } if (!ctx->namespace_name || !ctx->pod_name || !ctx->container_name) { goto error; } } else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { list = parse_local_resource_id_to_list(new_local_resource_id, K8S_NODE); if (!list) { goto error; } mk_list_foreach(head, list) { ptr = mk_list_entry(head, struct local_resource_id_list, _head); if (first) { first = FLB_FALSE; continue; } if (ptr != NULL) { if (ctx->node_name) { flb_sds_destroy(ctx->node_name); } ctx->node_name = flb_sds_create(ptr->val); } } if (!ctx->node_name) { goto error; } } else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { list = parse_local_resource_id_to_list(new_local_resource_id, K8S_POD); if (!list) { goto error; } mk_list_foreach(head, list) { ptr = mk_list_entry(head, struct local_resource_id_list, _head); if (first) { first = FLB_FALSE; continue; } /* Follow the order of fields in local_resource_id */ if (counter == 0) { if (ctx->namespace_name) { flb_sds_destroy(ctx->namespace_name); } ctx->namespace_name = flb_sds_create(ptr->val); } else if (counter == 1) { if (ctx->pod_name) { flb_sds_destroy(ctx->pod_name); } ctx->pod_name = flb_sds_create(ptr->val); } counter++; } if (!ctx->namespace_name || !ctx->pod_name) { goto error; } } ret = 0; if (list) { flb_slist_destroy(list); flb_free(list); } flb_sds_destroy(new_local_resource_id); return ret; error: if (list) { flb_slist_destroy(list); flb_free(list); } if (strncmp(type, K8S_CONTAINER, len_k8s_container) == 0) { if (ctx->namespace_name) { flb_sds_destroy(ctx->namespace_name); } if (ctx->pod_name) { flb_sds_destroy(ctx->pod_name); } if (ctx->container_name) { flb_sds_destroy(ctx->container_name); } } else if (strncmp(type, K8S_NODE, len_k8s_node) == 0) { if (ctx->node_name) { flb_sds_destroy(ctx->node_name); } } else if (strncmp(type, K8S_POD, len_k8s_pod) == 0) { if (ctx->namespace_name) { flb_sds_destroy(ctx->namespace_name); } if (ctx->pod_name) { flb_sds_destroy(ctx->pod_name); } } flb_sds_destroy(new_local_resource_id); return -1; } static int is_tag_match_regex(struct flb_stackdriver *ctx, const char *tag, int tag_len) { int ret; int tag_prefix_len; int len_to_be_matched; const char *tag_str_to_be_matcheds; tag_prefix_len = flb_sds_len(ctx->tag_prefix); if (tag_len > tag_prefix_len && flb_sds_cmp(ctx->tag_prefix, tag, tag_prefix_len) != 0) { return 0; } tag_str_to_be_matcheds = tag + tag_prefix_len; len_to_be_matched = tag_len - tag_prefix_len; ret = flb_regex_match(ctx->regex, (unsigned char *) tag_str_to_be_matcheds, len_to_be_matched); /* 1 -> match; 0 -> doesn't match; < 0 -> error */ return ret; } static int is_local_resource_id_match_regex(struct flb_stackdriver *ctx) { int ret; int prefix_len; int len_to_be_matched; const char *str_to_be_matcheds; if (!ctx->local_resource_id) { flb_plg_warn(ctx->ins, "local_resource_id not found in the payload"); return -1; } prefix_len = flb_sds_len(ctx->tag_prefix); str_to_be_matcheds = ctx->local_resource_id + prefix_len; len_to_be_matched = flb_sds_len(ctx->local_resource_id) - prefix_len; ret = flb_regex_match(ctx->regex, (unsigned char *) str_to_be_matcheds, len_to_be_matched); /* 1 -> match; 0 -> doesn't match; < 0 -> error */ return ret; } static void cb_results(const char *name, const char *value, size_t vlen, void *data); /* * extract_resource_labels_from_regex(4) will only be called if the * tag or local_resource_id field matches the regex rule */ static int extract_resource_labels_from_regex(struct flb_stackdriver *ctx, const char *tag, int tag_len, int from_tag) { int ret = 1; int prefix_len; int len_to_be_matched; int local_resource_id_len; const char *str_to_be_matcheds; struct flb_regex_search result; prefix_len = flb_sds_len(ctx->tag_prefix); if (from_tag == FLB_TRUE) { local_resource_id_len = tag_len; str_to_be_matcheds = tag + prefix_len; } else { // this will be called only if the payload contains local_resource_id local_resource_id_len = flb_sds_len(ctx->local_resource_id); str_to_be_matcheds = ctx->local_resource_id + prefix_len; } len_to_be_matched = local_resource_id_len - prefix_len; ret = flb_regex_do(ctx->regex, str_to_be_matcheds, len_to_be_matched, &result); if (ret <= 0) { flb_plg_warn(ctx->ins, "invalid pattern for given value %s when" " extracting resource labels", str_to_be_matcheds); return -1; } flb_regex_parse(ctx->regex, &result, cb_results, ctx); return ret; } static int process_local_resource_id(struct flb_stackdriver *ctx, const char *tag, int tag_len, char *type) { int ret; // parsing local_resource_id from tag takes higher priority if (is_tag_match_regex(ctx, tag, tag_len) > 0) { ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_TRUE); } else if (is_local_resource_id_match_regex(ctx) > 0) { ret = extract_resource_labels_from_regex(ctx, tag, tag_len, FLB_FALSE); } else { ret = set_monitored_resource_labels(ctx, type); } return ret; } /* * get_payload_labels * - Iterate throught the original payload (obj) and find out the entry that matches * the labels_key * - Used to convert all labels under labels_key to root-level `labels` field */ static msgpack_object *get_payload_labels(struct flb_stackdriver *ctx, msgpack_object *obj) { int i; int len; msgpack_object_kv *kv = NULL; if (!obj || obj->type != MSGPACK_OBJECT_MAP) { return NULL; } len = flb_sds_len(ctx->labels_key); for (i = 0; i < obj->via.map.size; i++) { kv = &obj->via.map.ptr[i]; if (flb_sds_casecmp(ctx->labels_key, kv->key.via.str.ptr, len) == 0) { /* only the first matching entry will be returned */ return &kv->val; } } //flb_plg_debug(ctx->ins, "labels_key [%s] not found in the payload", // ctx->labels_key); return NULL; } /* * pack_resource_labels(): * - Looks through the resource_labels parameter and appends new key value * pair to the log entry. * - Supports field access, plaintext assignment and environment variables. */ static int pack_resource_labels(struct flb_stackdriver *ctx, struct flb_mp_map_header *mh, msgpack_packer *mp_pck, const void *data, size_t bytes) { struct mk_list *head; struct flb_kv *label_kv; struct flb_record_accessor *ra; struct flb_ra_value *rval; int len; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; int ret; if (ctx->should_skip_resource_labels_api == FLB_TRUE) { return -1; } len = mk_list_size(&ctx->resource_labels_kvs); if (len == 0) { return -1; } ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return -1; } if ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { flb_mp_map_header_init(mh, mp_pck); mk_list_foreach(head, &ctx->resource_labels_kvs) { label_kv = mk_list_entry(head, struct flb_kv, _head); /* * KVs have the form destination=original, so the original key is the value. * If the value starts with '$', it will be processed using record accessor. * Otherwise, it will be treated as a plaintext assignment. */ if (label_kv->val[0] == '$') { ra = flb_ra_create(label_kv->val, FLB_TRUE); rval = flb_ra_get_value_object(ra, *log_event.body); if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string)); msgpack_pack_str_body(mp_pck, rval->val.string, flb_sds_len(rval->val.string)); flb_ra_key_value_destroy(rval); } else { flb_plg_warn(ctx->ins, "failed to find a corresponding entry for " "resource label entry [%s=%s]", label_kv->key, label_kv->val); } flb_ra_destroy(ra); } else { flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key)); msgpack_pack_str_body(mp_pck, label_kv->key, flb_sds_len(label_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val)); msgpack_pack_str_body(mp_pck, label_kv->val, flb_sds_len(label_kv->val)); } } } else { flb_plg_error(ctx->ins, "failed to unpack data"); flb_log_event_decoder_destroy(&log_decoder); return -1; } /* project_id should always be packed from config parameter */ flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, 10); msgpack_pack_str_body(mp_pck, "project_id", 10); msgpack_pack_str(mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); flb_log_event_decoder_destroy(&log_decoder); flb_mp_map_header_end(mh); return 0; } static void pack_labels(struct flb_stackdriver *ctx, msgpack_packer *mp_pck, msgpack_object *payload_labels_ptr) { int i; int labels_size = 0; struct mk_list *head; struct flb_kv *list_kv; msgpack_object_kv *obj_kv = NULL; /* Determine size of labels map */ labels_size = mk_list_size(&ctx->config_labels); if (payload_labels_ptr != NULL && payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { labels_size += payload_labels_ptr->via.map.size; } msgpack_pack_map(mp_pck, labels_size); /* pack labels from the payload */ if (payload_labels_ptr != NULL && payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { for (i = 0; i < payload_labels_ptr->via.map.size; i++) { obj_kv = &payload_labels_ptr->via.map.ptr[i]; msgpack_pack_object(mp_pck, obj_kv->key); msgpack_pack_object(mp_pck, obj_kv->val); } } /* pack labels set in configuration */ /* in msgpack duplicate keys are overriden by the last set */ /* static label keys override payload labels */ mk_list_foreach(head, &ctx->config_labels){ list_kv = mk_list_entry(head, struct flb_kv, _head); msgpack_pack_str(mp_pck, flb_sds_len(list_kv->key)); msgpack_pack_str_body(mp_pck, list_kv->key, flb_sds_len(list_kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(list_kv->val)); msgpack_pack_str_body(mp_pck, list_kv->val, flb_sds_len(list_kv->val)); } } static void cb_results(const char *name, const char *value, size_t vlen, void *data) { struct flb_stackdriver *ctx = data; if (vlen == 0) { return; } if (strcmp(name, "pod_name") == 0) { if (ctx->pod_name != NULL) { flb_sds_destroy(ctx->pod_name); } ctx->pod_name = flb_sds_create_len(value, vlen); } else if (strcmp(name, "namespace_name") == 0) { if (ctx->namespace_name != NULL) { flb_sds_destroy(ctx->namespace_name); } ctx->namespace_name = flb_sds_create_len(value, vlen); } else if (strcmp(name, "container_name") == 0) { if (ctx->container_name != NULL) { flb_sds_destroy(ctx->container_name); } ctx->container_name = flb_sds_create_len(value, vlen); } else if (strcmp(name, "node_name") == 0) { if (ctx->node_name != NULL) { flb_sds_destroy(ctx->node_name); } ctx->node_name = flb_sds_create_len(value, vlen); } return; } int flb_stackdriver_regex_init(struct flb_stackdriver *ctx) { /* If a custom regex is not set, use the defaults */ ctx->regex = flb_regex_create(ctx->custom_k8s_regex); if (!ctx->regex) { return -1; } return 0; } static int cb_stackdriver_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { int ret; int io_flags = FLB_IO_TLS; char *token; struct flb_stackdriver *ctx; /* Create config context */ ctx = flb_stackdriver_conf_create(ins, config); if (!ctx) { flb_plg_error(ins, "configuration failed"); return -1; } /* Load config map */ ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { return -1; } /* Set context */ flb_output_set_context(ins, ctx); /* Network mode IPv6 */ if (ins->host.ipv6 == FLB_TRUE) { io_flags |= FLB_IO_IPV6; } /* Initialize oauth2 cache pthread keys */ oauth2_cache_init(); /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ pthread_mutex_init(&ctx->token_mutex, NULL); /* Create Upstream context for Stackdriver Logging (no oauth2 service) */ ctx->u = flb_upstream_create_url(config, FLB_STD_WRITE_URL, io_flags, ins->tls); ctx->metadata_u = flb_upstream_create_url(config, ctx->metadata_server, FLB_IO_TCP, NULL); /* Create oauth2 context */ ctx->o = flb_oauth2_create(ctx->config, FLB_STD_AUTH_URL, 3000); if (!ctx->u) { flb_plg_error(ctx->ins, "upstream creation failed"); return -1; } if (!ctx->metadata_u) { flb_plg_error(ctx->ins, "metadata upstream creation failed"); return -1; } if (!ctx->o) { flb_plg_error(ctx->ins, "cannot create oauth2 context"); return -1; } flb_output_upstream_set(ctx->u, ins); /* Metadata Upstream Sync flags */ flb_stream_disable_async_mode(&ctx->metadata_u->base); if (ins->test_mode == FLB_FALSE) { /* Retrieve oauth2 token */ token = get_google_token(ctx); if (!token) { flb_plg_warn(ctx->ins, "token retrieval failed"); } else { flb_sds_destroy(token); } } if (ctx->metadata_server_auth) { ret = gce_metadata_read_project_id(ctx); if (ret == -1) { return -1; } if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE && ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) { ret = gce_metadata_read_zone(ctx); if (ret == -1) { return -1; } ret = gce_metadata_read_instance_id(ctx); if (ret == -1) { return -1; } } } /* Validate project_id */ if (!ctx->project_id) { flb_plg_error(ctx->ins, "property 'project_id' is not set"); return -1; } if (!ctx->export_to_project_id) { ctx->export_to_project_id = ctx->project_id; } ret = flb_stackdriver_regex_init(ctx); if (ret == -1) { flb_plg_error(ctx->ins, "failed to init stackdriver custom regex"); return -1; } return 0; } static int validate_severity_level(severity_t * s, const char * str, const unsigned int str_size) { int i = 0; const static struct { severity_t s; const unsigned int str_size; const char * str; } enum_mapping[] = { {FLB_STD_EMERGENCY, 9, "EMERGENCY"}, {FLB_STD_EMERGENCY, 5, "EMERG" }, {FLB_STD_ALERT , 1, "A" }, {FLB_STD_ALERT , 5, "ALERT" }, {FLB_STD_CRITICAL , 1, "C" }, {FLB_STD_CRITICAL , 1, "F" }, {FLB_STD_CRITICAL , 4, "CRIT" }, {FLB_STD_CRITICAL , 5, "FATAL" }, {FLB_STD_CRITICAL , 8, "CRITICAL" }, {FLB_STD_ERROR , 1, "E" }, {FLB_STD_ERROR , 3, "ERR" }, {FLB_STD_ERROR , 5, "ERROR" }, {FLB_STD_ERROR , 6, "SEVERE" }, {FLB_STD_WARNING , 1, "W" }, {FLB_STD_WARNING , 4, "WARN" }, {FLB_STD_WARNING , 7, "WARNING" }, {FLB_STD_NOTICE , 1, "N" }, {FLB_STD_NOTICE , 6, "NOTICE" }, {FLB_STD_INFO , 1, "I" }, {FLB_STD_INFO , 4, "INFO" }, {FLB_STD_DEBUG , 1, "D" }, {FLB_STD_DEBUG , 5, "DEBUG" }, {FLB_STD_DEBUG , 5, "TRACE" }, {FLB_STD_DEBUG , 9, "TRACE_INT"}, {FLB_STD_DEBUG , 4, "FINE" }, {FLB_STD_DEBUG , 5, "FINER" }, {FLB_STD_DEBUG , 6, "FINEST" }, {FLB_STD_DEBUG , 6, "CONFIG" }, {FLB_STD_DEFAULT , 7, "DEFAULT" } }; for (i = 0; i < sizeof (enum_mapping) / sizeof (enum_mapping[0]); ++i) { if (enum_mapping[i].str_size != str_size) { continue; } if (strncasecmp(str, enum_mapping[i].str, str_size) == 0) { *s = enum_mapping[i].s; return 0; } } return -1; } static int get_msgpack_obj(msgpack_object * subobj, const msgpack_object * o, const flb_sds_t key, const int key_size, msgpack_object_type type) { int i = 0; msgpack_object_kv * p = NULL; if (o == NULL || subobj == NULL) { return -1; } for (i = 0; i < o->via.map.size; i++) { p = &o->via.map.ptr[i]; if (p->val.type != type) { continue; } if (flb_sds_cmp(key, p->key.via.str.ptr, p->key.via.str.size) == 0) { *subobj = p->val; return 0; } } return -1; } static int get_string(flb_sds_t * s, const msgpack_object * o, const flb_sds_t key) { msgpack_object tmp; if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0) { *s = flb_sds_create_len(tmp.via.str.ptr, tmp.via.str.size); return 0; } *s = 0; return -1; } static int get_severity_level(severity_t * s, const msgpack_object * o, const flb_sds_t key) { msgpack_object tmp; if (get_msgpack_obj(&tmp, o, key, flb_sds_len(key), MSGPACK_OBJECT_STR) == 0 && validate_severity_level(s, tmp.via.str.ptr, tmp.via.str.size) == 0) { return 0; } *s = 0; return -1; } static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * src_obj, const flb_sds_t key) { msgpack_object tmp; int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN); if (ret == 0 && tmp.via.boolean == true) { *trace_sampled_value = FLB_TRUE; return 0; } else if (ret == 0 && tmp.via.boolean == false) { *trace_sampled_value = FLB_FALSE; return 0; } return -1; } static insert_id_status validate_insert_id(msgpack_object * insert_id_value, const msgpack_object * obj) { int i = 0; msgpack_object_kv * p = NULL; insert_id_status ret = INSERTID_NOT_PRESENT; if (obj == NULL) { return ret; } for (i = 0; i < obj->via.map.size; i++) { p = &obj->via.map.ptr[i]; if (p->key.type != MSGPACK_OBJECT_STR) { continue; } if (validate_key(p->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) { if (p->val.type == MSGPACK_OBJECT_STR && p->val.via.str.size > 0) { *insert_id_value = p->val; ret = INSERTID_VALID; } else { ret = INSERTID_INVALID; } break; } } return ret; } static int pack_json_payload(int insert_id_extracted, int operation_extracted, int operation_extra_size, int source_location_extracted, int source_location_extra_size, int http_request_extracted, int http_request_extra_size, timestamp_status tms_status, msgpack_packer *mp_pck, msgpack_object *obj, struct flb_stackdriver *ctx) { /* Specified fields include local_resource_id, operation, sourceLocation ... */ int i, j; int to_remove = 0; int ret; int map_size; int new_map_size; int len; int len_to_be_removed; int key_not_found; flb_sds_t removed; flb_sds_t monitored_resource_key; flb_sds_t local_resource_id_key; flb_sds_t stream; msgpack_object_kv *kv = obj->via.map.ptr; msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size; monitored_resource_key = flb_sds_create(MONITORED_RESOURCE_KEY); local_resource_id_key = flb_sds_create(LOCAL_RESOURCE_ID_KEY); stream = flb_sds_create("stream"); /* * array of elements that need to be removed from payload, * special field 'operation' will be processed individually */ flb_sds_t to_be_removed[] = { monitored_resource_key, local_resource_id_key, ctx->labels_key, ctx->severity_key, ctx->trace_key, ctx->span_id_key, ctx->trace_sampled_key, ctx->log_name_key, stream /* more special fields are required to be added, but, if this grows with more than a few records, it might need to be converted to flb_hash */ }; if (insert_id_extracted == FLB_TRUE) { to_remove += 1; } if (operation_extracted == FLB_TRUE && operation_extra_size == 0) { to_remove += 1; } if (source_location_extracted == FLB_TRUE && source_location_extra_size == 0) { to_remove += 1; } if (http_request_extracted == FLB_TRUE && http_request_extra_size == 0) { to_remove += 1; } if (tms_status == FORMAT_TIMESTAMP_OBJECT) { to_remove += 1; } if (tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { to_remove += 2; } map_size = obj->via.map.size; len_to_be_removed = sizeof(to_be_removed) / sizeof(to_be_removed[0]); for (i = 0; i < map_size; i++) { kv = &obj->via.map.ptr[i]; len = kv->key.via.str.size; for (j = 0; j < len_to_be_removed; j++) { removed = to_be_removed[j]; /* * check length of key to avoid partial matching * e.g. labels key = labels && kv->key = labelss */ if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { to_remove += 1; break; } } } new_map_size = map_size - to_remove; ret = msgpack_pack_map(mp_pck, new_map_size); if (ret < 0) { goto error; } /* points back to the beginning of map */ kv = obj->via.map.ptr; for(; kv != kvend; ++kv ) { key_not_found = 1; /* processing logging.googleapis.com/insertId */ if (insert_id_extracted == FLB_TRUE && validate_key(kv->key, DEFAULT_INSERT_ID_KEY, INSERT_ID_SIZE)) { continue; } /* processing logging.googleapis.com/operation */ if (validate_key(kv->key, OPERATION_FIELD_IN_JSON, OPERATION_KEY_SIZE) && kv->val.type == MSGPACK_OBJECT_MAP) { if (operation_extra_size > 0) { msgpack_pack_object(mp_pck, kv->key); pack_extra_operation_subfields(mp_pck, &kv->val, operation_extra_size); } continue; } if (validate_key(kv->key, SOURCELOCATION_FIELD_IN_JSON, SOURCE_LOCATION_SIZE) && kv->val.type == MSGPACK_OBJECT_MAP) { if (source_location_extra_size > 0) { msgpack_pack_object(mp_pck, kv->key); pack_extra_source_location_subfields(mp_pck, &kv->val, source_location_extra_size); } continue; } if (validate_key(kv->key, ctx->http_request_key, ctx->http_request_key_size) && kv->val.type == MSGPACK_OBJECT_MAP) { if(http_request_extra_size > 0) { msgpack_pack_object(mp_pck, kv->key); pack_extra_http_request_subfields(mp_pck, &kv->val, http_request_extra_size); } continue; } if (validate_key(kv->key, "timestamp", 9) && tms_status == FORMAT_TIMESTAMP_OBJECT) { continue; } if (validate_key(kv->key, "timestampSeconds", 16) && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { continue; } if (validate_key(kv->key, "timestampNanos", 14) && tms_status == FORMAT_TIMESTAMP_DUO_FIELDS) { continue; } len = kv->key.via.str.size; for (j = 0; j < len_to_be_removed; j++) { removed = to_be_removed[j]; if (removed && flb_sds_cmp(removed, kv->key.via.str.ptr, len) == 0) { key_not_found = 0; break; } } if (key_not_found) { ret = msgpack_pack_object(mp_pck, kv->key); if (ret < 0) { goto error; } ret = msgpack_pack_object(mp_pck, kv->val); if (ret < 0) { goto error; } } } flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); return 0; error: flb_sds_destroy(monitored_resource_key); flb_sds_destroy(local_resource_id_key); flb_sds_destroy(stream); return ret; } static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, int total_records, const char *tag, int tag_len, const void *data, size_t bytes) { int len; int ret; int array_size = 0; /* The default value is 3: timestamp, jsonPayload, logName. */ int entry_size = 3; size_t s; // size_t off = 0; char path[PATH_MAX]; char time_formatted[255]; const char *newtag; const char *new_log_name; msgpack_object *obj; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; flb_sds_t out_buf; struct flb_mp_map_header mh; /* Parameters for severity */ int severity_extracted = FLB_FALSE; severity_t severity; /* Parameters for trace */ int trace_extracted = FLB_FALSE; flb_sds_t trace; char stackdriver_trace[PATH_MAX]; const char *new_trace; /* Parameters for span id */ int span_id_extracted = FLB_FALSE; flb_sds_t span_id; /* Parameters for trace sampled */ int trace_sampled_extracted = FLB_FALSE; int trace_sampled = FLB_FALSE; /* Parameters for log name */ int log_name_extracted = FLB_FALSE; flb_sds_t log_name = NULL; flb_sds_t stream = NULL; flb_sds_t stream_key; /* Parameters for insertId */ msgpack_object insert_id_obj; insert_id_status in_status; int insert_id_extracted; /* Parameters in Operation */ flb_sds_t operation_id; flb_sds_t operation_producer; int operation_first = FLB_FALSE; int operation_last = FLB_FALSE; int operation_extracted = FLB_FALSE; int operation_extra_size = 0; /* Parameters for sourceLocation */ flb_sds_t source_location_file; int64_t source_location_line = 0; flb_sds_t source_location_function; int source_location_extracted = FLB_FALSE; int source_location_extra_size = 0; /* Parameters for httpRequest */ struct http_request_field http_request; int http_request_extracted = FLB_FALSE; int http_request_extra_size = 0; /* Parameters for Timestamp */ struct tm tm; // struct flb_time tms; timestamp_status tms_status; /* Count number of records */ array_size = total_records; /* Parameters for labels */ msgpack_object *payload_labels_ptr; int labels_size = 0; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return NULL; } /* * Search each entry and validate insertId. * Reject the entry if insertId is invalid. * If all the entries are rejected, stop formatting. * */ while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { /* Extract insertId */ in_status = validate_insert_id(&insert_id_obj, log_event.body); if (in_status == INSERTID_INVALID) { flb_plg_error(ctx->ins, "Incorrect insertId received. InsertId should be non-empty string."); array_size -= 1; } } flb_log_event_decoder_destroy(&log_decoder); /* Sounds like this should compare to -1 instead of zero */ if (array_size == 0) { return NULL; } /* Create temporal msgpack buffer */ msgpack_sbuffer_init(&mp_sbuf); msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); /* * Pack root map (resource & entries): * * {"resource": {"type": "...", "labels": {...}, * "entries": [] */ msgpack_pack_map(&mp_pck, 2); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "resource", 8); /* type & labels */ msgpack_pack_map(&mp_pck, 2); /* type */ msgpack_pack_str(&mp_pck, 4); msgpack_pack_str_body(&mp_pck, "type", 4); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->resource)); msgpack_pack_str_body(&mp_pck, ctx->resource, flb_sds_len(ctx->resource)); msgpack_pack_str(&mp_pck, 6); msgpack_pack_str_body(&mp_pck, "labels", 6); ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes); if (ret != 0) { if (ctx->resource_type == RESOURCE_TYPE_K8S) { ret = extract_local_resource_id(data, bytes, ctx, tag); if (ret != 0) { flb_plg_error(ctx->ins, "fail to construct local_resource_id"); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } } ret = parse_monitored_resource(ctx, data, bytes, &mp_pck); if (ret != 0) { if (strcmp(ctx->resource, "global") == 0) { /* global resource has field project_id */ msgpack_pack_map(&mp_pck, 1); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } else if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE || ctx->resource_type == RESOURCE_TYPE_GENERIC_TASK) { flb_mp_map_header_init(&mh, &mp_pck); if (ctx->resource_type == RESOURCE_TYPE_GENERIC_NODE && ctx->node_id) { /* generic_node has fields project_id, location, namespace, node_id */ flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "node_id", 7); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_id)); msgpack_pack_str_body(&mp_pck, ctx->node_id, flb_sds_len(ctx->node_id)); } else { /* generic_task has fields project_id, location, namespace, job, task_id */ if (ctx->job) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 3); msgpack_pack_str_body(&mp_pck, "job", 3); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->job)); msgpack_pack_str_body(&mp_pck, ctx->job, flb_sds_len(ctx->job)); } if (ctx->task_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "task_id", 7); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->task_id)); msgpack_pack_str_body(&mp_pck, ctx->task_id, flb_sds_len(ctx->task_id)); } } if (ctx->project_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } if (ctx->location) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "location", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->location)); msgpack_pack_str_body(&mp_pck, ctx->location, flb_sds_len(ctx->location)); } if (ctx->namespace_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 9); msgpack_pack_str_body(&mp_pck, "namespace", 9); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_id)); msgpack_pack_str_body(&mp_pck, ctx->namespace_id, flb_sds_len(ctx->namespace_id)); } flb_mp_map_header_end(&mh); } else if (strcmp(ctx->resource, "gce_instance") == 0) { /* gce_instance resource has fields project_id, zone, instance_id */ flb_mp_map_header_init(&mh, &mp_pck); if (ctx->project_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } if (ctx->zone) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 4); msgpack_pack_str_body(&mp_pck, "zone", 4); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->zone)); msgpack_pack_str_body(&mp_pck, ctx->zone, flb_sds_len(ctx->zone)); } if (ctx->instance_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "instance_id", 11); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->instance_id)); msgpack_pack_str_body(&mp_pck, ctx->instance_id, flb_sds_len(ctx->instance_id)); } flb_mp_map_header_end(&mh); } else if (strcmp(ctx->resource, K8S_CONTAINER) == 0) { /* k8s_container resource has fields project_id, location, cluster_name, * namespace_name, pod_name, container_name * * The local_resource_id for k8s_container is in format: * k8s_container... */ ret = process_local_resource_id(ctx, tag, tag_len, K8S_CONTAINER); if (ret == -1) { flb_plg_error(ctx->ins, "fail to extract resource labels " "for k8s_container resource type"); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } flb_mp_map_header_init(&mh, &mp_pck); if (ctx->project_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } if (ctx->cluster_location) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "location", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); msgpack_pack_str_body(&mp_pck, ctx->cluster_location, flb_sds_len(ctx->cluster_location)); } if (ctx->cluster_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 12); msgpack_pack_str_body(&mp_pck, "cluster_name", 12); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); msgpack_pack_str_body(&mp_pck, ctx->cluster_name, flb_sds_len(ctx->cluster_name)); } if (ctx->namespace_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 14); msgpack_pack_str_body(&mp_pck, "namespace_name", 14); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); msgpack_pack_str_body(&mp_pck, ctx->namespace_name, flb_sds_len(ctx->namespace_name)); } if (ctx->pod_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "pod_name", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); msgpack_pack_str_body(&mp_pck, ctx->pod_name, flb_sds_len(ctx->pod_name)); } if (ctx->container_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 14); msgpack_pack_str_body(&mp_pck, "container_name", 14); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->container_name)); msgpack_pack_str_body(&mp_pck, ctx->container_name, flb_sds_len(ctx->container_name)); } flb_mp_map_header_end(&mh); } else if (strcmp(ctx->resource, K8S_NODE) == 0) { /* k8s_node resource has fields project_id, location, cluster_name, node_name * * The local_resource_id for k8s_node is in format: * k8s_node. */ ret = process_local_resource_id(ctx, tag, tag_len, K8S_NODE); if (ret == -1) { flb_plg_error(ctx->ins, "fail to process local_resource_id from " "log entry for k8s_node"); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } flb_mp_map_header_init(&mh, &mp_pck); if (ctx->project_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } if (ctx->cluster_location) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "location", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); msgpack_pack_str_body(&mp_pck, ctx->cluster_location, flb_sds_len(ctx->cluster_location)); } if (ctx->cluster_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 12); msgpack_pack_str_body(&mp_pck, "cluster_name", 12); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); msgpack_pack_str_body(&mp_pck, ctx->cluster_name, flb_sds_len(ctx->cluster_name)); } if (ctx->node_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 9); msgpack_pack_str_body(&mp_pck, "node_name", 9); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->node_name)); msgpack_pack_str_body(&mp_pck, ctx->node_name, flb_sds_len(ctx->node_name)); } flb_mp_map_header_end(&mh); } else if (strcmp(ctx->resource, K8S_POD) == 0) { /* k8s_pod resource has fields project_id, location, cluster_name, * namespace_name, pod_name. * * The local_resource_id for k8s_pod is in format: * k8s_pod.. */ ret = process_local_resource_id(ctx, tag, tag_len, K8S_POD); if (ret != 0) { flb_plg_error(ctx->ins, "fail to process local_resource_id from " "log entry for k8s_pod"); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } flb_mp_map_header_init(&mh, &mp_pck); if (ctx->project_id) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 10); msgpack_pack_str_body(&mp_pck, "project_id", 10); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->project_id)); msgpack_pack_str_body(&mp_pck, ctx->project_id, flb_sds_len(ctx->project_id)); } if (ctx->cluster_location) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "location", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_location)); msgpack_pack_str_body(&mp_pck, ctx->cluster_location, flb_sds_len(ctx->cluster_location)); } if (ctx->cluster_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 12); msgpack_pack_str_body(&mp_pck, "cluster_name", 12); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->cluster_name)); msgpack_pack_str_body(&mp_pck, ctx->cluster_name, flb_sds_len(ctx->cluster_name)); } if (ctx->namespace_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 14); msgpack_pack_str_body(&mp_pck, "namespace_name", 14); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->namespace_name)); msgpack_pack_str_body(&mp_pck, ctx->namespace_name, flb_sds_len(ctx->namespace_name)); } if (ctx->pod_name) { flb_mp_map_header_append(&mh); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "pod_name", 8); msgpack_pack_str(&mp_pck, flb_sds_len(ctx->pod_name)); msgpack_pack_str_body(&mp_pck, ctx->pod_name, flb_sds_len(ctx->pod_name)); } flb_mp_map_header_end(&mh); } else { flb_plg_error(ctx->ins, "unsupported resource type '%s'", ctx->resource); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } } } msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "entries", 7); /* Append entries */ msgpack_pack_array(&mp_pck, array_size); ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { obj = log_event.body; tms_status = extract_timestamp(obj, &log_event.timestamp); /* * Pack entry * * { * "severity": "...", * "labels": "...", * "logName": "...", * "jsonPayload": {...}, * "timestamp": "...", * "spanId": "...", * "traceSampled": , * "trace": "..." * } */ entry_size = 3; /* Extract severity */ severity_extracted = FLB_FALSE; if (ctx->severity_key && get_severity_level(&severity, obj, ctx->severity_key) == 0) { severity_extracted = FLB_TRUE; entry_size += 1; } /* Extract trace */ trace_extracted = FLB_FALSE; if (ctx->trace_key && get_string(&trace, obj, ctx->trace_key) == 0) { trace_extracted = FLB_TRUE; entry_size += 1; } /* Extract span id */ span_id_extracted = FLB_FALSE; if (ctx->span_id_key && get_string(&span_id, obj, ctx->span_id_key) == 0) { span_id_extracted = FLB_TRUE; entry_size += 1; } /* Extract trace sampled */ trace_sampled_extracted = FLB_FALSE; if (ctx->trace_sampled_key && get_trace_sampled(&trace_sampled, obj, ctx->trace_sampled_key) == 0) { trace_sampled_extracted = FLB_TRUE; entry_size += 1; } /* Extract log name */ log_name_extracted = FLB_FALSE; if (ctx->log_name_key && get_string(&log_name, obj, ctx->log_name_key) == 0) { log_name_extracted = FLB_TRUE; } /* Extract insertId */ in_status = validate_insert_id(&insert_id_obj, obj); if (in_status == INSERTID_VALID) { insert_id_extracted = FLB_TRUE; entry_size += 1; } else if (in_status == INSERTID_NOT_PRESENT) { insert_id_extracted = FLB_FALSE; } else { if (log_name_extracted == FLB_TRUE) { flb_sds_destroy(log_name); } continue; } /* Extract operation */ operation_id = flb_sds_create(""); operation_producer = flb_sds_create(""); operation_first = FLB_FALSE; operation_last = FLB_FALSE; operation_extra_size = 0; operation_extracted = extract_operation(&operation_id, &operation_producer, &operation_first, &operation_last, obj, &operation_extra_size); if (operation_extracted == FLB_TRUE) { entry_size += 1; } /* Extract sourceLocation */ source_location_file = flb_sds_create(""); source_location_line = 0; source_location_function = flb_sds_create(""); source_location_extra_size = 0; source_location_extracted = extract_source_location(&source_location_file, &source_location_line, &source_location_function, obj, &source_location_extra_size); if (source_location_extracted == FLB_TRUE) { entry_size += 1; } /* Extract httpRequest */ init_http_request(&http_request); http_request_extra_size = 0; http_request_extracted = extract_http_request(&http_request, ctx->http_request_key, ctx->http_request_key_size, obj, &http_request_extra_size); if (http_request_extracted == FLB_TRUE) { entry_size += 1; } /* Extract payload labels */ payload_labels_ptr = get_payload_labels(ctx, obj); if (payload_labels_ptr != NULL && payload_labels_ptr->type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "the type of payload labels should be map"); flb_sds_destroy(operation_id); flb_sds_destroy(operation_producer); flb_log_event_decoder_destroy(&log_decoder); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; } /* Number of parsed labels */ labels_size = mk_list_size(&ctx->config_labels); if (payload_labels_ptr != NULL && payload_labels_ptr->type == MSGPACK_OBJECT_MAP) { labels_size += payload_labels_ptr->via.map.size; } if (labels_size > 0) { entry_size += 1; } msgpack_pack_map(&mp_pck, entry_size); /* Add severity into the log entry */ if (severity_extracted == FLB_TRUE) { msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "severity", 8); msgpack_pack_int(&mp_pck, severity); } /* Add trace into the log entry */ if (trace_extracted == FLB_TRUE) { msgpack_pack_str(&mp_pck, 5); msgpack_pack_str_body(&mp_pck, "trace", 5); if (ctx->autoformat_stackdriver_trace) { len = snprintf(stackdriver_trace, sizeof(stackdriver_trace) - 1, "projects/%s/traces/%s", ctx->project_id, trace); new_trace = stackdriver_trace; } else { len = flb_sds_len(trace); new_trace = trace; } msgpack_pack_str(&mp_pck, len); msgpack_pack_str_body(&mp_pck, new_trace, len); flb_sds_destroy(trace); } /* Add spanId field into the log entry */ if (span_id_extracted == FLB_TRUE) { msgpack_pack_str_with_body(&mp_pck, "spanId", 6); len = flb_sds_len(span_id); msgpack_pack_str_with_body(&mp_pck, span_id, len); flb_sds_destroy(span_id); } /* Add traceSampled field into the log entry */ if (trace_sampled_extracted == FLB_TRUE) { msgpack_pack_str_with_body(&mp_pck, "traceSampled", 12); if (trace_sampled == FLB_TRUE) { msgpack_pack_true(&mp_pck); } else { msgpack_pack_false(&mp_pck); } } /* Add insertId field into the log entry */ if (insert_id_extracted == FLB_TRUE) { msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "insertId", 8); msgpack_pack_object(&mp_pck, insert_id_obj); } /* Add operation field into the log entry */ if (operation_extracted == FLB_TRUE) { add_operation_field(&operation_id, &operation_producer, &operation_first, &operation_last, &mp_pck); } /* Add sourceLocation field into the log entry */ if (source_location_extracted == FLB_TRUE) { add_source_location_field(&source_location_file, source_location_line, &source_location_function, &mp_pck); } /* Add httpRequest field into the log entry */ if (http_request_extracted == FLB_TRUE) { add_http_request_field(&http_request, &mp_pck); } /* labels */ if (labels_size > 0) { msgpack_pack_str(&mp_pck, 6); msgpack_pack_str_body(&mp_pck, "labels", 6); pack_labels(ctx, &mp_pck, payload_labels_ptr); } /* Clean up id and producer if operation extracted */ flb_sds_destroy(operation_id); flb_sds_destroy(operation_producer); flb_sds_destroy(source_location_file); flb_sds_destroy(source_location_function); destroy_http_request(&http_request); /* jsonPayload */ msgpack_pack_str(&mp_pck, 11); msgpack_pack_str_body(&mp_pck, "jsonPayload", 11); pack_json_payload(insert_id_extracted, operation_extracted, operation_extra_size, source_location_extracted, source_location_extra_size, http_request_extracted, http_request_extra_size, tms_status, &mp_pck, obj, ctx); /* avoid modifying the original tag */ newtag = tag; stream_key = flb_sds_create("stream"); if (ctx->resource_type == RESOURCE_TYPE_K8S && get_string(&stream, obj, stream_key) == 0) { if (flb_sds_cmp(stream, STDOUT, flb_sds_len(stream)) == 0) { newtag = "stdout"; } else if (flb_sds_cmp(stream, STDERR, flb_sds_len(stream)) == 0) { newtag = "stderr"; } } if (log_name_extracted == FLB_FALSE) { new_log_name = newtag; } else { new_log_name = log_name; } /* logName */ len = snprintf(path, sizeof(path) - 1, "projects/%s/logs/%s", ctx->export_to_project_id, new_log_name); if (log_name_extracted == FLB_TRUE) { flb_sds_destroy(log_name); } msgpack_pack_str(&mp_pck, 7); msgpack_pack_str_body(&mp_pck, "logName", 7); msgpack_pack_str(&mp_pck, len); msgpack_pack_str_body(&mp_pck, path, len); flb_sds_destroy(stream_key); flb_sds_destroy(stream); /* timestamp */ msgpack_pack_str(&mp_pck, 9); msgpack_pack_str_body(&mp_pck, "timestamp", 9); /* Format the time */ /* * If format is timestamp_object or timestamp_duo_fields, * tms has been updated. * * If timestamp is not presen, * use the default tms(current time). */ gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, FLB_STD_TIME_FMT, &tm); len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%09" PRIu64 "Z", (uint64_t) log_event.timestamp.tm.tv_nsec); s += len; msgpack_pack_str(&mp_pck, s); msgpack_pack_str_body(&mp_pck, time_formatted, s); } flb_log_event_decoder_destroy(&log_decoder); /* Convert from msgpack to JSON */ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); msgpack_sbuffer_destroy(&mp_sbuf); if (!out_buf) { flb_plg_error(ctx->ins, "error formatting JSON payload"); return NULL; } return out_buf; } static int stackdriver_format_test(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, void *flush_ctx, int event_type, const char *tag, int tag_len, const void *data, size_t bytes, void **out_data, size_t *out_size) { int total_records; flb_sds_t payload = NULL; struct flb_stackdriver *ctx = plugin_context; /* Count number of records */ total_records = flb_mp_count(data, bytes); payload = stackdriver_format(ctx, total_records, (char *) tag, tag_len, data, bytes); if (payload == NULL) { return -1; } *out_data = payload; *out_size = flb_sds_len(payload); return 0; } #ifdef FLB_HAVE_METRICS static void update_http_metrics(struct flb_stackdriver *ctx, struct flb_event_chunk *event_chunk, uint64_t ts, int http_status) { char tmp[32]; /* convert status to string format */ snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); char *name = (char *) flb_output_name(ctx->ins); /* processed records total */ cmt_counter_add(ctx->cmt_proc_records_total, ts, event_chunk->total_events, 2, (char *[]) {tmp, name}); /* HTTP status */ if (http_status != STACKDRIVER_NET_ERROR) { cmt_counter_inc(ctx->cmt_requests_total, ts, 2, (char *[]) {tmp, name}); } } static void update_retry_metric(struct flb_stackdriver *ctx, struct flb_event_chunk *event_chunk, uint64_t ts, int http_status, int ret_code) { char tmp[32]; char *name = (char *) flb_output_name(ctx->ins); if (ret_code != FLB_RETRY) { return; } /* convert status to string format */ snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); cmt_counter_add(ctx->cmt_retried_records_total, ts, event_chunk->total_events, 2, (char *[]) {tmp, name}); } #endif static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config) { (void) i_ins; (void) config; int ret; int ret_code = FLB_RETRY; size_t b_sent; flb_sds_t token; flb_sds_t payload_buf; void *compressed_payload_buffer = NULL; size_t compressed_payload_size; struct flb_stackdriver *ctx = out_context; struct flb_connection *u_conn; struct flb_http_client *c; int compressed = FLB_FALSE; #ifdef FLB_HAVE_METRICS char *name = (char *) flb_output_name(ctx->ins); uint64_t ts = cfl_time_now(); #endif /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { #ifdef FLB_HAVE_METRICS cmt_counter_inc(ctx->cmt_failed_requests, ts, 1, (char *[]) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR, FLB_RETRY); #endif FLB_OUTPUT_RETURN(FLB_RETRY); } /* Reformat msgpack to stackdriver JSON payload */ payload_buf = stackdriver_format(ctx, event_chunk->total_events, event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data, event_chunk->size); if (!payload_buf) { #ifdef FLB_HAVE_METRICS cmt_counter_inc(ctx->cmt_failed_requests, ts, 1, (char *[]) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); #endif flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Get or renew Token */ token = get_google_token(ctx); if (!token) { flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); flb_upstream_conn_release(u_conn); flb_sds_destroy(payload_buf); #ifdef FLB_HAVE_METRICS cmt_counter_inc(ctx->cmt_failed_requests, ts, 1, (char *[]) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); #endif FLB_OUTPUT_RETURN(FLB_RETRY); } compressed_payload_buffer = payload_buf; compressed_payload_size = flb_sds_len(payload_buf); if (ctx->compress_gzip == FLB_TRUE) { ret = flb_gzip_compress((void *) payload_buf, flb_sds_len(payload_buf), &compressed_payload_buffer, &compressed_payload_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); } else { compressed = FLB_TRUE; flb_sds_destroy(payload_buf); } } /* Compose HTTP Client request */ c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_STD_WRITE_URI, compressed_payload_buffer, compressed_payload_size, NULL, 0, NULL, 0); flb_http_buffer_size(c, 4192); if (ctx->stackdriver_agent) { flb_http_add_header(c, "User-Agent", 10, ctx->stackdriver_agent, flb_sds_len(ctx->stackdriver_agent)); } else { flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); } flb_http_add_header(c, "Content-Type", 12, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); /* Content Encoding: gzip */ if (compressed == FLB_TRUE) { flb_http_set_content_encoding_gzip(c); } /* Send HTTP request */ ret = flb_http_do(c, &b_sent); /* validate response */ if (ret != 0) { flb_plg_warn(ctx->ins, "http_do=%i", ret); ret_code = FLB_RETRY; #ifdef FLB_HAVE_METRICS update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); #endif } else { /* The request was issued successfully, validate the 'error' field */ flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); if (c->resp.status == 200) { ret_code = FLB_OK; } else if (c->resp.status >= 400 && c->resp.status < 500) { ret_code = FLB_ERROR; flb_plg_warn(ctx->ins, "error\n%s", c->resp.payload); } else { if (c->resp.payload_size > 0) { /* we got an error */ flb_plg_warn(ctx->ins, "error\n%s", c->resp.payload); } else { flb_plg_debug(ctx->ins, "response\n%s", c->resp.payload); } ret_code = FLB_RETRY; } } /* Update specific stackdriver metrics */ #ifdef FLB_HAVE_METRICS if (ret_code == FLB_OK) { cmt_counter_inc(ctx->cmt_successful_requests, ts, 1, (char *[]) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics); } else { cmt_counter_inc(ctx->cmt_failed_requests, ts, 1, (char *[]) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); } /* Update metrics counter by using labels/http status code */ if (ret == 0) { update_http_metrics(ctx, event_chunk, ts, c->resp.status); } /* Update retry count if necessary */ update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code); #endif /* Cleanup */ if (compressed == FLB_TRUE) { flb_free(compressed_payload_buffer); } else { flb_sds_destroy(payload_buf); } flb_sds_destroy(token); flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); /* Done */ FLB_OUTPUT_RETURN(ret_code); } static int cb_stackdriver_exit(void *data, struct flb_config *config) { struct flb_stackdriver *ctx = data; if (!ctx) { return -1; } flb_stackdriver_conf_destroy(ctx); return 0; } static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, credentials_file), "Set the path for the google service credentials file" }, { FLB_CONFIG_MAP_STR, "metadata_server", (char *)NULL, 0, FLB_FALSE, 0, "Set the metadata server" }, { FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, client_email), "Set the service account email" }, // set in flb_bigquery_oauth_credentials { FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, private_key), "Set the service account secret" }, { FLB_CONFIG_MAP_STR, "export_to_project_id", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, export_to_project_id), "Export to project id" }, { FLB_CONFIG_MAP_STR, "resource", FLB_SDS_RESOURCE_TYPE, 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource), "Set the resource" }, { FLB_CONFIG_MAP_STR, "severity_key", DEFAULT_SEVERITY_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, severity_key), "Set the severity key" }, { FLB_CONFIG_MAP_BOOL, "autoformat_stackdriver_trace", "false", 0, FLB_TRUE, offsetof(struct flb_stackdriver, autoformat_stackdriver_trace), "Autoformat the stackdriver trace" }, { FLB_CONFIG_MAP_STR, "trace_key", DEFAULT_TRACE_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_key), "Set the trace key" }, { FLB_CONFIG_MAP_STR, "span_id_key", DEFAULT_SPAN_ID_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, span_id_key), "Set the span id key" }, { FLB_CONFIG_MAP_STR, "trace_sampled_key", DEFAULT_TRACE_SAMPLED_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, trace_sampled_key), "Set the trace sampled key" }, { FLB_CONFIG_MAP_STR, "log_name_key", DEFAULT_LOG_NAME_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, log_name_key), "Set the logname key" }, { FLB_CONFIG_MAP_STR, "http_request_key", HTTPREQUEST_FIELD_IN_JSON, 0, FLB_TRUE, offsetof(struct flb_stackdriver, http_request_key), "Set the http request key" }, { FLB_CONFIG_MAP_STR, "k8s_cluster_name", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_name), "Set the kubernetes cluster name" }, { FLB_CONFIG_MAP_STR, "k8s_cluster_location", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, cluster_location), "Set the kubernetes cluster location" }, { FLB_CONFIG_MAP_STR, "location", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, location), "Set the resource location" }, { FLB_CONFIG_MAP_STR, "namespace", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, namespace_id), "Set the resource namespace" }, { FLB_CONFIG_MAP_STR, "node_id", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, node_id), "Set the resource node id" }, { FLB_CONFIG_MAP_STR, "job", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, job), "Set the resource job" }, { FLB_CONFIG_MAP_STR, "task_id", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, task_id), "Set the resource task id" }, { FLB_CONFIG_MAP_STR, "compress", NULL, 0, FLB_FALSE, 0, "Set log payload compression method. Option available is 'gzip'" }, { FLB_CONFIG_MAP_CLIST, "labels", NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels), "Set the labels" }, { FLB_CONFIG_MAP_STR, "labels_key", DEFAULT_LABELS_KEY, 0, FLB_TRUE, offsetof(struct flb_stackdriver, labels_key), "Set the labels key" }, { FLB_CONFIG_MAP_STR, "tag_prefix", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, tag_prefix), "Set the tag prefix" }, { FLB_CONFIG_MAP_STR, "stackdriver_agent", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, stackdriver_agent), "Set the stackdriver agent" }, /* Custom Regex */ { FLB_CONFIG_MAP_STR, "custom_k8s_regex", DEFAULT_TAG_REGEX, 0, FLB_TRUE, offsetof(struct flb_stackdriver, custom_k8s_regex), "Set a custom kubernetes regex filter" }, { FLB_CONFIG_MAP_CLIST, "resource_labels", NULL, 0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels), "Set the resource labels" }, /* EOF */ {0} }; struct flb_output_plugin out_stackdriver_plugin = { .name = "stackdriver", .description = "Send events to Google Stackdriver Logging", .cb_init = cb_stackdriver_init, .cb_flush = cb_stackdriver_flush, .cb_exit = cb_stackdriver_exit, .workers = 1, .config_map = config_map, /* Test */ .test_formatter.callback = stackdriver_format_test, /* Plugin flags */ .flags = FLB_OUTPUT_NET | FLB_IO_TLS, };