summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_forward
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/plugins/out_forward
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/plugins/out_forward')
-rw-r--r--src/fluent-bit/plugins/out_forward/CMakeLists.txt6
-rw-r--r--src/fluent-bit/plugins/out_forward/README.md12
-rw-r--r--src/fluent-bit/plugins/out_forward/forward.c1832
-rw-r--r--src/fluent-bit/plugins/out_forward/forward.h146
-rw-r--r--src/fluent-bit/plugins/out_forward/forward_format.c640
-rw-r--r--src/fluent-bit/plugins/out_forward/forward_format.h48
6 files changed, 2684 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_forward/CMakeLists.txt b/src/fluent-bit/plugins/out_forward/CMakeLists.txt
new file mode 100644
index 000000000..fd639c2eb
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(src
+ forward.c
+ forward_format.c
+ )
+
+FLB_PLUGIN(out_forward "${src}" "")
diff --git a/src/fluent-bit/plugins/out_forward/README.md b/src/fluent-bit/plugins/out_forward/README.md
new file mode 100644
index 000000000..f08b45619
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/README.md
@@ -0,0 +1,12 @@
+# Fluentd Forward Protocol Implementation
+
+This plugin is based in Fluentd Forward Protocol Spec v1 available here:
+
+- https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1
+
+The following Event modes are implemented:
+
+- Message Mode
+- Forward Mode
+
+Depending of the configuration, the plugin will decide to go with Message Mode or Forward Mode.
diff --git a/src/fluent-bit/plugins/out_forward/forward.c b/src/fluent-bit/plugins/out_forward/forward.c
new file mode 100644
index 000000000..8cc2ca2cc
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/forward.c
@@ -0,0 +1,1832 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_upstream_ha.h>
+#include <fluent-bit/flb_hash.h>
+#include <fluent-bit/flb_crypto.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_random.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_log_event.h>
+#include <msgpack.h>
+
+#include "forward.h"
+#include "forward_format.h"
+
+#ifdef FLB_HAVE_UNIX_SOCKET
+#include <sys/socket.h>
+#include <sys/un.h>
+#endif
+
+#define SECURED_BY "Fluent Bit"
+
+pthread_once_t uds_connection_tls_slot_init_once_control = PTHREAD_ONCE_INIT;
+FLB_TLS_DEFINE(struct flb_forward_uds_connection, uds_connection);
+
+void initialize_uds_connection_tls_slot()
+{
+ FLB_TLS_INIT(uds_connection);
+}
+
+#ifdef FLB_HAVE_UNIX_SOCKET
+static flb_sockfd_t forward_unix_connect(struct flb_forward_config *config,
+ struct flb_forward *ctx)
+{
+ flb_sockfd_t fd = -1;
+ struct sockaddr_un address;
+
+ if (sizeof(address.sun_path) <= flb_sds_len(config->unix_path)) {
+ flb_plg_error(ctx->ins, "unix_path is too long");
+ return -1;
+ }
+
+ memset(&address, 0, sizeof(struct sockaddr_un));
+
+ fd = flb_net_socket_create(AF_UNIX, FLB_FALSE);
+ if (fd < 0) {
+ flb_plg_error(ctx->ins, "flb_net_socket_create error");
+ return -1;
+ }
+
+ address.sun_family = AF_UNIX;
+ strncpy(address.sun_path, config->unix_path, flb_sds_len(config->unix_path));
+
+ if(connect(fd, (const struct sockaddr*) &address, sizeof(address)) < 0) {
+ flb_errno();
+ close(fd);
+
+ return -1;
+ }
+
+ return fd;
+}
+
+static flb_sockfd_t forward_uds_get_conn(struct flb_forward_config *config,
+ struct flb_forward *ctx)
+{
+ struct flb_forward_uds_connection *connection_entry;
+ flb_sockfd_t connection;
+
+ connection_entry = FLB_TLS_GET(uds_connection);
+
+ /* We need to allow the code to try to get the value from the TLS
+ * regardless of if it's provided with a config and context because
+ * when we establish the connection we do have both of them but those
+ * are not passed along to the functions in charge of doing IO.
+ */
+
+ if (connection_entry == NULL) {
+ if (config == NULL ||
+ ctx == NULL) {
+ return -1;
+ }
+
+ connection_entry = flb_calloc(1, sizeof(struct flb_forward_uds_connection));
+
+ if (connection_entry == NULL) {
+ flb_errno();
+
+ return -1;
+ }
+
+ connection = forward_unix_connect(config, ctx);
+
+ if (connection == -1) {
+ flb_free(connection_entry);
+
+ return -1;
+ }
+
+ connection_entry->descriptor = connection;
+
+ pthread_mutex_lock(&ctx->uds_connection_list_mutex);
+
+ cfl_list_add(&connection_entry->_head, &ctx->uds_connection_list);
+
+ pthread_mutex_unlock(&ctx->uds_connection_list_mutex);
+
+ FLB_TLS_SET(uds_connection, connection_entry);
+ }
+
+ return connection_entry->descriptor;
+}
+
+static void forward_uds_drop_conn(struct flb_forward *ctx,
+ flb_sockfd_t connection)
+{
+ struct flb_forward_uds_connection *connection_entry;
+
+ if (ctx != NULL) {
+ connection_entry = FLB_TLS_GET(uds_connection);
+
+ if (connection_entry != NULL) {
+ pthread_mutex_lock(&ctx->uds_connection_list_mutex);
+
+ if (connection == connection_entry->descriptor) {
+ close(connection);
+
+ if (!cfl_list_entry_is_orphan(&connection_entry->_head)) {
+ cfl_list_del(&connection_entry->_head);
+ }
+
+ free(connection_entry);
+
+ FLB_TLS_SET(uds_connection, NULL);
+ }
+
+ pthread_mutex_unlock(&ctx->uds_connection_list_mutex);
+ }
+ }
+}
+
+static void forward_uds_drop_all(struct flb_forward *ctx)
+{
+ struct flb_forward_uds_connection *connection_entry;
+ struct cfl_list *head;
+ struct cfl_list *tmp;
+
+ if (ctx != NULL) {
+ pthread_mutex_lock(&ctx->uds_connection_list_mutex);
+
+ cfl_list_foreach_safe(head, tmp, &ctx->uds_connection_list) {
+ connection_entry = cfl_list_entry(head,
+ struct flb_forward_uds_connection,
+ _head);
+
+ if (connection_entry->descriptor != -1) {
+ close(connection_entry->descriptor);
+
+ connection_entry->descriptor = -1;
+ }
+
+ if (!cfl_list_entry_is_orphan(&connection_entry->_head)) {
+ cfl_list_del(&connection_entry->_head);
+ }
+
+ free(connection_entry);
+ }
+
+ pthread_mutex_unlock(&ctx->uds_connection_list_mutex);
+ }
+}
+
+/* In these functions forward_uds_get_conn
+ * should not return -1 because it should have been
+ * called earlier with a proper context and it should
+ * have saved a file descriptor to the TLS.
+ */
+
+static int io_unix_write(struct flb_connection *unused, int deprecated_fd, const void* data,
+ size_t len, size_t *out_len)
+{
+ flb_sockfd_t uds_conn;
+
+ uds_conn = forward_uds_get_conn(NULL, NULL);
+
+ return flb_io_fd_write(uds_conn, data, len, out_len);
+}
+
+static int io_unix_read(struct flb_connection *unused, int deprecated_fd, void* buf,size_t len)
+{
+ flb_sockfd_t uds_conn;
+
+ uds_conn = forward_uds_get_conn(NULL, NULL);
+
+ return flb_io_fd_read(uds_conn, buf, len);
+}
+
+#else
+
+static flb_sockfd_t forward_uds_get_conn(struct flb_forward_config *config,
+ struct flb_forward *ctx)
+{
+ (void) config;
+ (void) ctx;
+
+ return -1;
+}
+
+static void forward_uds_drop_conn(struct flb_forward *ctx,
+ flb_sockfd_t connection)
+{
+ (void) ctx;
+ (void) connection;
+}
+
+static void forward_uds_drop_all(struct flb_forward *ctx)
+{
+ (void) ctx;
+}
+
+#endif
+
+#ifdef FLB_HAVE_TLS
+
+static int io_net_write(struct flb_connection *conn, int unused_fd,
+ const void* data, size_t len, size_t *out_len)
+{
+ return flb_io_net_write(conn, data, len, out_len);
+}
+
+static int io_net_read(struct flb_connection *conn, int unused_fd,
+ void* buf, size_t len)
+{
+ return flb_io_net_read(conn, buf, len);
+}
+
+static int secure_forward_init(struct flb_forward *ctx,
+ struct flb_forward_config *fc)
+{
+ return 0;
+}
+
+#endif
+
+static inline void print_msgpack_status(struct flb_forward *ctx,
+ int ret, char *context)
+{
+ switch (ret) {
+ case MSGPACK_UNPACK_EXTRA_BYTES:
+ flb_plg_error(ctx->ins, "%s MSGPACK_UNPACK_EXTRA_BYTES", context);
+ break;
+ case MSGPACK_UNPACK_CONTINUE:
+ flb_plg_trace(ctx->ins, "%s MSGPACK_UNPACK_CONTINUE", context);
+ break;
+ case MSGPACK_UNPACK_PARSE_ERROR:
+ flb_plg_error(ctx->ins, "%s MSGPACK_UNPACK_PARSE_ERROR", context);
+ break;
+ case MSGPACK_UNPACK_NOMEM_ERROR:
+ flb_plg_error(ctx->ins, "%s MSGPACK_UNPACK_NOMEM_ERROR", context);
+ break;
+ }
+}
+
+/* Read a secure forward msgpack message */
+static int secure_forward_read(struct flb_forward *ctx,
+ struct flb_connection *u_conn,
+ struct flb_forward_config *fc,
+ char *buf, size_t size, size_t *out_len)
+{
+ int ret;
+ size_t off;
+ size_t avail;
+ size_t buf_off = 0;
+ msgpack_unpacked result;
+
+ msgpack_unpacked_init(&result);
+ while (1) {
+ avail = size - buf_off;
+ if (avail < 1) {
+ goto error;
+ }
+
+ /* Read the message */
+ ret = fc->io_read(u_conn, fc->unix_fd, buf + buf_off, size - buf_off);
+ if (ret <= 0) {
+ goto error;
+ }
+ buf_off += ret;
+
+ /* Validate */
+ off = 0;
+ ret = msgpack_unpack_next(&result, buf, buf_off, &off);
+ switch (ret) {
+ case MSGPACK_UNPACK_SUCCESS:
+ msgpack_unpacked_destroy(&result);
+ *out_len = buf_off;
+ return 0;
+ default:
+ print_msgpack_status(ctx, ret, "handshake");
+ goto error;
+ };
+ }
+
+ error:
+ msgpack_unpacked_destroy(&result);
+ return -1;
+}
+
+static void secure_forward_set_ping(struct flb_forward_ping *ping,
+ msgpack_object *map)
+{
+ int i;
+ msgpack_object key;
+ msgpack_object val;
+ const char *ptr;
+ int len;
+
+ memset(ping, 0, sizeof(struct flb_forward_ping));
+ ping->keepalive = 1; /* default, as per spec */
+
+ for (i = 0; i < map->via.map.size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+
+ ptr = key.via.str.ptr;
+ len = key.via.str.size;
+
+ if (len == 5 && memcmp(ptr, "nonce", len) == 0) {
+ ping->nonce = val.via.bin.ptr;
+ ping->nonce_len = val.via.bin.size;
+ }
+ else if (len == 4 && memcmp(ptr, "auth", len) == 0) {
+ ping->auth = val.via.bin.ptr;
+ ping->auth_len = val.via.bin.size;
+ }
+ else if (len == 9 && memcmp(ptr, "keepalive", len) == 0) {
+ ping->keepalive = val.via.boolean;
+ }
+ }
+}
+
+static int secure_forward_hash_shared_key(struct flb_forward_config *fc,
+ struct flb_forward_ping *ping,
+ char *buf, int buflen)
+{
+ size_t length_entries[4];
+ unsigned char *data_entries[4];
+ uint8_t hash[64];
+ int result;
+
+ if (buflen < 128) {
+ return -1;
+ }
+
+ data_entries[0] = (unsigned char *) fc->shared_key_salt;
+ length_entries[0] = 16;
+
+ data_entries[1] = (unsigned char *) fc->self_hostname;
+ length_entries[1] = strlen(fc->self_hostname);
+
+ data_entries[2] = (unsigned char *) ping->nonce;
+ length_entries[2] = ping->nonce_len;
+
+ data_entries[3] = (unsigned char *) fc->shared_key;
+ length_entries[3] = strlen(fc->shared_key);
+
+ result = flb_hash_simple_batch(FLB_HASH_SHA512,
+ 4,
+ data_entries,
+ length_entries,
+ hash,
+ sizeof(hash));
+
+ if (result != FLB_CRYPTO_SUCCESS) {
+ return -1;
+ }
+
+ flb_forward_format_bin_to_hex(hash, 64, buf);
+
+ return 0;
+}
+
+static int secure_forward_hash_password(struct flb_forward_config *fc,
+ struct flb_forward_ping *ping,
+ char *buf, int buflen)
+{
+ size_t length_entries[3];
+ unsigned char *data_entries[3];
+ uint8_t hash[64];
+ int result;
+
+ if (buflen < 128) {
+ return -1;
+ }
+
+ data_entries[0] = (unsigned char *) ping->auth;
+ length_entries[0] = ping->auth_len;
+
+ data_entries[1] = (unsigned char *) fc->username;
+ length_entries[1] = strlen(fc->username);
+
+ data_entries[2] = (unsigned char *) fc->password;
+ length_entries[2] = strlen(fc->password);
+
+ result = flb_hash_simple_batch(FLB_HASH_SHA512,
+ 3,
+ data_entries,
+ length_entries,
+ hash,
+ sizeof(hash));
+
+ if (result != FLB_CRYPTO_SUCCESS) {
+ return -1;
+ }
+
+ flb_forward_format_bin_to_hex(hash, 64, buf);
+
+ return 0;
+}
+
+static int secure_forward_ping(struct flb_connection *u_conn,
+ msgpack_object map,
+ struct flb_forward_config *fc,
+ struct flb_forward *ctx)
+{
+ int ret;
+ size_t bytes_sent;
+ char shared_key_hexdigest[128];
+ char password_hexdigest[128];
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ struct flb_forward_ping ping;
+
+ secure_forward_set_ping(&ping, &map);
+
+ if (ping.nonce == NULL) {
+ flb_plg_error(ctx->ins, "nonce not found");
+ return -1;
+ }
+
+ if (secure_forward_hash_shared_key(fc, &ping, shared_key_hexdigest, 128)) {
+ flb_plg_error(ctx->ins, "failed to hash shared_key");
+ return -1;
+ }
+
+ if (ping.auth != NULL) {
+ if (secure_forward_hash_password(fc, &ping, password_hexdigest, 128)) {
+ flb_plg_error(ctx->ins, "failed to hash password");
+ return -1;
+ }
+ }
+
+ /* Prepare outgoing msgpack PING */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+ msgpack_pack_array(&mp_pck, 6);
+
+ /* [0] PING */
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "PING", 4);
+
+ /* [1] Hostname */
+ msgpack_pack_str(&mp_pck, flb_sds_len(fc->self_hostname));
+ msgpack_pack_str_body(&mp_pck, fc->self_hostname,
+ flb_sds_len(fc->self_hostname));
+
+ /* [2] Shared key salt */
+ msgpack_pack_str(&mp_pck, 16);
+ msgpack_pack_str_body(&mp_pck, fc->shared_key_salt, 16);
+
+ /* [3] Shared key in Hexdigest format */
+ msgpack_pack_str(&mp_pck, 128);
+ msgpack_pack_str_body(&mp_pck, shared_key_hexdigest, 128);
+
+ /* [4] Username and password (optional) */
+ if (ping.auth != NULL) {
+ msgpack_pack_str(&mp_pck, strlen(fc->username));
+ msgpack_pack_str_body(&mp_pck, fc->username, strlen(fc->username));
+ msgpack_pack_str(&mp_pck, 128);
+ msgpack_pack_str_body(&mp_pck, password_hexdigest, 128);
+ }
+ else {
+ msgpack_pack_str(&mp_pck, 0);
+ msgpack_pack_str_body(&mp_pck, "", 0);
+ msgpack_pack_str(&mp_pck, 0);
+ msgpack_pack_str_body(&mp_pck, "", 0);
+ }
+
+ ret = fc->io_write(u_conn, fc->unix_fd, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
+ flb_plg_debug(ctx->ins, "PING sent: ret=%i bytes sent=%lu", ret, bytes_sent);
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (ret > -1 && bytes_sent > 0) {
+ return 0;
+ }
+
+ return -1;
+}
+
+static int secure_forward_pong(struct flb_forward *ctx, char *buf, int buf_size)
+{
+ int ret;
+ char msg[32] = {0};
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object o;
+
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, buf_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ return -1;
+ }
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_ARRAY) {
+ goto error;
+ }
+
+ if (root.via.array.size < 4) {
+ goto error;
+ }
+
+ o = root.via.array.ptr[0];
+ if (o.type != MSGPACK_OBJECT_STR) {
+ goto error;
+ }
+
+ if (strncmp(o.via.str.ptr, "PONG", 4) != 0 || o.via.str.size != 4) {
+ goto error;
+ }
+
+ o = root.via.array.ptr[1];
+ if (o.type != MSGPACK_OBJECT_BOOLEAN) {
+ goto error;
+ }
+
+ if (o.via.boolean) {
+ msgpack_unpacked_destroy(&result);
+ return 0;
+ }
+ else {
+ o = root.via.array.ptr[2];
+ memcpy(msg, o.via.str.ptr, o.via.str.size);
+ flb_plg_error(ctx->ins, "failed authorization: %s", msg);
+ }
+
+ error:
+ msgpack_unpacked_destroy(&result);
+ return -1;
+}
+
+static int secure_forward_handshake(struct flb_connection *u_conn,
+ struct flb_forward_config *fc,
+ struct flb_forward *ctx)
+{
+ int ret;
+ char buf[1024];
+ size_t out_len;
+ size_t off;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object o;
+
+ /* Wait for server HELO */
+ ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "handshake error expecting HELO");
+ return -1;
+ }
+
+ /* Unpack message and validate */
+ off = 0;
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, out_len, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ print_msgpack_status(ctx, ret, "HELO");
+ return -1;
+ }
+
+ /* Parse HELO message */
+ root = result.data;
+ if (root.via.array.size < 2) {
+ flb_plg_error(ctx->ins, "Invalid HELO message");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ o = root.via.array.ptr[0];
+ if (o.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "Invalid HELO type message");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ if (strncmp(o.via.str.ptr, "HELO", 4) != 0 || o.via.str.size != 4) {
+ flb_plg_error(ctx->ins, "Invalid HELO content message");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "protocol: received HELO");
+
+ /* Compose and send PING message */
+ o = root.via.array.ptr[1];
+ ret = secure_forward_ping(u_conn, o, fc, ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed PING");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /* Expect a PONG */
+ ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "handshake error expecting HELO");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /* Process PONG */
+ ret = secure_forward_pong(ctx, buf, out_len);
+ if (ret == -1) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ msgpack_unpacked_destroy(&result);
+ return 0;
+}
+
+static int forward_read_ack(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_connection *u_conn,
+ char *chunk, int chunk_len)
+{
+ int ret;
+ int i;
+ size_t out_len;
+ size_t off;
+ const char *ack;
+ size_t ack_len;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object_map map;
+ msgpack_object key;
+ msgpack_object val;
+ char buf[512]; /* ack should never be bigger */
+
+ flb_plg_trace(ctx->ins, "wait ACK (%.*s)", chunk_len, chunk);
+
+ /* Wait for server ACK */
+ ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot get ack");
+ return -1;
+ }
+
+ /* Unpack message and validate */
+ off = 0;
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, out_len, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ print_msgpack_status(ctx, ret, "ACK");
+ goto error;
+ }
+
+ /* Parse ACK message */
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "ACK response not MAP (type:%d)", root.type);
+ goto error;
+ }
+
+ map = root.via.map;
+ ack = NULL;
+ /* Lookup ack field */
+ for (i = 0; i < map.size; i++) {
+ key = map.ptr[i].key;
+ if (key.via.str.size == 3 && strncmp(key.via.str.ptr, "ack", 3) == 0) {
+ val = map.ptr[i].val;
+ ack_len = val.via.str.size;
+ ack = val.via.str.ptr;
+ break;
+ }
+ }
+
+ if (!ack) {
+ flb_plg_error(ctx->ins, "ack: ack not found");
+ goto error;
+ }
+
+ if (ack_len != chunk_len) {
+ flb_plg_error(ctx->ins,
+ "ack: ack len does not match ack(%ld)(%.*s) chunk(%d)(%.*s)",
+ ack_len, (int) ack_len, ack,
+ chunk_len, (int) chunk_len, chunk);
+ goto error;
+ }
+
+ if (strncmp(ack, chunk, ack_len) != 0) {
+ flb_plg_error(ctx->ins, "ACK: mismatch received=%s, expected=(%.*s)",
+ ack, chunk_len, chunk);
+ goto error;
+ }
+
+ flb_plg_debug(ctx->ins, "protocol: received ACK %.*s", (int)ack_len, ack);
+ msgpack_unpacked_destroy(&result);
+ return 0;
+
+ error:
+ msgpack_unpacked_destroy(&result);
+ return -1;
+}
+
+
+static int forward_config_init(struct flb_forward_config *fc,
+ struct flb_forward *ctx)
+{
+ if (fc->io_read == NULL || fc->io_write == NULL) {
+ flb_plg_error(ctx->ins, "io_read/io_write is NULL");
+ return -1;
+ }
+
+#ifdef FLB_HAVE_TLS
+ /* Initialize Secure Forward mode */
+ if (fc->secured == FLB_TRUE) {
+ secure_forward_init(ctx, fc);
+ }
+#endif
+
+ /* Generate the shared key salt */
+ if (flb_random_bytes(fc->shared_key_salt, 16)) {
+ flb_plg_error(ctx->ins, "cannot generate shared key salt");
+ return -1;
+ }
+
+ mk_list_add(&fc->_head, &ctx->configs);
+ return 0;
+}
+
+static flb_sds_t config_get_property(char *prop,
+ struct flb_upstream_node *node,
+ struct flb_forward *ctx)
+{
+ if (node) {
+ return (flb_sds_t) flb_upstream_node_get_property(prop, node);
+ }
+ else {
+ return (flb_sds_t) flb_output_get_property(prop, ctx->ins);
+ }
+}
+
+static int config_set_properties(struct flb_upstream_node *node,
+ struct flb_forward_config *fc,
+ struct flb_forward *ctx)
+{
+ flb_sds_t tmp;
+
+ /* Shared Key */
+ tmp = config_get_property("empty_shared_key", node, ctx);
+ if (tmp && flb_utils_bool(tmp)) {
+ fc->empty_shared_key = FLB_TRUE;
+ }
+ else {
+ fc->empty_shared_key = FLB_FALSE;
+ }
+
+ tmp = config_get_property("shared_key", node, ctx);
+ if (fc->empty_shared_key) {
+ fc->shared_key = flb_sds_create("");
+ }
+ else if (tmp) {
+ fc->shared_key = flb_sds_create(tmp);
+ }
+ else {
+ fc->shared_key = NULL;
+ }
+
+ tmp = config_get_property("username", node, ctx);
+ if (tmp) {
+ fc->username = tmp;
+ }
+ else {
+ fc->username = "";
+ }
+
+ tmp = config_get_property("password", node, ctx);
+ if (tmp) {
+ fc->password = tmp;
+ }
+ else {
+ fc->password = "";
+ }
+
+ /* Self Hostname */
+ tmp = config_get_property("self_hostname", node, ctx);
+ if (tmp) {
+ fc->self_hostname = flb_sds_create(tmp);
+ }
+ else {
+ fc->self_hostname = flb_sds_create("localhost");
+ }
+
+ /* Backward compatible timing mode */
+ tmp = config_get_property("time_as_integer", node, ctx);
+ if (tmp) {
+ fc->time_as_integer = flb_utils_bool(tmp);
+ }
+ else {
+ fc->time_as_integer = FLB_FALSE;
+ }
+
+ /* send always options (with size) */
+ tmp = config_get_property("send_options", node, ctx);
+ if (tmp) {
+ fc->send_options = flb_utils_bool(tmp);
+ }
+
+ /* add_option -> extra_options: if the user has defined 'add_option'
+ * we need to enable the 'send_options' flag
+ */
+ if (fc->extra_options && mk_list_size(fc->extra_options) > 0) {
+ fc->send_options = FLB_TRUE;
+ }
+
+ /* require ack response (implies send_options) */
+ tmp = config_get_property("require_ack_response", node, ctx);
+ if (tmp) {
+ fc->require_ack_response = flb_utils_bool(tmp);
+ if (fc->require_ack_response) {
+ fc->send_options = FLB_TRUE;
+ }
+ }
+
+ /* Tag Overwrite */
+ tmp = config_get_property("tag", node, ctx);
+ if (tmp) {
+ /* Set the tag */
+ fc->tag = flb_sds_create(tmp);
+ if (!fc->tag) {
+ flb_plg_error(ctx->ins, "cannot allocate tag");
+ return -1;
+ }
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ /* Record Accessor */
+ fc->ra_tag = flb_ra_create(fc->tag, FLB_TRUE);
+ if (!fc->ra_tag) {
+ flb_plg_error(ctx->ins, "cannot create record accessor for tag: %s",
+ fc->tag);
+ return -1;
+ }
+
+ /* Static record accessor ? (no dynamic values from map) */
+ fc->ra_static = flb_ra_is_static(fc->ra_tag);
+#endif
+ }
+ else {
+ fc->tag = NULL;
+
+ }
+
+ /* compress (implies send_options) */
+ tmp = config_get_property("compress", node, ctx);
+ if (tmp) {
+ if (!strcasecmp(tmp, "text")) {
+ fc->compress = COMPRESS_NONE;
+ }
+ else if (!strcasecmp(tmp, "gzip")) {
+ fc->compress = COMPRESS_GZIP;
+ fc->send_options = FLB_TRUE;
+ }
+ else {
+ flb_plg_error(ctx->ins, "invalid compress mode: %s", tmp);
+ return -1;
+ }
+ }
+ else {
+ fc->compress = COMPRESS_NONE;
+ }
+
+ if (fc->compress != COMPRESS_NONE && fc->time_as_integer == FLB_TRUE) {
+ flb_plg_error(ctx->ins, "compress mode %s is incompatible with "
+ "time_as_integer", tmp);
+ return -1;
+ }
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ if (fc->compress != COMPRESS_NONE &&
+ (fc->ra_tag && fc->ra_static == FLB_FALSE) ) {
+ flb_plg_error(ctx->ins, "compress mode %s is incompatible with dynamic "
+ "tags", tmp);
+ return -1;
+ }
+#endif
+
+ return 0;
+}
+
+static void forward_config_destroy(struct flb_forward_config *fc)
+{
+ flb_sds_destroy(fc->shared_key);
+ flb_sds_destroy(fc->self_hostname);
+ flb_sds_destroy(fc->tag);
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ if (fc->ra_tag) {
+ flb_ra_destroy(fc->ra_tag);
+ }
+#endif
+
+ flb_free(fc);
+}
+
+/* Configure in HA mode */
+static int forward_config_ha(const char *upstream_file,
+ struct flb_forward *ctx,
+ struct flb_config *config)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_upstream_node *node;
+ struct flb_forward_config *fc = NULL;
+
+ ctx->ha_mode = FLB_TRUE;
+ ctx->ha = flb_upstream_ha_from_file(upstream_file, config);
+ if (!ctx->ha) {
+ flb_plg_error(ctx->ins, "cannot load Upstream file");
+ return -1;
+ }
+
+ /* Iterate nodes and create a forward_config context */
+ mk_list_foreach(head, &ctx->ha->nodes) {
+ node = mk_list_entry(head, struct flb_upstream_node, _head);
+
+ /* create forward_config context */
+ fc = flb_calloc(1, sizeof(struct flb_forward_config));
+ if (!fc) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "failed config allocation");
+ continue;
+ }
+ fc->unix_fd = -1;
+ fc->secured = FLB_FALSE;
+ fc->io_write = io_net_write;
+ fc->io_read = io_net_read;
+
+ /* Is TLS enabled ? */
+ if (node->tls_enabled == FLB_TRUE) {
+ fc->secured = FLB_TRUE;
+ }
+
+ /* Read properties into 'fc' context */
+ config_set_properties(node, fc, ctx);
+
+ /* Initialize and validate forward_config context */
+ ret = forward_config_init(fc, ctx);
+ if (ret == -1) {
+ if (fc) {
+ forward_config_destroy(fc);
+ }
+ return -1;
+ }
+
+ /* Set our forward_config context into the node */
+ flb_upstream_node_set_data(fc, node);
+ }
+
+ flb_output_upstream_ha_set(ctx->ha, ctx->ins);
+
+ return 0;
+}
+
+static int forward_config_simple(struct flb_forward *ctx,
+ struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ int io_flags;
+ struct flb_forward_config *fc = NULL;
+ struct flb_upstream *upstream;
+
+ /* Set default network configuration if not set */
+ flb_output_net_default("127.0.0.1", 24224, ins);
+
+ /* Configuration context */
+ fc = flb_calloc(1, sizeof(struct flb_forward_config));
+ if (!fc) {
+ flb_errno();
+ return -1;
+ }
+ fc->unix_fd = -1;
+ fc->secured = FLB_FALSE;
+ fc->io_write = NULL;
+ fc->io_read = NULL;
+
+ /* Set default values */
+ ret = flb_output_config_map_set(ins, fc);
+ if (ret == -1) {
+ flb_free(fc);
+ return -1;
+ }
+
+ /* Check if TLS is enabled */
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls == FLB_TRUE) {
+ io_flags = FLB_IO_TLS;
+ fc->secured = FLB_TRUE;
+ }
+ else {
+ io_flags = FLB_IO_TCP;
+ }
+#else
+ io_flags = FLB_IO_TCP;
+#endif
+
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ if (fc->unix_path) {
+#ifdef FLB_HAVE_UNIX_SOCKET
+ /* In older versions if the UDS server was not up
+ * at this point fluent-bit would fail because it
+ * would not be able to establish the conntection.
+ *
+ * With the concurrency fixes we moved the connection
+ * to a later stage which will cause fluent-bit to
+ * properly launch but if the UDS server is not
+ * available at flush time then an error similar to
+ * the one we would get for a network based output
+ * plugin will be logged and FLB_RETRY will be returned.
+ */
+
+ fc->io_write = io_unix_write;
+ fc->io_read = io_unix_read;
+#else
+ flb_plg_error(ctx->ins, "unix_path is not supported");
+ flb_free(fc);
+ flb_free(ctx);
+ return -1;
+#endif /* FLB_HAVE_UNIX_SOCKET */
+ }
+ else {
+ /* Prepare an upstream handler */
+ upstream = flb_upstream_create(config,
+ ins->host.name,
+ ins->host.port,
+ io_flags, ins->tls);
+ if (!upstream) {
+ flb_free(fc);
+ flb_free(ctx);
+ return -1;
+ }
+ fc->io_write = io_net_write;
+ fc->io_read = io_net_read;
+ ctx->u = upstream;
+ flb_output_upstream_set(ctx->u, ins);
+ }
+ /* Read properties into 'fc' context */
+ config_set_properties(NULL, fc, ctx);
+
+ /* Initialize and validate forward_config context */
+ ret = forward_config_init(fc, ctx);
+ if (ret == -1) {
+ if (fc) {
+ forward_config_destroy(fc);
+ }
+ return -1;
+ }
+
+ return 0;
+}
+
+static int cb_forward_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ const char *tmp;
+ struct flb_forward *ctx;
+ (void) data;
+
+ ctx = flb_calloc(1, sizeof(struct flb_forward));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ ret = pthread_once(&uds_connection_tls_slot_init_once_control,
+ initialize_uds_connection_tls_slot);
+
+ if (ret != 0) {
+ flb_errno();
+ flb_free(ctx);
+
+ return -1;
+ }
+
+ ret = pthread_mutex_init(&ctx->uds_connection_list_mutex, NULL);
+
+ if (ret != 0) {
+ flb_errno();
+ flb_free(ctx);
+
+ return -1;
+ }
+
+ cfl_list_init(&ctx->uds_connection_list);
+
+ ctx->ins = ins;
+ mk_list_init(&ctx->configs);
+ flb_output_set_context(ins, ctx);
+
+
+ /* Configure HA or simple mode ? */
+ tmp = flb_output_get_property("upstream", ins);
+ if (tmp) {
+ ret = forward_config_ha(tmp, ctx, config);
+ }
+ else {
+ ret = forward_config_simple(ctx, ins, config);
+ }
+
+ return ret;
+}
+
+struct flb_forward_config *flb_forward_target(struct flb_forward *ctx,
+ struct flb_upstream_node **node)
+{
+ struct flb_forward_config *fc = NULL;
+ struct flb_upstream_node *f_node;
+
+ if (ctx->ha_mode == FLB_TRUE) {
+ f_node = flb_upstream_ha_node_get(ctx->ha);
+ if (!f_node) {
+ return NULL;
+ }
+
+ /* Get forward_config stored in node opaque data */
+ fc = flb_upstream_node_get_data(f_node);
+ *node = f_node;
+ }
+ else {
+ fc = mk_list_entry_first(&ctx->configs,
+ struct flb_forward_config,
+ _head);
+ *node = NULL;
+ }
+ return fc;
+}
+
+static int flush_message_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_connection *u_conn,
+ char *buf, size_t size)
+{
+ int ret;
+ int ok = MSGPACK_UNPACK_SUCCESS;
+ size_t sent = 0;
+ size_t rec_size;
+ size_t pre = 0;
+ size_t off = 0;
+ msgpack_object root;
+ msgpack_object options;
+ msgpack_object chunk;
+ msgpack_unpacked result;
+
+ /* If the sender requires 'ack' from the remote end-point */
+ if (fc->require_ack_response) {
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, buf, size, &off) == ok) {
+ /* get the record size */
+ rec_size = off - pre;
+
+ /* write single message */
+ ret = fc->io_write(u_conn,fc->unix_fd,
+ buf + pre, rec_size, &sent);
+ pre = off;
+
+ if (ret == -1) {
+ /*
+ * FIXME: we might take advantage of 'flush_ctx' and store the
+ * message that failed it delivery, we could have retries but with
+ * the flush context.
+ */
+ flb_plg_error(ctx->ins, "message_mode: error sending message");
+ msgpack_unpacked_destroy(&result);
+ return FLB_RETRY;
+ }
+
+ /* Sucessful delivery, now get message 'chunk' and wait for it */
+ root = result.data;
+ options = root.via.array.ptr[3];
+ chunk = options.via.map.ptr[0].val;
+
+ /* Read ACK */
+ ret = forward_read_ack(ctx, fc, u_conn,
+ (char *) chunk.via.str.ptr, chunk.via.str.size);
+ if (ret == -1) {
+ msgpack_unpacked_destroy(&result);
+ return FLB_RETRY;
+ }
+ }
+
+ /* All good */
+ msgpack_unpacked_destroy(&result);
+ return FLB_OK;
+ }
+
+ /* Normal data write */
+ ret = fc->io_write(u_conn, fc->unix_fd, buf, size, &sent);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "message_mode: error sending data");
+ return FLB_RETRY;
+ }
+
+ return FLB_OK;
+}
+
+/* pack payloads of cmetrics or ctraces with Fluentd compat format */
+static int pack_metricses_payload(msgpack_packer *mp_pck, const void *data, size_t bytes) {
+ int entries;
+ struct flb_time tm;
+
+ /* Format with event stream format of entries: [[time, [{entries map}]]] */
+ msgpack_pack_array(mp_pck, 1);
+ msgpack_pack_array(mp_pck, 2);
+ flb_time_get(&tm);
+ flb_time_append_to_msgpack(&tm, mp_pck, 0);
+ entries = flb_mp_count(data, bytes);
+ msgpack_pack_array(mp_pck, entries);
+
+ return 0;
+}
+
+#include <fluent-bit/flb_pack.h>
+/*
+ * Forward Mode: this is the generic mechanism used in Fluent Bit, it takes
+ * advantage of the internal data representation and avoid re-formatting data,
+ * it only sends a msgpack header, pre-existent 'data' records and options.
+ *
+ * note: if the user has enabled time_as_integer (compat mode for Fluentd <= 0.12),
+ * the 'flush_forward_compat_mode' is used instead.
+ */
+static int flush_forward_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_connection *u_conn,
+ int event_type,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ char *opts_buf, size_t opts_size)
+{
+ int ret;
+ int entries;
+ int send_options;
+ size_t off = 0;
+ size_t bytes_sent;
+ msgpack_object root;
+ msgpack_object chunk;
+ msgpack_unpacked result;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ void *final_data;
+ size_t final_bytes;
+ char *transcoded_buffer;
+ size_t transcoded_length;
+
+ transcoded_buffer = NULL;
+ transcoded_length = 0;
+
+ /* Pack message header */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ send_options = fc->send_options;
+ if (event_type == FLB_EVENT_TYPE_METRICS || event_type == FLB_EVENT_TYPE_TRACES) {
+ send_options = FLB_TRUE;
+ }
+ msgpack_pack_array(&mp_pck, send_options ? 3 : 2);
+
+ /* Tag */
+ flb_forward_format_append_tag(ctx, fc, &mp_pck, NULL, tag, tag_len);
+
+ if (!fc->fwd_retain_metadata && event_type == FLB_EVENT_TYPE_LOGS) {
+ ret = flb_forward_format_transcode(ctx, FLB_LOG_EVENT_FORMAT_FORWARD,
+ (char *) data, bytes,
+ &transcoded_buffer,
+ &transcoded_length);
+
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "could not transcode entries");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ return FLB_RETRY;
+ }
+ }
+
+ if (fc->compress == COMPRESS_GZIP) {
+ /* When compress is set, we switch from using Forward mode to using
+ * CompressedPackedForward mode.
+ */
+
+ if (transcoded_buffer != NULL) {
+ ret = flb_gzip_compress((void *) transcoded_buffer,
+ transcoded_length,
+ &final_data,
+ &final_bytes);
+ }
+ else {
+ ret = flb_gzip_compress((void *) data, bytes, &final_data, &final_bytes);
+ }
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not compress entries");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (transcoded_buffer != NULL) {
+ flb_free(transcoded_buffer);
+ }
+
+ return FLB_RETRY;
+ }
+
+ msgpack_pack_bin(&mp_pck, final_bytes);
+ }
+ else {
+ if (transcoded_buffer != NULL) {
+ final_data = (void *) transcoded_buffer;
+ final_bytes = transcoded_length;
+ }
+ else {
+ final_data = (void *) data;
+ final_bytes = bytes;
+ }
+
+ if (event_type == FLB_EVENT_TYPE_LOGS) {
+ /* for log events we create an array for the serialized messages */
+ entries = flb_mp_count(data, bytes);
+ msgpack_pack_array(&mp_pck, entries);
+ }
+ else {
+ /* FLB_EVENT_TYPE_METRICS and FLB_EVENT_TYPE_TRACES */
+ if (fc->fluentd_compat) {
+ pack_metricses_payload(&mp_pck, data, bytes);
+ }
+ else {
+ msgpack_pack_bin(&mp_pck, final_bytes);
+ }
+ }
+ }
+
+ /* Write message header */
+ ret = fc->io_write(u_conn, fc->unix_fd, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not write forward header");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (fc->compress == COMPRESS_GZIP) {
+ flb_free(final_data);
+ }
+
+ if (transcoded_buffer != NULL) {
+ flb_free(transcoded_buffer);
+ }
+
+ return FLB_RETRY;
+ }
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ /* Write msgpack content / entries */
+ ret = fc->io_write(u_conn, fc->unix_fd, final_data, final_bytes, &bytes_sent);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not write forward entries");
+ if (fc->compress == COMPRESS_GZIP) {
+ flb_free(final_data);
+ }
+
+ if (transcoded_buffer != NULL) {
+ flb_free(transcoded_buffer);
+ }
+
+ return FLB_RETRY;
+ }
+
+ if (fc->compress == COMPRESS_GZIP) {
+ flb_free(final_data);
+ }
+
+ if (transcoded_buffer != NULL) {
+ flb_free(transcoded_buffer);
+ }
+
+ /* Write options */
+ if (send_options == FLB_TRUE) {
+ ret = fc->io_write(u_conn, fc->unix_fd, opts_buf, opts_size, &bytes_sent);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not write forward options");
+ return FLB_RETRY;
+ }
+ }
+
+ /* If the sender requires 'ack' from the remote end-point */
+ if (fc->require_ack_response) {
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, opts_buf, opts_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /* Sucessful delivery, now get message 'chunk' and wait for it */
+ root = result.data;
+
+ /* 'chunk' is always in the first key of the map */
+ chunk = root.via.map.ptr[0].val;
+
+ /* Read ACK */
+ ret = forward_read_ack(ctx, fc, u_conn,
+ (char *) chunk.via.str.ptr, chunk.via.str.size);
+ if (ret == -1) {
+ msgpack_unpacked_destroy(&result);
+ return FLB_RETRY;
+ }
+
+ /* All good */
+ msgpack_unpacked_destroy(&result);
+ return FLB_OK;
+ }
+
+ return FLB_OK;
+}
+
+/*
+ * Forward Mode Compat: data is packaged in Forward mode but the timestamps are
+ * integers (compat mode for Fluentd <= 0.12).
+ */
+static int flush_forward_compat_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_connection *u_conn,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes)
+{
+ int ret;
+ size_t off = 0;
+ size_t bytes_sent;
+ msgpack_object root;
+ msgpack_object chunk;
+ msgpack_object map; /* dummy parameter */
+ msgpack_unpacked result;
+
+ /* Write message header */
+ ret = fc->io_write(u_conn, fc->unix_fd, data, bytes, &bytes_sent);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not write forward compat mode records");
+ return FLB_RETRY;
+ }
+
+ /* If the sender requires 'ack' from the remote end-point */
+ if (fc->require_ack_response) {
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, data, bytes, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /* Sucessful delivery, now get message 'chunk' and wait for it */
+ root = result.data;
+
+ map = root.via.array.ptr[2];
+
+ /* 'chunk' is always in the first key of the map */
+ chunk = map.via.map.ptr[0].val;
+
+ /* Read ACK */
+ ret = forward_read_ack(ctx, fc, u_conn,
+ (char *) chunk.via.str.ptr, chunk.via.str.size);
+ if (ret == -1) {
+ msgpack_unpacked_destroy(&result);
+ return FLB_RETRY;
+ }
+
+ /* All good */
+ msgpack_unpacked_destroy(&result);
+ return FLB_OK;
+ }
+
+ return FLB_OK;
+}
+
+static void cb_forward_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret = -1;
+ int mode;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ void *out_buf = NULL;
+ size_t out_size = 0;
+ struct flb_forward *ctx = out_context;
+ struct flb_forward_config *fc = NULL;
+ struct flb_connection *u_conn = NULL;
+ struct flb_upstream_node *node = NULL;
+ struct flb_forward_flush *flush_ctx;
+ flb_sockfd_t uds_conn;
+
+ (void) i_ins;
+ (void) config;
+
+ fc = flb_forward_target(ctx, &node);
+ if (!fc) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ flb_plg_debug(ctx->ins, "request %lu bytes to flush",
+ event_chunk->size);
+
+ /* Initialize packager */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /*
+ * Flush context: structure used to pass custom information to the
+ * formatter function.
+ */
+ flush_ctx = flb_calloc(1, sizeof(struct flb_forward_flush));
+ if (!flush_ctx) {
+ flb_errno();
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ flush_ctx->fc = fc;
+
+ /* Format the right payload and retrieve the 'forward mode' used */
+ mode = flb_forward_format(config, i_ins, ctx, flush_ctx,
+ event_chunk->type,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ event_chunk->data, event_chunk->size,
+ &out_buf, &out_size);
+
+ /* Get a TCP connection instance */
+ if (fc->unix_path == NULL) {
+ if (ctx->ha_mode == FLB_TRUE) {
+ u_conn = flb_upstream_conn_get(node->u);
+ }
+ else {
+ u_conn = flb_upstream_conn_get(ctx->u);
+ }
+
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "no upstream connections available");
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (fc->time_as_integer == FLB_TRUE) {
+ flb_free(out_buf);
+ }
+ flb_free(flush_ctx);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ uds_conn = -1;
+ }
+ else {
+ uds_conn = forward_uds_get_conn(fc, ctx);
+
+ if (uds_conn == -1) {
+ flb_plg_error(ctx->ins, "no unix socket connection available");
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (fc->time_as_integer == FLB_TRUE) {
+ flb_free(out_buf);
+ }
+ flb_free(flush_ctx);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* This is a hack, because the rest of the code is written to use
+ * the shared forward config unix_fd field so at this point we need
+ * to ensure that we either have a working connection or we can
+ * establish one regardless of not passing it along.
+ *
+ * Later on we will get the file descriptor from the TLS.
+ */
+ }
+
+ /*
+ * Shared Key: if ka_count > 0 it means the handshake has already been done lately
+ */
+ if (fc->shared_key && u_conn->ka_count == 0) {
+ ret = secure_forward_handshake(u_conn, fc, ctx);
+ flb_plg_debug(ctx->ins, "handshake status = %i", ret);
+ if (ret == -1) {
+ if (u_conn) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ if (uds_conn != -1) {
+ forward_uds_drop_conn(ctx, uds_conn);
+ }
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ if (fc->time_as_integer == FLB_TRUE) {
+ flb_free(out_buf);
+ }
+ flb_free(flush_ctx);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ }
+
+ /*
+ * Note about the mode used for different type of events/messages:
+ *
+ * - Logs can be send either by using MODE_MESSAGE, MODE_FORWARD
+ * OR MODE_FORWARD_COMPAT.
+ *
+ * - Metrics and Traces uses MODE_FORWARD only.
+ */
+
+ if (mode == MODE_MESSAGE) {
+ ret = flush_message_mode(ctx, fc, u_conn, out_buf, out_size);
+ flb_free(out_buf);
+ }
+ else if (mode == MODE_FORWARD) {
+ ret = flush_forward_mode(ctx, fc, u_conn,
+ event_chunk->type,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ event_chunk->data, event_chunk->size,
+ out_buf, out_size);
+ flb_free(out_buf);
+ }
+ else if (mode == MODE_FORWARD_COMPAT) {
+ ret = flush_forward_compat_mode(ctx, fc, u_conn,
+ event_chunk->tag,
+ flb_sds_len(event_chunk->tag),
+ out_buf, out_size);
+ flb_free(out_buf);
+ }
+
+ if (u_conn) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ if (ret != FLB_OK) {
+ /* Since UDS connections have been used as permanent
+ * connections up to this point we only release the
+ * connection in case of error.
+ *
+ * There could be a logical error in here but what
+ * I think at the moment is, if something goes wrong
+ * we can just drop the connection and let the worker
+ * establish a new one the next time a flush happens.
+ */
+
+ if (uds_conn != -1) {
+ forward_uds_drop_conn(ctx, uds_conn);
+ }
+ }
+
+ flb_free(flush_ctx);
+ FLB_OUTPUT_RETURN(ret);
+}
+
+static int cb_forward_exit(void *data, struct flb_config *config)
+{
+ struct flb_forward *ctx = data;
+ struct flb_forward_config *fc;
+ struct mk_list *head;
+ struct mk_list *tmp;
+ (void) config;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ /* Destroy forward_config contexts */
+ mk_list_foreach_safe(head, tmp, &ctx->configs) {
+ fc = mk_list_entry(head, struct flb_forward_config, _head);
+
+ mk_list_del(&fc->_head);
+ forward_config_destroy(fc);
+ }
+
+ forward_uds_drop_all(ctx);
+
+ if (ctx->ha_mode == FLB_TRUE) {
+ if (ctx->ha) {
+ flb_upstream_ha_destroy(ctx->ha);
+ }
+ }
+ else {
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ }
+ }
+
+ pthread_mutex_destroy(&ctx->uds_connection_list_mutex);
+
+ flb_free(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_BOOL, "time_as_integer", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, time_as_integer),
+ "Set timestamp in integer format (compat mode for old Fluentd v0.12)"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "retain_metadata_in_forward_mode", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, fwd_retain_metadata),
+ "Retain metadata when operating in forward mode"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "shared_key", NULL,
+ 0, FLB_FALSE, 0,
+ "Shared key for authentication"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "self_hostname", NULL,
+ 0, FLB_FALSE, 0,
+ "Hostname"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "empty_shared_key", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, empty_shared_key),
+ "Set an empty shared key for authentication"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "send_options", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, send_options),
+ "Send 'forward protocol options' to remote endpoint"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "require_ack_response", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, require_ack_response),
+ "Require that remote endpoint confirms data reception"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "username", "",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, username),
+ "Username for authentication"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "password", "",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, password),
+ "Password for authentication"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "unix_path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, unix_path),
+ "Path to unix socket. It is ignored when 'upstream' property is set"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "upstream", NULL,
+ 0, FLB_FALSE, 0,
+ "Path to 'upstream' configuration file (define multiple nodes)"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tag", NULL,
+ 0, FLB_FALSE, 0,
+ "Set a custom Tag for the outgoing records"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "compress", NULL,
+ 0, FLB_FALSE, 0,
+ "Compression mode"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "fluentd_compat", "false",
+ 0, FLB_TRUE, offsetof(struct flb_forward_config, fluentd_compat),
+ "Send metrics and traces with Fluentd compatible format"
+ },
+
+ {
+ FLB_CONFIG_MAP_SLIST_2, "add_option", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_forward_config, extra_options),
+ "Set an extra Forward protocol option. This is an advance feature, use it only for "
+ "very specific use-cases."
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_output_plugin out_forward_plugin = {
+ .name = "forward",
+ .description = "Forward (Fluentd protocol)",
+
+ /* Callbacks */
+ .cb_init = cb_forward_init,
+ .cb_pre_run = NULL,
+ .cb_flush = cb_forward_flush,
+ .cb_exit = cb_forward_exit,
+ .workers = 2,
+
+ /* Config map validator */
+ .config_map = config_map,
+
+ /* Test */
+ .test_formatter.callback = flb_forward_format,
+
+ /* Flags */
+ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
+
+ /* Event types */
+ .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES
+};
diff --git a/src/fluent-bit/plugins/out_forward/forward.h b/src/fluent-bit/plugins/out_forward/forward.h
new file mode 100644
index 000000000..8e77e6e11
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/forward.h
@@ -0,0 +1,146 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_FORWARD
+#define FLB_OUT_FORWARD
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_upstream_ha.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_connection.h>
+#include <fluent-bit/flb_pthread.h>
+#include <cfl/cfl_list.h>
+
+/*
+ * Forward modes
+ * =============
+ */
+
+/*
+ * Message mode
+ * ------------
+ * https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#message-modes
+ */
+#define MODE_MESSAGE 0
+
+/*
+ * Forward mode
+ * ------------
+ * https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode
+ */
+#define MODE_FORWARD 1
+
+/*
+ * Forward Compat: similar to MODE_FORWARD, but it sends the timestamps as unsigned
+ * integers for compatibility with very old versions of Fluentd that don't have timestamps
+ * with nanoseconds. This mode only applies for Logs.
+ */
+#define MODE_FORWARD_COMPAT 3
+
+/* Compression options */
+#define COMPRESS_NONE 0
+#define COMPRESS_GZIP 1
+
+/*
+ * Configuration: we put this separate from the main
+ * context so every Upstream Node can have it own configuration
+ * reference and pass it smoothly to the required caller.
+ *
+ * On simple mode (no HA), the structure is referenced
+ * by flb_forward->config. In HA mode the structure is referenced
+ * by the Upstream node context as an opaque data type.
+ */
+struct flb_forward_config {
+ int secured; /* Using Secure Forward mode ? */
+ int compress; /* Using compression ? */
+ int time_as_integer; /* Use backward compatible timestamp ? */
+ int fluentd_compat; /* Use Fluentd compatible payload for
+ * metrics and ctraces */
+
+ /* add extra options to the Forward payload (advanced) */
+ struct mk_list *extra_options;
+
+ int fwd_retain_metadata; /* Do not drop metadata in forward mode */
+
+ /* config */
+ flb_sds_t shared_key; /* shared key */
+ flb_sds_t self_hostname; /* hostname used in certificate */
+ flb_sds_t tag; /* Overwrite tag on forward */
+ int empty_shared_key; /* use an empty string as shared key */
+ int require_ack_response; /* Require acknowledge for "chunk" */
+ int send_options; /* send options in messages */
+ flb_sds_t unix_path; /* unix socket path */
+ int unix_fd;
+
+ const char *username;
+ const char *password;
+
+ /* mbedTLS specifics */
+ unsigned char shared_key_salt[16];
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ struct flb_record_accessor *ra_tag; /* Tag Record accessor */
+ int ra_static; /* Is the record accessor static ? */
+#endif
+ int (*io_write)(struct flb_connection* conn, int fd, const void* data,
+ size_t len, size_t *out_len);
+ int (*io_read)(struct flb_connection* conn, int fd, void* buf, size_t len);
+ struct mk_list _head; /* Link to list flb_forward->configs */
+};
+
+struct flb_forward_uds_connection {
+ flb_sockfd_t descriptor;
+ struct cfl_list _head; /* Link to list flb_forward->uds_connnection_list */
+};
+
+/* Plugin Context */
+struct flb_forward {
+ /* if HA mode is enabled */
+ int ha_mode; /* High Availability mode enabled ? */
+ char *ha_upstream; /* Upstream configuration file */
+ struct flb_upstream_ha *ha;
+
+ struct cfl_list uds_connection_list;
+ pthread_mutex_t uds_connection_list_mutex;
+
+ /* Upstream handler and config context for single mode (no HA) */
+ struct flb_upstream *u;
+ struct mk_list configs;
+ struct flb_output_instance *ins;
+};
+
+struct flb_forward_ping {
+ const char *nonce;
+ int nonce_len;
+ const char *auth;
+ int auth_len;
+ int keepalive;
+};
+
+/* Flush callback context */
+struct flb_forward_flush {
+ struct flb_forward_config *fc;
+ char checksum_hex[33];
+};
+
+struct flb_forward_config *flb_forward_target(struct flb_forward *ctx,
+ struct flb_upstream_node **node);
+
+#endif
diff --git a/src/fluent-bit/plugins/out_forward/forward_format.c b/src/fluent-bit/plugins/out_forward/forward_format.c
new file mode 100644
index 000000000..48dedd862
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/forward_format.c
@@ -0,0 +1,640 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_mp.h>
+#include <fluent-bit/flb_hash.h>
+#include <fluent-bit/flb_crypto.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "forward.h"
+
+void flb_forward_format_bin_to_hex(uint8_t *buf, size_t len, char *out)
+{
+ int i;
+ static char map[] = "0123456789abcdef";
+
+ for (i = 0; i < len; i++) {
+ out[i * 2] = map[buf[i] >> 4];
+ out[i * 2 + 1] = map[buf[i] & 0x0f];
+ }
+}
+
+int flb_forward_format_append_tag(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ msgpack_packer *mp_pck,
+ msgpack_object *map,
+ const char *tag, int tag_len)
+{
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ flb_sds_t tmp;
+ msgpack_object m;
+
+ memset(&m, 0, sizeof(m));
+
+ if (!fc->ra_tag) {
+ msgpack_pack_str(mp_pck, tag_len);
+ msgpack_pack_str_body(mp_pck, tag, tag_len);
+ return 0;
+ }
+
+ if (map) {
+ m = *map;
+ }
+
+ /* Tag */
+ tmp = flb_ra_translate(fc->ra_tag, (char *) tag, tag_len, m, NULL);
+ if (!tmp) {
+ flb_plg_warn(ctx->ins, "Tag translation failed, using default Tag");
+ msgpack_pack_str(mp_pck, tag_len);
+ msgpack_pack_str_body(mp_pck, tag, tag_len);
+ }
+ else {
+ msgpack_pack_str(mp_pck, flb_sds_len(tmp));
+ msgpack_pack_str_body(mp_pck, tmp, flb_sds_len(tmp));
+ flb_sds_destroy(tmp);
+ }
+#else
+ msgpack_pack_str(mp_pck, tag_len);
+ msgpack_pack_str_body(mp_pck, tag, tag_len);
+
+#endif
+
+ return 0;
+}
+
+static int append_options(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ int event_type,
+ msgpack_packer *mp_pck,
+ int entries, void *data, size_t bytes,
+ msgpack_object *metadata,
+ char *out_chunk)
+{
+ char *chunk = NULL;
+ uint8_t checksum[64];
+ int result;
+ struct mk_list *head;
+ struct flb_config_map_val *mv;
+ struct flb_mp_map_header mh;
+ struct flb_slist_entry *eopt_key;
+ struct flb_slist_entry *eopt_val;
+
+ /* options is map, use the dynamic map type */
+ flb_mp_map_header_init(&mh, mp_pck);
+
+ if (fc->require_ack_response == FLB_TRUE) {
+ /*
+ * for ack we calculate sha512 of context, take 16 bytes,
+ * make 32 byte hex string of it
+ */
+ result = flb_hash_simple(FLB_HASH_SHA512,
+ data, bytes,
+ checksum, sizeof(checksum));
+
+ if (result != FLB_CRYPTO_SUCCESS) {
+ return -1;
+ }
+
+ flb_forward_format_bin_to_hex(checksum, 16, out_chunk);
+
+ out_chunk[32] = '\0';
+ chunk = (char *) out_chunk;
+ }
+
+ /* "chunk": '<checksum-base-64>' */
+ if (chunk) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(mp_pck, 5);
+ msgpack_pack_str_body(mp_pck, "chunk", 5);
+ msgpack_pack_str(mp_pck, 32);
+ msgpack_pack_str_body(mp_pck, out_chunk, 32);
+ }
+
+ /* "size": entries */
+ if (entries > 0) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(mp_pck, 4);
+ msgpack_pack_str_body(mp_pck, "size", 4);
+ msgpack_pack_int64(mp_pck, entries);
+ }
+
+ /* "compressed": "gzip" */
+ if (entries > 0 && /* not message mode */
+ fc->time_as_integer == FLB_FALSE && /* not compat mode */
+ fc->compress == COMPRESS_GZIP) {
+
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(mp_pck, 10);
+ msgpack_pack_str_body(mp_pck, "compressed", 10);
+ msgpack_pack_str(mp_pck, 4);
+ msgpack_pack_str_body(mp_pck, "gzip", 4);
+ }
+
+ /* event type (FLB_EVENT_TYPE_LOGS, FLB_EVENT_TYPE_METRICS, FLB_EVENT_TYPE_TRACES) */
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(mp_pck, 13);
+ msgpack_pack_str_body(mp_pck, "fluent_signal", 13);
+ msgpack_pack_int64(mp_pck, event_type);
+
+ /* process 'extra_option(s)' */
+ if (fc->extra_options) {
+ flb_config_map_foreach(head, mv, fc->extra_options) {
+ eopt_key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
+ eopt_val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
+
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(mp_pck, flb_sds_len(eopt_key->str));
+ msgpack_pack_str_body(mp_pck, eopt_key->str, flb_sds_len(eopt_key->str));
+ msgpack_pack_str(mp_pck, flb_sds_len(eopt_val->str));
+ msgpack_pack_str_body(mp_pck, eopt_val->str, flb_sds_len(eopt_val->str));
+ }
+ }
+
+ if (metadata != NULL &&
+ metadata->type == MSGPACK_OBJECT_MAP &&
+ metadata->via.map.size > 0) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str_with_body(mp_pck, "metadata", 8);
+ msgpack_pack_object(mp_pck, *metadata);
+ }
+
+ flb_mp_map_header_end(&mh);
+
+ flb_plg_debug(ctx->ins,
+ "send options records=%d chunk='%s'",
+ entries, out_chunk ? out_chunk : "NULL");
+ return 0;
+}
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+/*
+ * Forward Protocol: Message Mode
+ * ------------------------------
+ * This mode is only used if the Tag is dynamically composed using some
+ * content of the records.
+ *
+ * [
+ * "TAG",
+ * TIMESTAMP,
+ * RECORD/MAP,
+ * *OPTIONS*
+ * ]
+ *
+ */
+static int flb_forward_format_message_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_forward_flush *ff,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_buf, size_t *out_size)
+{
+ int entries = 0;
+ size_t pre = 0;
+ size_t off = 0;
+ size_t record_size;
+ char *chunk;
+ char chunk_buf[33];
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct flb_time tm;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ /*
+ * Our only reason to use Message Mode is because the user wants to generate
+ * dynamic Tags based on records content.
+ */
+ if (!fc->ra_tag) {
+ return -1;
+ }
+
+ /*
+ * if the case, we need to compose a new outgoing buffer instead
+ * of use the original one.
+ */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ flb_time_copy(&tm, &log_event.timestamp);
+
+ /* Prepare main array: tag, timestamp and record/map */
+ msgpack_pack_array(&mp_pck, 4);
+
+ /* Generate dynamic Tag or use default one */
+ flb_forward_format_append_tag(ctx, fc, &mp_pck,
+ log_event.body,
+ tag, tag_len);
+
+ /* Pack timestamp */
+ if (fc->time_as_integer == FLB_TRUE) {
+ flb_time_append_to_msgpack(&log_event.timestamp,
+ &mp_pck,
+ FLB_TIME_ETFMT_INT);
+ }
+ else {
+ flb_time_append_to_msgpack(&log_event.timestamp,
+ &mp_pck,
+ FLB_TIME_ETFMT_V1_FIXEXT);
+ }
+
+ /* Pack records */
+ msgpack_pack_object(&mp_pck, *log_event.body);
+
+ record_size = off - pre;
+
+ if (ff) {
+ chunk = ff->checksum_hex;
+ }
+ else {
+ chunk = chunk_buf;
+ }
+
+ append_options(ctx, fc, FLB_EVENT_TYPE_LOGS, &mp_pck, 0,
+ (char *) data + pre, record_size,
+ log_event.metadata,
+ chunk);
+
+ pre = off;
+ entries++;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return entries;
+}
+#endif
+
+int flb_forward_format_transcode(
+ struct flb_forward *ctx, int format,
+ char *input_buffer, size_t input_length,
+ char **output_buffer, size_t *output_length)
+{
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int result;
+
+ result = flb_log_event_decoder_init(&log_decoder, input_buffer, input_length);
+
+ if (result != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", result);
+
+ return -1;
+ }
+
+ result = flb_log_event_encoder_init(&log_encoder, format);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", result);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return -1;
+ }
+
+ while ((result = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+ result = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(
+ &log_encoder, &log_event.timestamp);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_metadata_from_msgpack_object(
+ &log_encoder,
+ log_event.metadata);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_body_from_msgpack_object(
+ &log_encoder,
+ log_event.body);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(&log_encoder);
+ }
+ }
+
+ if (log_encoder.output_length > 0) {
+ *output_buffer = log_encoder.output_buffer;
+ *output_length = log_encoder.output_length;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+
+ result = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", result);
+
+ result = -1;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return result;
+}
+
+/*
+ * Forward Protocol: Forward Mode
+ * ------------------------------
+ * In forward mode we don't format the serialized entries. We just compose
+ * the outgoing 'options'.
+ */
+static int flb_forward_format_forward_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_forward_flush *ff,
+ int event_type,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_buf, size_t *out_size)
+{
+ int result;
+ int entries = 0;
+ char *chunk;
+ char chunk_buf[33];
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ char *transcoded_buffer;
+ size_t transcoded_length;
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ if (ff) {
+ chunk = ff->checksum_hex;
+ }
+ else {
+ chunk = chunk_buf;
+ }
+
+ if (fc->send_options == FLB_TRUE || (event_type == FLB_EVENT_TYPE_METRICS || event_type == FLB_EVENT_TYPE_TRACES)) {
+ if (event_type == FLB_EVENT_TYPE_LOGS) {
+ entries = flb_mp_count(data, bytes);
+ }
+ else {
+ /* for non logs, we don't count the number of entries */
+ entries = 0;
+ }
+
+ if (!fc->fwd_retain_metadata && event_type == FLB_EVENT_TYPE_LOGS) {
+ result = flb_forward_format_transcode(ctx, FLB_LOG_EVENT_FORMAT_FORWARD,
+ (char *) data, bytes,
+ &transcoded_buffer,
+ &transcoded_length);
+
+ if (result == 0) {
+ append_options(ctx, fc, event_type, &mp_pck, entries,
+ transcoded_buffer,
+ transcoded_length,
+ NULL, chunk);
+
+ free(transcoded_buffer);
+ }
+ }
+ else {
+ append_options(ctx, fc, event_type, &mp_pck, entries, (char *) data, bytes, NULL, chunk);
+ }
+ }
+
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return 0;
+}
+
+/*
+ * Forward Protocol: Forward Mode Compat (for Fluentd <= 0.12)
+ * -----------------------------------------------------------
+ * Use Forward mode but format the timestamp as integers
+ *
+ * note: yes, the function name it's a big long...
+ */
+static int flb_forward_format_forward_compat_mode(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ struct flb_forward_flush *ff,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_buf, size_t *out_size)
+{
+ int entries = 0;
+ char *chunk;
+ char chunk_buf[33];
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ if (ff) {
+ chunk = ff->checksum_hex;
+ }
+ else {
+ chunk = chunk_buf;
+ }
+
+ msgpack_pack_array(&mp_pck, fc->send_options ? 3 : 2);
+
+ /* Tag */
+ flb_forward_format_append_tag(ctx, fc, &mp_pck,
+ NULL, tag, tag_len);
+
+ /* Entries */
+ entries = flb_mp_count(data, bytes);
+ msgpack_pack_array(&mp_pck, entries);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ msgpack_pack_array(&mp_pck, 2);
+
+ /* Pack timestamp */
+ if (fc->time_as_integer == FLB_TRUE) {
+ flb_time_append_to_msgpack(&log_event.timestamp,
+ &mp_pck,
+ FLB_TIME_ETFMT_INT);
+ }
+ else {
+ flb_time_append_to_msgpack(&log_event.timestamp,
+ &mp_pck,
+ FLB_TIME_ETFMT_V1_FIXEXT);
+ }
+
+ /* Pack records */
+ msgpack_pack_object(&mp_pck, *log_event.body);
+ }
+
+ if (fc->send_options == FLB_TRUE) {
+ append_options(ctx, fc, FLB_EVENT_TYPE_LOGS, &mp_pck, entries,
+ (char *) data, bytes, NULL, chunk);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return 0;
+}
+
+int flb_forward_format(struct flb_config *config,
+ struct flb_input_instance *ins,
+ void *ins_ctx,
+ void *flush_ctx,
+ int event_type,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_buf, size_t *out_size)
+{
+ int ret = 0;
+ int mode = MODE_FORWARD;
+ struct flb_upstream_node *node = NULL;
+ struct flb_forward_config *fc;
+ struct flb_forward_flush *ff = flush_ctx;
+ struct flb_forward *ctx = ins_ctx;
+
+ if (!flush_ctx) {
+ fc = flb_forward_target(ctx, &node);
+ }
+ else {
+ fc = ff->fc;
+ }
+
+ if (!fc) {
+ flb_plg_error(ctx->ins, "cannot get an Upstream single or HA node");
+ return -1;
+ }
+
+ if (event_type == FLB_EVENT_TYPE_METRICS) {
+ mode = MODE_FORWARD;
+ goto do_formatting;
+ }
+ else if (event_type == FLB_EVENT_TYPE_TRACES) {
+ mode = MODE_FORWARD;
+ goto do_formatting;
+ }
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ /*
+ * Based in the configuration, decide the preferred protocol mode
+ */
+ if (fc->ra_tag && fc->ra_static == FLB_FALSE) {
+ /*
+ * Dynamic tag per records needs to include the Tag for every entry,
+ * if record accessor option has been enabled we jump into this
+ * mode.
+ */
+ mode = MODE_MESSAGE;
+ }
+ else {
+#endif
+ /* Forward Modes */
+ if (fc->time_as_integer == FLB_FALSE) {
+ /*
+ * In forward mode we optimize in memory allocation and we reuse the
+ * original msgpack buffer. So we don't compose the outgoing buffer
+ * and just let the caller handle it.
+ */
+ mode = MODE_FORWARD;
+ }
+ else if (fc->time_as_integer == FLB_TRUE) {
+ /*
+ * This option is similar to MODE_FORWARD but since we have to convert the
+ * timestamp to integer type, we need to format the buffer (in the previous
+ * case we avoid that step.
+ */
+ mode = MODE_FORWARD_COMPAT;
+ }
+
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ }
+#endif
+
+
+do_formatting:
+
+ /* Message Mode: the user needs custom Tags */
+ if (mode == MODE_MESSAGE) {
+#ifdef FLB_HAVE_RECORD_ACCESSOR
+ ret = flb_forward_format_message_mode(ctx, fc, ff,
+ tag, tag_len,
+ data, bytes,
+ out_buf, out_size);
+#endif
+ }
+ else if (mode == MODE_FORWARD) {
+ ret = flb_forward_format_forward_mode(ctx, fc, ff,
+ event_type,
+ tag, tag_len,
+ data, bytes,
+ out_buf, out_size);
+ }
+ else if (mode == MODE_FORWARD_COMPAT) {
+ ret = flb_forward_format_forward_compat_mode(ctx, fc, ff,
+ tag, tag_len,
+ data, bytes,
+ out_buf, out_size);
+ }
+
+ if (ret == -1) {
+ return -1;
+ }
+
+ return mode;
+}
diff --git a/src/fluent-bit/plugins/out_forward/forward_format.h b/src/fluent-bit/plugins/out_forward/forward_format.h
new file mode 100644
index 000000000..bc6c47349
--- /dev/null
+++ b/src/fluent-bit/plugins/out_forward/forward_format.h
@@ -0,0 +1,48 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_FORWARD_FORMAT_H
+#define FLB_OUT_FORWARD_FORMAT_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include "forward.h"
+
+void flb_forward_format_bin_to_hex(uint8_t *buf, size_t len, char *out);
+
+int flb_forward_format_append_tag(struct flb_forward *ctx,
+ struct flb_forward_config *fc,
+ msgpack_packer *mp_pck,
+ msgpack_object *map,
+ const char *tag, int tag_len);
+
+int flb_forward_format(struct flb_config *config,
+ struct flb_input_instance *ins,
+ void *ins_ctx,
+ void *flush_ctx,
+ int event_type,
+ const char *tag, int tag_len,
+ const void *data, size_t bytes,
+ void **out_buf, size_t *out_size);
+
+int flb_forward_format_transcode(
+ struct flb_forward *ctx, int format,
+ char *input_buffer, size_t input_length,
+ char **output_buffer, size_t *output_length);
+
+#endif