diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_bigquery/bigquery.c')
-rw-r--r-- | src/fluent-bit/plugins/out_bigquery/bigquery.c | 1159 |
1 files changed, 0 insertions, 1159 deletions
diff --git a/src/fluent-bit/plugins/out_bigquery/bigquery.c b/src/fluent-bit/plugins/out_bigquery/bigquery.c deleted file mode 100644 index ab5b4657f..000000000 --- a/src/fluent-bit/plugins/out_bigquery/bigquery.c +++ /dev/null @@ -1,1159 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_base64.h> -#include <fluent-bit/flb_hash.h> -#include <fluent-bit/flb_crypto.h> -#include <fluent-bit/flb_signv4.h> -#include <fluent-bit/flb_log_event_decoder.h> -#include <fluent-bit/flb_kv.h> - -#include <msgpack.h> - -#include "bigquery.h" -#include "bigquery_conf.h" - -// TODO: The following code is copied from the Stackdriver plugin and should be -// factored into common library functions. - -/* - * Base64 Encoding in JWT must: - * - * - remove any trailing padding '=' character - * - replace '+' with '-' - * - replace '/' with '_' - * - * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C - */ -int bigquery_jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, - unsigned char *in_buf, size_t in_size, - size_t *olen) - -{ - int i; - size_t len; - int result; - - /* do normal base64 encoding */ - result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, - &len, in_buf, in_size); - if (result != 0) { - return -1; - } - - /* Replace '+' and '/' characters */ - for (i = 0; i < len && out_buf[i] != '='; i++) { - if (out_buf[i] == '+') { - out_buf[i] = '-'; - } - else if (out_buf[i] == '/') { - out_buf[i] = '_'; - } - } - - /* Now 'i' becomes the new length */ - *olen = i; - return 0; -} - -static int bigquery_jwt_encode(struct flb_bigquery *ctx, - char *payload, char *secret, - char **out_signature, size_t *out_size) -{ - int ret; - int len; - int buf_size; - size_t olen; - char *buf; - char *sigd; - char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; - unsigned char sha256_buf[32] = {0}; - flb_sds_t out; - unsigned char sig[256] = {0}; - size_t sig_len; - - buf_size = (strlen(payload) + strlen(secret)) * 2; - buf = flb_malloc(buf_size); - if (!buf) { - flb_errno(); - return -1; - } - - /* Encode header */ - len = strlen(headers); - ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, - &olen, (unsigned char *) headers, len); - if (ret != 0) { - flb_free(buf); - - return ret; - } - - /* Create buffer to store JWT */ - out = flb_sds_create_size(2048); - if (!out) { - flb_errno(); - flb_free(buf); - return -1; - } - - /* Append header */ - out = flb_sds_cat(out, buf, olen); - out = flb_sds_cat(out, ".", 1); - - /* Encode Payload */ - len = strlen(payload); - bigquery_jwt_base64_url_encode((unsigned char *) buf, buf_size, - (unsigned char *) payload, len, &olen); - - /* Append Payload */ - out = flb_sds_cat(out, buf, olen); - - /* do sha256() of base64(header).base64(payload) */ - ret = flb_hash_simple(FLB_HASH_SHA256, - (unsigned char *) out, flb_sds_len(out), - sha256_buf, sizeof(sha256_buf)); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error hashing token"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - /* In mbedTLS cert length must include the null byte */ - len = strlen(secret) + 1; - - sig_len = sizeof(sig); - - ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, - FLB_CRYPTO_PADDING_PKCS1, - FLB_HASH_SHA256, - (unsigned char *) secret, len, - sha256_buf, sizeof(sha256_buf), - sig, &sig_len); - - if (ret != FLB_CRYPTO_SUCCESS) { - flb_plg_error(ctx->ins, "error creating RSA context"); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - sigd = flb_malloc(2048); - if (!sigd) { - flb_errno(); - flb_free(buf); - flb_sds_destroy(out); - return -1; - } - - bigquery_jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); - - out = flb_sds_cat(out, ".", 1); - out = flb_sds_cat(out, sigd, olen); - - *out_signature = out; - *out_size = flb_sds_len(out); - - flb_free(buf); - flb_free(sigd); - - return 0; -} - -/* Create a new oauth2 context and get a oauth2 token */ -static int bigquery_get_oauth2_token(struct flb_bigquery *ctx) -{ - int ret; - char *token; - char *sig_data; - size_t sig_size; - time_t issued; - time_t expires; - char payload[1024]; - - /* Clear any previous oauth2 payload content */ - flb_oauth2_payload_clear(ctx->o); - - /* JWT encode for oauth2 */ - issued = time(NULL); - expires = issued + FLB_BIGQUERY_TOKEN_REFRESH; - - snprintf(payload, sizeof(payload) - 1, - "{\"iss\": \"%s\", \"scope\": \"%s\", " - "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", - ctx->oauth_credentials->client_email, FLB_BIGQUERY_SCOPE, - FLB_BIGQUERY_AUTH_URL, - expires, issued); - - /* Compose JWT signature */ - ret = bigquery_jwt_encode(ctx, payload, ctx->oauth_credentials->private_key, - &sig_data, &sig_size); - if (ret != 0) { - flb_plg_error(ctx->ins, "JWT signature generation failed"); - return -1; - } - - flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); - - ret = flb_oauth2_payload_append(ctx->o, - "grant_type", -1, - "urn%3Aietf%3Aparams%3Aoauth%3A" - "grant-type%3Ajwt-bearer", -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, - "assertion", -1, - sig_data, sig_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - flb_sds_destroy(sig_data); - return -1; - } - flb_sds_destroy(sig_data); - - /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); - if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); - return -1; - } - - return 0; -} - -static flb_sds_t add_aws_signature(struct flb_http_client *c, struct flb_bigquery *ctx) { - flb_sds_t signature; - - flb_plg_debug(ctx->ins, "Signing the request with AWS SigV4 using IMDS credentials"); - - signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL), - ctx->aws_region, "sts", - 0, NULL, ctx->aws_provider); - if (!signature) { - flb_plg_error(ctx->ins, "Could not sign the request with AWS SigV4"); - return NULL; - } - - return signature; -} - -static inline int to_encode_path(char c) -{ - if ((c >= 48 && c <= 57) || /* 0-9 */ - (c >= 65 && c <= 90) || /* A-Z */ - (c >= 97 && c <= 122) || /* a-z */ - (c == '-' || c == '_' || c == '.' || c == '~' || c == '/')) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static flb_sds_t uri_encode(const char *uri, size_t len) -{ - int i; - flb_sds_t buf = NULL; - flb_sds_t tmp = NULL; - - buf = flb_sds_create_size(len * 2); - if (!buf) { - flb_error("[uri_encode] cannot allocate buffer for URI encoding"); - return NULL; - } - - for (i = 0; i < len; i++) { - if (to_encode_path(uri[i]) == FLB_TRUE) { - tmp = flb_sds_printf(&buf, "%%%02X", (unsigned char) *(uri + i)); - if (!tmp) { - flb_error("[uri_encode] error formatting special character"); - flb_sds_destroy(buf); - return NULL; - } - continue; - } - - /* Direct assignment, just copy the character */ - if (buf) { - tmp = flb_sds_cat(buf, uri + i, 1); - if (!tmp) { - flb_error("[uri_encode] error composing outgoing buffer"); - flb_sds_destroy(buf); - return NULL; - } - buf = tmp; - } - } - - return buf; -} - -/* https://cloud.google.com/iam/docs/using-workload-identity-federation */ -static int bigquery_exchange_aws_creds_for_google_oauth(struct flb_bigquery *ctx) -{ - struct flb_connection *aws_sts_conn; - struct flb_connection *google_sts_conn = NULL; - struct flb_connection *google_gen_access_token_conn = NULL; - struct flb_http_client *aws_sts_c = NULL; - struct flb_http_client *google_sts_c = NULL; - struct flb_http_client *google_gen_access_token_c = NULL; - int google_sts_ret; - int google_gen_access_token_ret; - size_t b_sent_google_sts; - size_t b_sent_google_gen_access_token; - flb_sds_t signature = NULL; - flb_sds_t sigv4_amz_date = NULL; - flb_sds_t sigv4_amz_sec_token = NULL; - flb_sds_t aws_gci_url = NULL; - flb_sds_t aws_gci_goog_target_resource = NULL; - flb_sds_t aws_gci_token = NULL; - flb_sds_t aws_gci_token_encoded = NULL; - flb_sds_t google_sts_token = NULL; - flb_sds_t google_gen_access_token_body = NULL; - flb_sds_t google_gen_access_token_url = NULL; - flb_sds_t google_federated_token = NULL; - flb_sds_t google_auth_header = NULL; - - if (ctx->sa_token) { - flb_sds_destroy(ctx->sa_token); - ctx->sa_token = NULL; - } - - /* Sign an AWS STS request with AWS SigV4 signature */ - aws_sts_conn = flb_upstream_conn_get(ctx->aws_sts_upstream); - if (!aws_sts_conn) { - flb_plg_error(ctx->ins, "Failed to get upstream connection for AWS STS"); - goto error; - } - - aws_sts_c = flb_http_client(aws_sts_conn, FLB_HTTP_POST, FLB_BIGQUERY_AWS_STS_ENDPOINT, - NULL, 0, NULL, 0, NULL, 0); - if (!aws_sts_c) { - flb_plg_error(ctx->ins, "Failed to create HTTP client for AWS STS"); - goto error; - } - - signature = add_aws_signature(aws_sts_c, ctx); - if (!signature) { - flb_plg_error(ctx->ins, "Failed to sign AWS STS request"); - goto error; - } - - sigv4_amz_date = flb_sds_create(flb_kv_get_key_value("x-amz-date", &aws_sts_c->headers)); - if (!sigv4_amz_date) { - flb_plg_error(ctx->ins, "Failed to extract `x-amz-date` header from AWS STS signed request"); - goto error; - } - - sigv4_amz_sec_token = flb_sds_create(flb_kv_get_key_value("x-amz-security-token", &aws_sts_c->headers)); - if (!sigv4_amz_sec_token) { - flb_plg_error(ctx->ins, "Failed to extract `x-amz-security-token` header from AWS STS signed request"); - goto error; - } - - /* Create an AWS GetCallerIdentity token */ - - /* AWS STS endpoint URL */ - aws_gci_url = flb_sds_create_size(128); - aws_gci_url = flb_sds_printf(&aws_gci_url, - "https://%s%s", - ctx->aws_sts_endpoint, - FLB_BIGQUERY_AWS_STS_ENDPOINT); - - /* x-goog-cloud-target-resource header */ - aws_gci_goog_target_resource = flb_sds_create_size(128); - aws_gci_goog_target_resource = flb_sds_printf(&aws_gci_goog_target_resource, - FLB_BIGQUERY_GOOGLE_CLOUD_TARGET_RESOURCE, - ctx->project_number, ctx->pool_id, ctx->provider_id); - - aws_gci_token = flb_sds_create_size(2048); - aws_gci_token = flb_sds_printf( - &aws_gci_token, - "{\"url\":\"%s\",\"method\":\"POST\",\"headers\":[" - "{\"key\":\"Authorization\",\"value\":\"%s\"}," - "{\"key\":\"host\",\"value\":\"%s\"}," - "{\"key\":\"x-amz-date\",\"value\":\"%s\"}," - "{\"key\":\"x-goog-cloud-target-resource\",\"value\":\"%s\"}," - "{\"key\":\"x-amz-security-token\",\"value\":\"%s\"}" - "]}", - aws_gci_url, - signature, - ctx->aws_sts_endpoint, - sigv4_amz_date, - aws_gci_goog_target_resource, - sigv4_amz_sec_token); - - aws_gci_token_encoded = uri_encode(aws_gci_token, flb_sds_len(aws_gci_token)); - if (!aws_gci_token_encoded) { - flb_plg_error(ctx->ins, "Failed to encode GetCallerIdentity token"); - goto error; - } - - /* To exchange the AWS credential for a federated access token, - * we need to pass the AWS GetCallerIdentity token to the Google Security Token Service's token() method */ - google_sts_token = flb_sds_create_size(2048); - google_sts_token = flb_sds_printf( - &google_sts_token, - "{\"audience\":\"%s\"," - "\"grantType\":\"%s\"," - "\"requestedTokenType\":\"%s\"," - "\"scope\":\"%s\"," - "\"subjectTokenType\":\"%s\"," - "\"subjectToken\":\"%s\"}", - aws_gci_goog_target_resource, - FLB_BIGQUERY_GOOGLE_STS_TOKEN_GRANT_TYPE, - FLB_BIGQUERY_GOOGLE_STS_TOKEN_REQUESTED_TOKEN_TYPE, - FLB_BIGQUERY_GOOGLE_STS_TOKEN_SCOPE, - FLB_BIGQUERY_GOOGLE_STS_TOKEN_SUBJECT_TOKEN_TYPE, - aws_gci_token_encoded); - - google_sts_conn = flb_upstream_conn_get(ctx->google_sts_upstream); - if (!google_sts_conn) { - flb_plg_error(ctx->ins, "Google STS connection setup failed"); - goto error; - } - - google_sts_c = flb_http_client(google_sts_conn, FLB_HTTP_POST, FLB_BIGQUERY_GOOGLE_CLOUD_TOKEN_ENDPOINT, - google_sts_token, flb_sds_len(google_sts_token), - NULL, 0, NULL, 0); - - google_sts_ret = flb_http_do(google_sts_c, &b_sent_google_sts); - if (google_sts_ret != 0) { - flb_plg_error(ctx->ins, "Google STS token request http_do=%i", google_sts_ret); - goto error; - } - - if (google_sts_c->resp.status != 200) { - flb_plg_error(ctx->ins, "Google STS token response status: %i, payload:\n%s", - google_sts_c->resp.status, google_sts_c->resp.payload); - goto error; - } - - /* To exchange the federated token for a service account access token, - * we need to call the Google Service Account Credentials API generateAccessToken() method */ - google_federated_token = flb_json_get_val(google_sts_c->resp.payload, - google_sts_c->resp.payload_size, - "access_token"); - if (!google_federated_token) { - flb_plg_error(ctx->ins, "Failed to extract Google federated access token from STS token() response"); - goto error; - } - - google_gen_access_token_conn = flb_upstream_conn_get(ctx->google_iam_upstream); - if (!google_gen_access_token_conn) { - flb_plg_error(ctx->ins, "Google Service Account Credentials API connection setup failed"); - goto error; - } - - google_gen_access_token_url = flb_sds_create_size(256); - google_gen_access_token_url = flb_sds_printf(&google_gen_access_token_url, - FLB_BIGQUERY_GOOGLE_GEN_ACCESS_TOKEN_URL, - ctx->google_service_account); - - google_gen_access_token_body = flb_sds_create(FLB_BIGQUERY_GOOGLE_GEN_ACCESS_TOKEN_REQUEST_BODY); - - google_gen_access_token_c = flb_http_client(google_gen_access_token_conn, FLB_HTTP_POST, google_gen_access_token_url, - google_gen_access_token_body, flb_sds_len(google_gen_access_token_body), - NULL, 0, NULL, 0); - - google_auth_header = flb_sds_create_size(2048 + 7); - google_auth_header = flb_sds_printf(&google_auth_header, "%s%s", - "Bearer ", google_federated_token); - - flb_http_add_header(google_gen_access_token_c, "Authorization", 13, - google_auth_header, flb_sds_len(google_auth_header)); - - flb_http_add_header(google_gen_access_token_c, "Content-Type", 12, - "application/json; charset=utf-8", 31); - - google_gen_access_token_ret = flb_http_do(google_gen_access_token_c, &b_sent_google_gen_access_token); - if (google_gen_access_token_ret != 0) { - flb_plg_error(ctx->ins, "Google Service Account Credentials API generateAccessToken() request http_do=%i", - google_gen_access_token_ret); - goto error; - } - - if (google_gen_access_token_c->resp.status != 200) { - flb_plg_error(ctx->ins, "Google Service Account Credentials API generateAccessToken() response " - "status: %i, payload:\n%s", - google_gen_access_token_c->resp.status, google_gen_access_token_c->resp.payload); - goto error; - } - - ctx->sa_token = flb_json_get_val(google_gen_access_token_c->resp.payload, - google_gen_access_token_c->resp.payload_size, - "accessToken"); - if (!ctx->sa_token) { - flb_plg_error(ctx->ins, "Failed to extract Google OAuth token " - "from Service Account Credentials API generateAccessToken() response"); - goto error; - } - - ctx->sa_token_expiry = time(NULL) + FLB_BIGQUERY_TOKEN_REFRESH; - - flb_sds_destroy(signature); - flb_sds_destroy(sigv4_amz_date); - flb_sds_destroy(sigv4_amz_sec_token); - flb_sds_destroy(aws_gci_url); - flb_sds_destroy(aws_gci_goog_target_resource); - flb_sds_destroy(aws_gci_token); - flb_sds_destroy(aws_gci_token_encoded); - flb_sds_destroy(google_sts_token); - flb_sds_destroy(google_gen_access_token_body); - flb_sds_destroy(google_gen_access_token_url); - flb_sds_destroy(google_federated_token); - flb_sds_destroy(google_auth_header); - - flb_http_client_destroy(aws_sts_c); - flb_http_client_destroy(google_sts_c); - flb_http_client_destroy(google_gen_access_token_c); - - flb_upstream_conn_release(aws_sts_conn); - flb_upstream_conn_release(google_sts_conn); - flb_upstream_conn_release(google_gen_access_token_conn); - - flb_plg_info(ctx->ins, "Retrieved Google service account OAuth token via Identity Federation"); - - return 0; - -error: - flb_sds_destroy(signature); - flb_sds_destroy(sigv4_amz_date); - flb_sds_destroy(sigv4_amz_sec_token); - flb_sds_destroy(aws_gci_url); - flb_sds_destroy(aws_gci_goog_target_resource); - flb_sds_destroy(aws_gci_token); - flb_sds_destroy(aws_gci_token_encoded); - flb_sds_destroy(google_sts_token); - flb_sds_destroy(google_gen_access_token_body); - flb_sds_destroy(google_gen_access_token_url); - flb_sds_destroy(google_federated_token); - flb_sds_destroy(google_auth_header); - - if (aws_sts_c) { - flb_http_client_destroy(aws_sts_c); - } - - if (google_sts_c) { - flb_http_client_destroy(google_sts_c); - } - - if (google_gen_access_token_c) { - flb_http_client_destroy(google_gen_access_token_c); - } - - if (aws_sts_conn) { - flb_upstream_conn_release(aws_sts_conn); - } - - if (google_sts_conn) { - flb_upstream_conn_release(google_sts_conn); - } - - if (google_gen_access_token_conn) { - flb_upstream_conn_release(google_gen_access_token_conn); - } - - return -1; -} - -static int flb_bigquery_google_token_expired(time_t expiry) -{ - time_t now; - - now = time(NULL); - if (expiry <= now) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static flb_sds_t get_google_service_account_token(struct flb_bigquery *ctx) { - int ret = 0; - flb_sds_t output; - flb_plg_trace(ctx->ins, "Getting Google service account token"); - - if (!ctx->sa_token) { - flb_plg_trace(ctx->ins, "Acquiring new token"); - ret = bigquery_exchange_aws_creds_for_google_oauth(ctx); - } - else if (flb_bigquery_google_token_expired(ctx->sa_token_expiry) == FLB_TRUE) { - flb_plg_trace(ctx->ins, "Replacing expired token"); - ret = bigquery_exchange_aws_creds_for_google_oauth(ctx); - } - - if (ret != 0) { - return NULL; - } - - output = flb_sds_create_size(2048 + 7); - output = flb_sds_printf(&output, "%s%s", "Bearer ", ctx->sa_token); - return output; -} - -static flb_sds_t get_google_token(struct flb_bigquery *ctx) -{ - int ret = 0; - flb_sds_t output = NULL; - - if (pthread_mutex_lock(&ctx->token_mutex)){ - flb_plg_error(ctx->ins, "error locking mutex"); - return NULL; - } - - if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - ret = bigquery_get_oauth2_token(ctx); - } - - /* Copy string to prevent race conditions (get_oauth2 can free the string) */ - if (ret == 0) { - output = flb_sds_create(ctx->o->token_type); - flb_sds_printf(&output, " %s", ctx->o->access_token); - } - - if (pthread_mutex_unlock(&ctx->token_mutex)){ - flb_plg_error(ctx->ins, "error unlocking mutex"); - if (output) { - flb_sds_destroy(output); - } - return NULL; - } - - return output; -} - -static int cb_bigquery_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - char *token; - int io_flags = FLB_IO_TLS; - struct flb_bigquery *ctx; - - /* Create config context */ - ctx = flb_bigquery_conf_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "configuration failed"); - return -1; - } - - flb_output_set_context(ins, ctx); - - /* Network mode IPv6 */ - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; - } - - /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ - pthread_mutex_init(&ctx->token_mutex, NULL); - - /* - * Create upstream context for BigQuery Streaming Inserts - * (no oauth2 service) - */ - ctx->u = flb_upstream_create_url(config, FLB_BIGQUERY_URL_BASE, - io_flags, ins->tls); - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } - - if (ctx->has_identity_federation) { - /* Configure AWS IMDS */ - ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - - if (!ctx->aws_tls) { - flb_plg_error(ctx->ins, "Failed to create TLS context"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - ctx->aws_provider = flb_standard_chain_provider_create(config, - ctx->aws_tls, - NULL, - NULL, - NULL, - flb_aws_client_generator(), - NULL); - - if (!ctx->aws_provider) { - flb_plg_error(ctx->ins, "Failed to create AWS Credential Provider"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - /* initialize credentials in sync mode */ - ctx->aws_provider->provider_vtable->sync(ctx->aws_provider); - ctx->aws_provider->provider_vtable->init(ctx->aws_provider); - - /* set back to async */ - ctx->aws_provider->provider_vtable->async(ctx->aws_provider); - ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins); - - /* Configure AWS STS */ - ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - - if (!ctx->aws_sts_tls) { - flb_plg_error(ctx->ins, "Failed to create TLS context"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - ctx->aws_sts_upstream = flb_upstream_create(config, - ctx->aws_sts_endpoint, - 443, - io_flags, - ctx->aws_sts_tls); - - if (!ctx->aws_sts_upstream) { - flb_plg_error(ctx->ins, "AWS STS upstream creation failed"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - ctx->aws_sts_upstream->base.net.keepalive = FLB_FALSE; - - /* Configure Google STS */ - ctx->google_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - - if (!ctx->google_sts_tls) { - flb_plg_error(ctx->ins, "Failed to create TLS context"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - ctx->google_sts_upstream = flb_upstream_create_url(config, - FLB_BIGQUERY_GOOGLE_STS_URL, - io_flags, - ctx->google_sts_tls); - - if (!ctx->google_sts_upstream) { - flb_plg_error(ctx->ins, "Google STS upstream creation failed"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - /* Configure Google IAM */ - ctx->google_iam_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - - if (!ctx->google_iam_tls) { - flb_plg_error(ctx->ins, "Failed to create TLS context"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - ctx->google_iam_upstream = flb_upstream_create_url(config, - FLB_BIGQUERY_GOOGLE_IAM_URL, - io_flags, - ctx->google_iam_tls); - - if (!ctx->google_iam_upstream) { - flb_plg_error(ctx->ins, "Google IAM upstream creation failed"); - flb_bigquery_conf_destroy(ctx); - return -1; - } - - /* Remove async flag from upstream */ - flb_stream_disable_async_mode(&ctx->aws_sts_upstream->base); - flb_stream_disable_async_mode(&ctx->google_sts_upstream->base); - flb_stream_disable_async_mode(&ctx->google_iam_upstream->base); - } - - /* Create oauth2 context */ - ctx->o = flb_oauth2_create(ctx->config, FLB_BIGQUERY_AUTH_URL, 3000); - if (!ctx->o) { - flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; - } - flb_output_upstream_set(ctx->u, ins); - - /* Get or renew the OAuth2 token */ - if (ctx->has_identity_federation) { - token = get_google_service_account_token(ctx); - } - else { - token = get_google_token(ctx); - } - - if (!token) { - flb_plg_warn(ctx->ins, "token retrieval failed"); - } - else { - flb_sds_destroy(token); - } - - return 0; -} - -static int bigquery_format(const void *data, size_t bytes, - const char *tag, size_t tag_len, - char **out_data, size_t *out_size, - struct flb_bigquery *ctx) -{ - int array_size = 0; - flb_sds_t out_buf; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - array_size = flb_mp_count(data, bytes); - - /* Create temporary msgpack buffer */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* - * Pack root map (kind & rows): - * - * { - * "kind": "bigquery#tableDataInsertAllRequest", - * "skipInvalidRows": boolean, - * "ignoreUnknownValues": boolean, - * "rows": [] - * } - */ - msgpack_pack_map(&mp_pck, 4); - - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "kind", 4); - - msgpack_pack_str(&mp_pck, 34); - msgpack_pack_str_body(&mp_pck, "bigquery#tableDataInsertAllRequest", 34); - - msgpack_pack_str(&mp_pck, 15); - msgpack_pack_str_body(&mp_pck, "skipInvalidRows", 15); - - if (ctx->skip_invalid_rows) { - msgpack_pack_true(&mp_pck); - } - else { - msgpack_pack_false(&mp_pck); - } - - msgpack_pack_str(&mp_pck, 19); - msgpack_pack_str_body(&mp_pck, "ignoreUnknownValues", 19); - - if (ctx->ignore_unknown_values) { - msgpack_pack_true(&mp_pck); - } - else { - msgpack_pack_false(&mp_pck); - } - - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "rows", 4); - - /* Append entries */ - msgpack_pack_array(&mp_pck, array_size); - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - /* - * Pack entry - * - * { - * "json": {...} - * } - * - * For now, we don't support the insertId that's required for duplicate detection. - */ - msgpack_pack_map(&mp_pck, 1); - - /* json */ - msgpack_pack_str(&mp_pck, 4); - msgpack_pack_str_body(&mp_pck, "json", 4); - msgpack_pack_object(&mp_pck, *log_event.body); - } - - /* Convert from msgpack to JSON */ - out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - - flb_log_event_decoder_destroy(&log_decoder); - msgpack_sbuffer_destroy(&mp_sbuf); - - if (!out_buf) { - flb_plg_error(ctx->ins, "error formatting JSON payload"); - return -1; - } - - *out_data = out_buf; - *out_size = flb_sds_len(out_buf); - - return 0; -} - -static void cb_bigquery_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - (void) i_ins; - (void) config; - int ret; - int ret_code = FLB_RETRY; - size_t b_sent; - flb_sds_t token; - flb_sds_t payload_buf; - size_t payload_size; - struct flb_bigquery *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - - flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Get or renew Token */ - if (ctx->has_identity_federation) { - token = get_google_service_account_token(ctx); - } - else { - token = get_google_token(ctx); - } - - if (!token) { - flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); - flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Reformat msgpack to bigquery JSON payload */ - ret = bigquery_format(event_chunk->data, event_chunk->size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - &payload_buf, &payload_size, ctx); - if (ret != 0) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(token); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, - payload_buf, payload_size, NULL, 0, NULL, 0); - if (!c) { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - flb_upstream_conn_release(u_conn); - flb_sds_destroy(token); - flb_sds_destroy(payload_buf); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - flb_http_buffer_size(c, 4192); - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - - /* Compose and append Authorization header */ - flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - - /* validate response */ - if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i", ret); - ret_code = FLB_RETRY; - } - else { - /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); - if (c->resp.status == 200) { - ret_code = FLB_OK; - } - else { - if (c->resp.payload && c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); - } - ret_code = FLB_RETRY; - } - } - - /* Cleanup */ - flb_sds_destroy(payload_buf); - flb_sds_destroy(token); - flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); - - /* Done */ - FLB_OUTPUT_RETURN(ret_code); -} - -static int cb_bigquery_exit(void *data, struct flb_config *config) -{ - struct flb_bigquery *ctx = data; - - if (!ctx) { - return -1; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - flb_bigquery_conf_destroy(ctx); - return 0; -} - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, credentials_file), - "Set the path for the google service credentials file" - }, - { - FLB_CONFIG_MAP_BOOL, "enable_identity_federation", "false", - 0, FLB_TRUE, offsetof(struct flb_bigquery, has_identity_federation), - "Enable identity federation" - }, - { - FLB_CONFIG_MAP_STR, "aws_region", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, aws_region), - "Enable identity federation" - }, - { - FLB_CONFIG_MAP_STR, "project_number", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, project_number), - "Set project number" - }, - { - FLB_CONFIG_MAP_STR, "pool_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, pool_id), - "Set the pool id" - }, - { - FLB_CONFIG_MAP_STR, "provider_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, provider_id), - "Set the provider id" - }, - { - FLB_CONFIG_MAP_STR, "google_service_account", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, google_service_account), - "Set the google service account" - }, - // set in flb_bigquery_oauth_credentials - { - FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, - 0, FLB_FALSE, 0, - "Set the service account email" - }, - // set in flb_bigquery_oauth_credentials - { - FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, - 0, FLB_FALSE, 0, - "Set the service account secret" - }, - { - FLB_CONFIG_MAP_STR, "project_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, project_id), - "Set the project id" - }, - { - FLB_CONFIG_MAP_STR, "dataset_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, dataset_id), - "Set the dataset id" - }, - { - FLB_CONFIG_MAP_STR, "table_id", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_bigquery, table_id), - "Set the table id" - }, - { - FLB_CONFIG_MAP_BOOL, "skip_invalid_rows", "false", - 0, FLB_TRUE, offsetof(struct flb_bigquery, skip_invalid_rows), - "Enable skipping of invalid rows", - }, - { - FLB_CONFIG_MAP_BOOL, "ignore_unknown_values", "false", - 0, FLB_TRUE, offsetof(struct flb_bigquery, ignore_unknown_values), - "Enable ignoring unknown value", - }, - /* EOF */ - {0} -}; - -struct flb_output_plugin out_bigquery_plugin = { - .name = "bigquery", - .description = "Send events to BigQuery via streaming insert", - .cb_init = cb_bigquery_init, - .cb_flush = cb_bigquery_flush, - .cb_exit = cb_bigquery_exit, - .config_map = config_map, - /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; |