path: root/fluent-bit/src/flb_io.c
author     Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-19 02:57:58 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-19 02:57:58 +0000
commit     be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree       9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_io.c
parent     Initial commit. (diff)
Adding upstream version 1.44.3. (upstream/1.44.3, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_io.c')
-rw-r--r--    fluent-bit/src/flb_io.c    749
1 file changed, 749 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_io.c b/fluent-bit/src/flb_io.c
new file mode 100644
index 00000000..81303459
--- /dev/null
+++ b/fluent-bit/src/flb_io.c
@@ -0,0 +1,749 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * FLB_IO
+ * ======
+ * This interface is used by output plugins that need to write over the
+ * network, either in plain text or through TLS. When dealing with network
+ * operations there are a few things to keep in mind:
+ *
+ * - TCP hosts can be down.
+ * - Network can be slow.
+ * - If the amount of data to flush requires multiple 'write' operations, we
+ *   should not block the main thread; instead, we use an event-driven
+ *   mechanism to write when it is possible.
+ *
+ * Output plugins that flag themselves with FLB_OUTPUT_TCP or FLB_OUTPUT_TLS
+ * can take advantage of this interface.
+ *
+ * The workflow to use this is the following:
+ *
+ * - A connection and data flow requires an flb_io_upstream context.
+ * - We write/read data through the flb_io_write()/flb_io_read() interfaces.
+ *
+ * Note that the Upstream context may define how network operations will
+ * work: synchronous or asynchronous (non-blocking). A minimal usage sketch
+ * follows this comment.
+ */
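+
+/*
+ * Usage sketch (illustrative only; how the connection is obtained, e.g.
+ * through the upstream connection pool, is assumed and left out, and
+ * 'example_flush' is a hypothetical helper, not part of this API):
+ *
+ *     static int example_flush(struct flb_connection *conn,
+ *                              const void *buf, size_t size)
+ *     {
+ *         size_t  sent = 0;
+ *         char    reply[512];
+ *         ssize_t bytes;
+ *
+ *         // Write the payload; on failure 'sent' holds the partial count
+ *         if (flb_io_net_write(conn, buf, size, &sent) == -1) {
+ *             return -1;
+ *         }
+ *
+ *         // Read the peer response (sync or async depending on the stream)
+ *         bytes = flb_io_net_read(conn, reply, sizeof(reply));
+ *         if (bytes <= 0) {
+ *             return -1;
+ *         }
+ *
+ *         return 0;
+ *     }
+ */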
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <assert.h>
+
+#include <monkey/mk_core.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_io.h>
+#include <fluent-bit/tls/flb_tls.h>
+#include <fluent-bit/flb_socket.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_downstream.h>
+
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_coro.h>
+#include <fluent-bit/flb_http_client.h>
+
+int flb_io_net_accept(struct flb_connection *connection,
+ struct flb_coro *coro)
+{
+ int ret;
+
+ if (connection->fd != FLB_INVALID_SOCKET) {
+ flb_socket_close(connection->fd);
+
+ connection->fd = FLB_INVALID_SOCKET;
+ connection->event.fd = FLB_INVALID_SOCKET;
+ }
+
+ /* Accept the new connection */
+ connection->fd = flb_net_accept(connection->downstream->server_fd);
+
+ if (connection->fd == -1) {
+ connection->fd = FLB_INVALID_SOCKET;
+
+ return -1;
+ }
+
+#ifdef FLB_HAVE_TLS
+ /* Check if TLS was enabled, if so perform the handshake */
+ if (flb_stream_is_secure(connection->stream) &&
+ connection->stream->tls_context != NULL) {
+ ret = flb_tls_session_create(connection->stream->tls_context,
+ connection,
+ coro);
+
+ if (ret != 0) {
+ return -1;
+ }
+ }
+#endif
+
+ flb_trace("[io] connection OK");
+
+ return 0;
+}
+
+int flb_io_net_connect(struct flb_connection *connection,
+ struct flb_coro *coro)
+{
+ int ret;
+ int async = FLB_FALSE;
+ flb_sockfd_t fd = -1;
+ // struct flb_upstream *u = u_conn->u;
+
+ if (connection->fd > 0) {
+ flb_socket_close(connection->fd);
+
+ connection->fd = -1;
+ connection->event.fd = -1;
+ }
+
+    /* Check which connection mode must be used */
+ if (coro) {
+ async = flb_upstream_is_async(connection->upstream);
+ }
+ else {
+ async = FLB_FALSE;
+ }
+
+ /* Perform TCP connection */
+ fd = flb_net_tcp_connect(connection->upstream->tcp_host,
+ connection->upstream->tcp_port,
+ connection->stream->net.source_address,
+ connection->stream->net.connect_timeout,
+ async, coro, connection);
+ if (fd == -1) {
+ return -1;
+ }
+
+ if (connection->upstream->proxied_host) {
+ ret = flb_http_client_proxy_connect(connection);
+
+ if (ret == -1) {
+ flb_debug("[http_client] flb_http_client_proxy_connect connection #%i failed to %s:%i.",
+ connection->fd,
+ connection->upstream->tcp_host,
+ connection->upstream->tcp_port);
+
+ flb_socket_close(fd);
+
+ return -1;
+ }
+ flb_debug("[http_client] flb_http_client_proxy_connect connection #%i connected to %s:%i.",
+ connection->fd,
+ connection->upstream->tcp_host,
+ connection->upstream->tcp_port);
+ }
+
+#ifdef FLB_HAVE_TLS
+ /* Check if TLS was enabled, if so perform the handshake */
+ if (flb_stream_is_secure(connection->stream) &&
+ connection->stream->tls_context != NULL) {
+ ret = flb_tls_session_create(connection->stream->tls_context,
+ connection,
+ coro);
+
+ if (ret != 0) {
+ return -1;
+ }
+ }
+#endif
+
+ flb_trace("[io] connection OK");
+
+ return 0;
+}
+
+static void net_io_propagate_critical_error(
+ struct flb_connection *connection)
+{
+ switch (errno) {
+ case EBADF:
+ case ECONNRESET:
+ case EDESTADDRREQ:
+ case ENOTCONN:
+ case EPIPE:
+ case EACCES:
+ case ENOTTY:
+ case ENETDOWN:
+ case ENETUNREACH:
+ connection->net_error = errno;
+ }
+}
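+
+/*
+ * Illustrative note: after a failed write or read, the caller may inspect
+ * connection->net_error (set above only for fatal errno values) to tell a
+ * broken connection from a transient condition. A minimal sketch, assuming
+ * 'conn', 'data', 'len' and 'sent' are in scope:
+ *
+ *     if (flb_io_net_write(conn, data, len, &sent) == -1) {
+ *         if (conn->net_error == EPIPE || conn->net_error == ECONNRESET) {
+ *             // peer is gone; drop the connection instead of retrying
+ *         }
+ *     }
+ */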
+
+static int fd_io_write(int fd, struct sockaddr_storage *address,
+ const void *data, size_t len, size_t *out_len);
+static int net_io_write(struct flb_connection *connection,
+ const void *data, size_t len, size_t *out_len)
+{
+ struct sockaddr_storage *address;
+ int ret;
+
+ if (connection->fd <= 0) {
+ if (connection->type != FLB_UPSTREAM_CONNECTION) {
+ return -1;
+ }
+
+ ret = flb_io_net_connect((struct flb_connection *) connection,
+ flb_coro_get());
+
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ address = NULL;
+
+ if (connection->type == FLB_DOWNSTREAM_CONNECTION) {
+ if (connection->stream->transport == FLB_TRANSPORT_UDP ||
+ connection->stream->transport == FLB_TRANSPORT_UNIX_DGRAM) {
+ address = &connection->raw_remote_host;
+ }
+ }
+
+ ret = fd_io_write(connection->fd, address, data, len, out_len);
+
+ if (ret == -1) {
+ net_io_propagate_critical_error(connection);
+ }
+
+ return ret;
+}
+
+static int fd_io_write(int fd, struct sockaddr_storage *address,
+ const void *data, size_t len, size_t *out_len)
+{
+ int ret;
+ int tries = 0;
+ size_t total = 0;
+
+ while (total < len) {
+ if (address != NULL) {
+ ret = sendto(fd, (char *) data + total, len - total, 0,
+ (struct sockaddr *) address,
+ flb_network_address_size(address));
+ }
+ else {
+ ret = send(fd, (char *) data + total, len - total, 0);
+ }
+
+ if (ret == -1) {
+ if (FLB_WOULDBLOCK()) {
+ /*
+ * FIXME: for now we are handling this in a very lazy way,
+ * just sleep for a second and retry (for a max of 30 tries).
+ */
+ sleep(1);
+ tries++;
+
+ if (tries == 30) {
+ /* Since we're aborting after 30 failures we want the
+ * caller to know how much data we were able to send
+ */
+
+ *out_len = total;
+
+ return -1;
+ }
+
+ continue;
+ }
+
+ return -1;
+ }
+
+ tries = 0;
+ total += ret;
+ }
+
+ *out_len = total;
+
+ return total;
+}
+
+static FLB_INLINE void net_io_backup_event(struct flb_connection *connection,
+ struct mk_event *backup)
+{
+ if (connection != NULL && backup != NULL) {
+ memcpy(backup, &connection->event, sizeof(struct mk_event));
+ }
+}
+
+static FLB_INLINE void net_io_restore_event(struct flb_connection *connection,
+ struct mk_event *backup)
+{
+ int result;
+
+ if (connection != NULL && backup != NULL) {
+ if (MK_EVENT_IS_REGISTERED((&connection->event))) {
+ result = mk_event_del(connection->evl, &connection->event);
+
+ assert(result == 0);
+ }
+
+ if (MK_EVENT_IS_REGISTERED(backup)) {
+ connection->event.priority = backup->priority;
+ connection->event.handler = backup->handler;
+
+ result = mk_event_add(connection->evl,
+ connection->fd,
+ backup->type,
+ backup->mask,
+ &connection->event);
+
+ assert(result == 0);
+ }
+ }
+}
+
+/*
+ * Perform Async socket write(2) operations. This function depends on a main
+ * event-loop and the co-routines interface to yield/resume once sockets are
+ * ready to continue.
+ *
+ * We intentionally register/de-register the socket file descriptor in the
+ * event loop each time we need to do some work.
+ */
+static FLB_INLINE int net_io_write_async(struct flb_coro *co,
+ struct flb_connection *connection,
+ const void *data, size_t len, size_t *out_len)
+{
+ int ret = 0;
+ int error;
+ uint32_t mask;
+ ssize_t bytes;
+ size_t total = 0;
+ size_t to_send;
+ char so_error_buf[256];
+ struct mk_event event_backup;
+ int event_restore_needed;
+
+ event_restore_needed = FLB_FALSE;
+
+ net_io_backup_event(connection, &event_backup);
+
+retry:
+ error = 0;
+
+ if (len - total > 524288) {
+ to_send = 524288;
+ }
+ else {
+ to_send = (len - total);
+ }
+
+ bytes = send(connection->fd, (char *) data + total, to_send, 0);
+
+#ifdef FLB_HAVE_TRACE
+ if (bytes > 0) {
+ flb_trace("[io coro=%p] [fd %i] write_async(2)=%d (%lu/%lu)",
+ co, connection->fd, bytes, total + bytes, len);
+ }
+ else {
+ flb_trace("[io coro=%p] [fd %i] write_async(2)=%d (%lu/%lu)",
+ co, connection->fd, bytes, total, len);
+ }
+#endif
+
+ if (bytes == -1) {
+ if (FLB_WOULDBLOCK()) {
+ event_restore_needed = FLB_TRUE;
+
+ ret = mk_event_add(connection->evl,
+ connection->fd,
+ FLB_ENGINE_EV_THREAD,
+ MK_EVENT_WRITE,
+ &connection->event);
+
+ connection->event.priority = FLB_ENGINE_PRIORITY_SEND_RECV;
+
+ if (ret == -1) {
+ /*
+                 * If we fail here there is not much we can do, just
+                 * let the caller know we failed.
+ */
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+
+ connection->coroutine = co;
+
+ /*
+ * Return the control to the parent caller, we need to wait for
+             * Return control to the parent caller; we need to wait for
+ */
+ flb_coro_yield(co, FLB_FALSE);
+
+ /* We want this field to hold NULL at all times unless we are explicitly
+ * waiting to be resumed.
+ */
+ connection->coroutine = NULL;
+
+ /* Save events mask since mk_event_del() will reset it */
+ mask = connection->event.mask;
+
+ /* We got a notification, remove the event registered */
+ ret = mk_event_del(connection->evl, &connection->event);
+
+ if (ret == -1) {
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+
+ /* Check the connection status */
+ if (mask & MK_EVENT_WRITE) {
+ error = flb_socket_error(connection->fd);
+
+ if (error != 0) {
+ /* Connection is broken, not much to do here */
+ strerror_r(error, so_error_buf, sizeof(so_error_buf) - 1);
+
+ flb_error("[io fd=%i] error sending data to: %s (%s)",
+ connection->fd,
+ flb_connection_get_remote_address(connection),
+ so_error_buf);
+
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+
+ MK_EVENT_NEW(&connection->event);
+
+ goto retry;
+ }
+ else {
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+
+ }
+ else {
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+ net_io_propagate_critical_error(connection);
+
+ return -1;
+ }
+ }
+
+ /* Update counters */
+ total += bytes;
+ if (total < len) {
+ if ((connection->event.mask & MK_EVENT_WRITE) == 0) {
+ ret = mk_event_add(connection->evl,
+ connection->fd,
+ FLB_ENGINE_EV_THREAD,
+ MK_EVENT_WRITE,
+ &connection->event);
+
+ connection->event.priority = FLB_ENGINE_PRIORITY_SEND_RECV;
+
+ if (ret == -1) {
+ /*
+             * If we fail here there is not much we can do, just
+             * let the caller know we failed.
+ */
+ *out_len = total;
+
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+ }
+
+ connection->coroutine = co;
+
+ flb_coro_yield(co, MK_FALSE);
+
+ /* We want this field to hold NULL at all times unless we are explicitly
+ * waiting to be resumed.
+ */
+ connection->coroutine = NULL;
+
+ goto retry;
+ }
+
+ if (event_restore_needed) {
+ /* If we enter here it means we registered this connection
+ * in the event loop, in which case we need to unregister it
+ * and restore the original registration if there was one.
+ *
+ * We do it conditionally because in those cases in which
+ * send succeeds on the first try we don't touch the event
+ * and it wouldn't make sense to unregister and register for
+ * the same event.
+ */
+
+ net_io_restore_event(connection, &event_backup);
+ }
+
+ *out_len = total;
+
+ return bytes;
+}
+
+static ssize_t fd_io_read(int fd, struct sockaddr_storage *address,
+ void *buf, size_t len);
+static ssize_t net_io_read(struct flb_connection *connection,
+ void *buf, size_t len)
+{
+ struct sockaddr_storage *address;
+ int ret;
+
+ address = NULL;
+
+ if (connection->type == FLB_DOWNSTREAM_CONNECTION) {
+ if (connection->stream->transport == FLB_TRANSPORT_UDP ||
+ connection->stream->transport == FLB_TRANSPORT_UNIX_DGRAM) {
+ address = &connection->raw_remote_host;
+ }
+ }
+
+ ret = fd_io_read(connection->fd, address, buf, len);
+
+ if (ret == -1) {
+ ret = FLB_WOULDBLOCK();
+ if (ret) {
+ /* timeout caused error */
+ flb_warn("[net] sync io_read #%i timeout after %i seconds from: %s",
+ connection->fd,
+ connection->net->io_timeout,
+ flb_connection_get_remote_address(connection));
+ }
+ else {
+ net_io_propagate_critical_error(connection);
+ }
+
+ return -1;
+ }
+
+ return ret;
+}
+
+static ssize_t fd_io_read(int fd, struct sockaddr_storage *address,
+ void *buf, size_t len)
+{
+ socklen_t address_size;
+ int ret;
+
+ if (address != NULL) {
+ address_size = sizeof(struct sockaddr_storage);
+ ret = recvfrom(fd, buf, len, 0,
+ (struct sockaddr *) address,
+ &address_size);
+ }
+ else {
+ ret = recv(fd, buf, len, 0);
+ }
+
+ if (ret == -1) {
+ return -1;
+ }
+
+ return ret;
+}
+
+static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co,
+ struct flb_connection *connection,
+ void *buf, size_t len)
+{
+ struct mk_event event_backup;
+ int event_restore_needed;
+ int ret;
+
+ event_restore_needed = FLB_FALSE;
+
+ net_io_backup_event(connection, &event_backup);
+
+ retry_read:
+ ret = recv(connection->fd, buf, len, 0);
+
+ if (ret == -1) {
+ if (FLB_WOULDBLOCK()) {
+ event_restore_needed = FLB_TRUE;
+
+ ret = mk_event_add(connection->evl,
+ connection->fd,
+ FLB_ENGINE_EV_THREAD,
+ MK_EVENT_READ,
+ &connection->event);
+
+ connection->event.priority = FLB_ENGINE_PRIORITY_SEND_RECV;
+
+ if (ret == -1) {
+ /*
+             * If we fail here there is not much we can do, just
+             * let the caller know we failed.
+ */
+ net_io_restore_event(connection, &event_backup);
+
+ return -1;
+ }
+
+ connection->coroutine = co;
+
+ flb_coro_yield(co, MK_FALSE);
+
+ /* We want this field to hold NULL at all times unless we are explicitly
+ * waiting to be resumed.
+ */
+ connection->coroutine = NULL;
+
+ goto retry_read;
+ }
+ else {
+ net_io_propagate_critical_error(connection);
+ }
+
+ ret = -1;
+ }
+ else if (ret <= 0) {
+ ret = -1;
+ }
+
+ if (event_restore_needed) {
+ /* If we enter here it means we registered this connection
+ * in the event loop, in which case we need to unregister it
+ * and restore the original registration if there was one.
+ *
+ * We do it conditionally because in those cases in which
+     * the read succeeds on the first try we don't touch the event
+ * and it wouldn't make sense to unregister and register for
+ * the same event.
+ */
+
+ net_io_restore_event(connection, &event_backup);
+ }
+
+ return ret;
+}
+
+/* Write data to fd. For unix socket. */
+int flb_io_fd_write(int fd, const void *data, size_t len, size_t *out_len)
+{
+ /* TODO: support async mode */
+ return fd_io_write(fd, NULL, data, len, out_len);
+}
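+
+/*
+ * Minimal sketch of a synchronous call, assuming 'pipe_fd' is a valid plain
+ * (non-TLS) descriptor such as a UNIX socket or pipe:
+ *
+ *     size_t sent = 0;
+ *
+ *     if (flb_io_fd_write(pipe_fd, "hello\n", 6, &sent) == -1) {
+ *         // error: 'sent' reports how many bytes made it out
+ *     }
+ */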
+
+/* Write data to an upstream connection/server */
+int flb_io_net_write(struct flb_connection *connection, const void *data,
+ size_t len, size_t *out_len)
+{
+ int flags;
+ struct flb_coro *coro;
+ int ret;
+
+ ret = -1;
+ coro = flb_coro_get();
+ flags = flb_connection_get_flags(connection);
+
+ flb_trace("[io coro=%p] [net_write] trying %zd bytes", coro, len);
+
+ if (connection->tls_session == NULL) {
+ if (flags & FLB_IO_ASYNC) {
+ ret = net_io_write_async(coro, connection, data, len, out_len);
+ }
+ else {
+ ret = net_io_write(connection, data, len, out_len);
+ }
+ }
+#ifdef FLB_HAVE_TLS
+ else if (flags & FLB_IO_TLS) {
+ if (flags & FLB_IO_ASYNC) {
+ ret = flb_tls_net_write_async(coro, connection->tls_session, data, len, out_len);
+ }
+ else {
+ ret = flb_tls_net_write(connection->tls_session, data, len, out_len);
+ }
+ }
+#endif
+
+ if (ret > 0) {
+ flb_connection_reset_io_timeout(connection);
+ }
+
+ flb_trace("[io coro=%p] [net_write] ret=%i total=%lu/%lu",
+ coro, ret, *out_len, len);
+
+ return ret;
+}
+
+ssize_t flb_io_fd_read(int fd, void *buf, size_t len)
+{
+ /* TODO: support async mode */
+ return fd_io_read(fd, NULL, buf, len);
+}
+
+ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len)
+{
+ int ret;
+ int flags;
+ struct flb_coro *coro;
+
+ ret = -1;
+ coro = flb_coro_get();
+
+ flb_trace("[io coro=%p] [net_read] try up to %zd bytes", coro, len);
+
+ flags = flb_connection_get_flags(connection);
+
+ if (!connection->tls_session) {
+ if (flags & FLB_IO_ASYNC) {
+ ret = net_io_read_async(coro, connection, buf, len);
+ }
+ else {
+ ret = net_io_read(connection, buf, len);
+ }
+ }
+#ifdef FLB_HAVE_TLS
+ else if (flags & FLB_IO_TLS) {
+ if (flags & FLB_IO_ASYNC) {
+ ret = flb_tls_net_read_async(coro, connection->tls_session, buf, len);
+ }
+ else {
+ ret = flb_tls_net_read(connection->tls_session, buf, len);
+ }
+ }
+#endif
+
+ if (ret > 0) {
+ flb_connection_reset_io_timeout(connection);
+ }
+
+ flb_trace("[io coro=%p] [net_read] ret=%i", coro, ret);
+
+ return ret;
+}