summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_skywalking/skywalking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_skywalking/skywalking.c')
-rw-r--r--src/fluent-bit/plugins/out_skywalking/skywalking.c427
1 files changed, 427 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_skywalking/skywalking.c b/src/fluent-bit/plugins/out_skywalking/skywalking.c
new file mode 100644
index 000000000..c5a9a1e2d
--- /dev/null
+++ b/src/fluent-bit/plugins/out_skywalking/skywalking.c
@@ -0,0 +1,427 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "skywalking.h"
+
+#define DEFAULT_SW_OAP_HOST "127.0.0.1"
+#define DEFAULT_SW_OAP_PORT 12800
+#define DEFAULT_SW_SVC_NAME "sw-service"
+#define DEFAULT_SW_INS_NAME "fluent-bit"
+#define DEFAULT_SW_LOG_PATH "/v3/logs"
+
+static void sw_output_ctx_destroy(struct flb_output_sw* ctx) {
+ if (!ctx) {
+ return;
+ }
+
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ }
+
+ flb_sds_destroy(ctx->http_scheme);
+ flb_sds_destroy(ctx->uri);
+ flb_free(ctx);
+}
+
+static int cb_sw_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ int io_flags;
+ struct flb_output_sw *ctx;
+
+ /* Allocate plugin context */
+ ctx = flb_calloc(1, sizeof(struct flb_output_sw));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ ctx->ins = ins;
+
+ ret = flb_output_config_map_set(ins, (void *)ctx);
+ if (ret == -1) {
+ sw_output_ctx_destroy(ctx);
+ return -1;
+ }
+
+ flb_output_net_default(DEFAULT_SW_OAP_HOST, DEFAULT_SW_OAP_PORT, ctx->ins);
+
+ ctx->uri = flb_sds_create(DEFAULT_SW_LOG_PATH);
+ if (!ctx->uri) {
+ flb_plg_error(ctx->ins, "failed to configure endpoint");
+ sw_output_ctx_destroy(ctx);
+ return -1;
+ }
+
+ if (!ctx->svc_name) {
+ flb_plg_error(ctx->ins, "failed to configure service name");
+ sw_output_ctx_destroy(ctx);
+ return -1;
+ }
+
+ if (!ctx->svc_inst_name) {
+ flb_plg_error(ctx->ins, "failed to configure instance name");
+ sw_output_ctx_destroy(ctx);
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "configured %s/%s", ctx->svc_name, ctx->svc_inst_name);
+ flb_plg_debug(ctx->ins, "OAP address is %s:%d", ins->host.name, ins->host.port);
+
+ /* scheme configuration */
+ if (ins->use_tls == FLB_TRUE) {
+ io_flags = FLB_IO_TLS;
+ ctx->http_scheme = flb_sds_create("https://");
+ }
+ else {
+ io_flags = FLB_IO_TCP;
+ ctx->http_scheme = flb_sds_create("http://");
+ }
+
+ /* configure upstream instance */
+ ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port, io_flags, ins->tls);
+ if (!ctx->u) {
+ flb_plg_error(ctx->ins, "failed to create upstream context");
+ sw_output_ctx_destroy(ctx);
+ return -1;
+ }
+
+ flb_output_upstream_set(ctx->u, ins);
+
+ /* Set the plugin context */
+ flb_output_set_context(ins, ctx);
+ flb_output_set_http_debug_callbacks(ins);
+
+ return 0;
+}
+
+static int64_t timestamp_format(const struct flb_time* tms)
+{
+ int64_t timestamp = 0;
+
+ /* Format the time, use milliseconds precision not nanoseconds */
+ timestamp = tms->tm.tv_sec * 1000;
+ timestamp += tms->tm.tv_nsec / 1000000;
+
+ /* round up if necessary */
+ if (tms->tm.tv_nsec % 1000000 >= 500000) {
+ ++timestamp;
+ }
+ return timestamp;
+}
+
+static void sw_msgpack_pack_kv_str(msgpack_packer* pk, const char* key,
+ size_t key_len, const char *value,
+ size_t value_len)
+{
+ msgpack_pack_str(pk, key_len);
+ msgpack_pack_str_body(pk, key, key_len);
+ msgpack_pack_str(pk, value_len);
+ msgpack_pack_str_body(pk, value, value_len);
+}
+
+static void sw_msgpack_pack_kv_int64_t(msgpack_packer* pk, const char* key,
+ size_t key_len, int64_t value)
+{
+ msgpack_pack_str(pk, key_len);
+ msgpack_pack_str_body(pk, key, key_len);
+ msgpack_pack_int64(pk, value);
+}
+
+static void sw_msgpack_pack_log_body(msgpack_packer* pk,
+ msgpack_object* obj, size_t obj_size)
+{
+ int i, j = 0;
+ int log_entry_num = 0;
+ msgpack_sbuffer sbuf;
+ msgpack_packer body_pk;
+ msgpack_object key;
+ msgpack_object value;
+ flb_sds_t out_body_str;
+ size_t out_body_str_len;
+ int* valid_log_entry = NULL;
+
+ valid_log_entry = (int*)flb_malloc(obj_size * sizeof(int));
+ if (!valid_log_entry) {
+ flb_errno();
+ return;
+ }
+
+ msgpack_sbuffer_init(&sbuf);
+ msgpack_packer_init(&body_pk, &sbuf, msgpack_sbuffer_write);
+
+ for (i = 0; i < obj_size; ++i) {
+ key = obj->via.map.ptr[i].key;
+ value = obj->via.map.ptr[i].val;
+
+ if (key.type != MSGPACK_OBJECT_STR ||
+ value.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ valid_log_entry[j] = i;
+ ++j;
+ ++log_entry_num;
+ }
+
+ msgpack_pack_map(&body_pk, log_entry_num);
+
+ for (i = 0; i < log_entry_num; ++i) {
+ key = obj->via.map.ptr[valid_log_entry[i]].key;
+ value = obj->via.map.ptr[valid_log_entry[i]].val;
+ sw_msgpack_pack_kv_str(&body_pk, key.via.str.ptr, key.via.str.size,
+ value.via.str.ptr, value.via.str.size);
+ }
+
+ out_body_str = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size);
+ if (!out_body_str) {
+ msgpack_sbuffer_destroy(&sbuf);
+ flb_free(valid_log_entry);
+ return;
+ }
+ out_body_str_len = flb_sds_len(out_body_str);
+
+ msgpack_pack_str(pk, 4);
+ msgpack_pack_str_body(pk, "body", 4);
+ msgpack_pack_map(pk, 1);
+
+ /* body['json'] */
+ msgpack_pack_str(pk, 4);
+ msgpack_pack_str_body(pk, "json", 4);
+ msgpack_pack_map(pk, 1);
+
+ /* body['json']['json'] */
+ msgpack_pack_str(pk, 4);
+ msgpack_pack_str_body(pk, "json", 4);
+ msgpack_pack_str(pk, out_body_str_len);
+ msgpack_pack_str_body(pk, out_body_str, out_body_str_len);
+
+ flb_sds_destroy(out_body_str);
+ msgpack_sbuffer_destroy(&sbuf);
+ flb_free(valid_log_entry);
+}
+
+static int sw_format(struct flb_output_sw* ctx, const void *data, size_t bytes,
+ void** buf, size_t* buf_len)
+{
+ int ret = 0;
+ int chunk_size = 0;
+ uint32_t map_size;
+ msgpack_sbuffer sbuf;
+ msgpack_packer pk;
+ msgpack_object map;
+ int64_t timestamp;
+ flb_sds_t out_str;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ msgpack_sbuffer_init(&sbuf);
+ msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
+
+ chunk_size = flb_mp_count(data, bytes);
+ flb_plg_debug(ctx->ins, "%i messages flushed", chunk_size);
+
+ msgpack_pack_array(&pk, chunk_size);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ timestamp = timestamp_format(&log_event.timestamp);
+
+ map = *log_event.body;
+ map_size = map.via.map.size;
+
+ msgpack_pack_map(&pk, 4);
+
+ sw_msgpack_pack_kv_int64_t(&pk, "timestamp", 9, timestamp);
+ sw_msgpack_pack_kv_str(&pk, "service", 7, ctx->svc_name,
+ flb_sds_len(ctx->svc_name));
+ sw_msgpack_pack_kv_str(&pk, "serviceInstance", 15,
+ ctx->svc_inst_name, flb_sds_len(ctx->svc_inst_name));
+ sw_msgpack_pack_log_body(&pk, &map, map_size);
+ }
+
+ out_str = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size);
+ if (!out_str) {
+ ret = -1;
+ goto done;
+ }
+ else {
+ ret = 0;
+ }
+
+ *buf = out_str;
+ *buf_len = flb_sds_len(out_str);
+
+done:
+ msgpack_sbuffer_destroy(&sbuf);
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return ret;
+}
+
+static int mock_oap_request(struct flb_http_client* client, int mock_status)
+{
+ client->resp.status = mock_status;
+ return 0;
+}
+
+static bool check_sw_under_test()
+{
+ if (getenv("FLB_SW_PLUGIN_UNDER_TEST") != NULL) {
+ return FLB_TRUE;
+ }
+ return FLB_FALSE;
+}
+
+static void cb_sw_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context, struct flb_config *config)
+{
+ int flush_ret = -1;
+ int tmp_ret = -1;
+ struct flb_output_sw *ctx = out_context;
+ struct flb_connection *conn = NULL;
+ struct flb_http_client *client = NULL;
+ void* buf = NULL;
+ size_t buf_len;
+ size_t sent_size;
+
+ tmp_ret = sw_format(ctx,
+ event_chunk->data,
+ event_chunk->size,
+ &buf, &buf_len);
+ if (tmp_ret != 0) {
+ flb_plg_error(ctx->ins, "failed to create buffer");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ conn = flb_upstream_conn_get(ctx->u);
+ if (!conn) {
+ flb_plg_error(ctx->ins, "failed to establish connection to %s:%i",
+ ctx->ins->host.name, ctx->ins->host.port);
+ flb_sds_destroy(buf);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ client = flb_http_client(conn, FLB_HTTP_POST, ctx->uri,
+ (const char*)buf, buf_len, ctx->ins->host.name, ctx->ins->host.port,
+ NULL, 0);
+ if (!client) {
+ flb_plg_error(ctx->ins, "failed to create HTTP client");
+ flb_sds_destroy(buf);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ if (ctx->auth_token && flb_sds_len(ctx->auth_token) != 0) {
+ flb_http_add_header(client, "Authentication", 14,
+ ctx->auth_token, strlen(ctx->auth_token));
+ }
+
+ flb_http_add_header(client, "Content-Type", 12,
+ "application/json", 16);
+ flb_http_add_header(client, "User-Agent", 10,
+ "Fluent-Bit", 10);
+
+ if (check_sw_under_test() == FLB_TRUE) {
+ tmp_ret = mock_oap_request(client, 200);
+ }
+ else {
+ tmp_ret = flb_http_do(client, &sent_size);
+ }
+
+ if (tmp_ret == 0) {
+ flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i", ctx->ins->host.name,
+ ctx->ins->host.port, client->resp.status);
+
+ if (client->resp.status < 200 || client->resp.status > 205) {
+ flush_ret = FLB_RETRY;
+ }
+ else {
+ flush_ret = FLB_OK;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "failed to flush buffer to %s:%i",
+ ctx->ins->host.name, ctx->ins->host.port);
+ flush_ret = FLB_RETRY;
+ }
+
+ flb_sds_destroy(buf);
+ flb_http_client_destroy(client);
+ flb_upstream_conn_release(conn);
+
+ FLB_OUTPUT_RETURN(flush_ret);
+}
+
+static int cb_sw_exit(void *data, struct flb_config *config)
+{
+ struct flb_output_sw *ctx;
+
+ ctx = (struct flb_output_sw*)data;
+ sw_output_ctx_destroy(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "auth_token", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_output_sw, auth_token),
+ "Auth token for SkyWalking OAP"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "svc_name", DEFAULT_SW_SVC_NAME,
+ 0, FLB_TRUE, offsetof(struct flb_output_sw, svc_name),
+ "Service name"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "svc_inst_name", DEFAULT_SW_INS_NAME,
+ 0, FLB_TRUE, offsetof(struct flb_output_sw, svc_inst_name),
+ "Instance name"
+ },
+ {0}
+};
+
+struct flb_output_plugin out_skywalking_plugin = {
+ .name = "skywalking",
+ .description = "Send logs into log collector on SkyWalking OAP",
+ .cb_init = cb_sw_init,
+ .cb_flush = cb_sw_flush,
+ .cb_exit = cb_sw_exit,
+ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
+ .config_map = config_map
+};