diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_network.c | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_network.c')
-rw-r--r-- | fluent-bit/src/flb_network.c | 2168 |
1 files changed, 2168 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_network.c b/fluent-bit/src/flb_network.c new file mode 100644 index 00000000..9609e5a0 --- /dev/null +++ b/fluent-bit/src/flb_network.c @@ -0,0 +1,2168 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <fcntl.h> +#include <errno.h> +#include <ctype.h> + +#ifdef FLB_SYSTEM_WINDOWS +#define poll WSAPoll +#else +#include <sys/poll.h> +#endif + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_socket.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_network.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_macros.h> +#include <fluent-bit/flb_upstream.h> +#include <fluent-bit/flb_scheduler.h> + +#include <monkey/mk_core.h> +#include <ares.h> + +#ifndef SOL_TCP +#define SOL_TCP IPPROTO_TCP +#endif + +static pthread_once_t local_thread_net_dns_ctx_init = PTHREAD_ONCE_INIT; +FLB_TLS_DEFINE(struct flb_net_dns, flb_net_dns_ctx); + +/* + * Initialize thread-local-storage, every worker thread has it owns + * dns context with relevant info populated inside the thread. + */ + +static void flb_net_dns_ctx_init_private() +{ + FLB_TLS_INIT(flb_net_dns_ctx); +} + +void flb_net_dns_ctx_init() +{ + pthread_once(&local_thread_net_dns_ctx_init, flb_net_dns_ctx_init_private); +} + +struct flb_net_dns *flb_net_dns_ctx_get() +{ + return FLB_TLS_GET(flb_net_dns_ctx); +} + +void flb_net_dns_ctx_set(struct flb_net_dns *dns_ctx) +{ + FLB_TLS_SET(flb_net_dns_ctx, dns_ctx); +} + +void flb_net_lib_init() +{ + int result; + + result = ares_library_init_mem(ARES_LIB_INIT_ALL, flb_malloc, flb_free, flb_realloc); + + if(0 != result) { + flb_error("[network] c-ares memory settings initialization error : %s", + ares_strerror(result)); + } +} + +void flb_net_ctx_init(struct flb_net_dns *dns_ctx) +{ + mk_list_init(&dns_ctx->lookups); + mk_list_init(&dns_ctx->lookups_drop); +} + +void flb_net_setup_init(struct flb_net_setup *net) +{ + net->dns_mode = NULL; + net->dns_resolver = NULL; + net->dns_prefer_ipv4 = FLB_FALSE; + net->keepalive = FLB_TRUE; + net->keepalive_idle_timeout = 30; + net->keepalive_max_recycle = 0; + net->accept_timeout = 10; + net->connect_timeout = 10; + net->io_timeout = 0; /* Infinite time */ + net->source_address = NULL; +} + +int flb_net_host_set(const char *plugin_name, struct flb_net_host *host, const char *address) +{ + int len; + int olen; + const char *s, *e, *u; + + memset(host, '\0', sizeof(struct flb_net_host)); + + olen = strlen(address); + if (olen == strlen(plugin_name)) { + return 0; + } + + len = strlen(plugin_name) + 3; + if (olen < len) { + return -1; + } + + s = address + len; + if (*s == '[') { + /* IPv6 address (RFC 3986) */ + e = strchr(++s, ']'); + if (!e) { + return -1; + } + host->name = flb_sds_create_len(s, e - s); + host->ipv6 = FLB_TRUE; + s = e + 1; + } + else { + e = s; + while (!(*e == '\0' || *e == ':' || *e == '/')) { + ++e; + } + if (e == s) { + return -1; + } + host->name = flb_sds_create_len(s, e - s); + s = e; + } + + if (*s == ':') { + host->port = atoi(++s); + } + + u = strchr(s, '/'); + if (u) { + host->uri = flb_uri_create(u); + } + host->address = flb_sds_create(address); + + if (host->name) { + host->listen = flb_sds_create(host->name); + } + + return 0; +} + +int flb_net_socket_reset(flb_sockfd_t fd) +{ + int status = 1; + + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &status, sizeof(int)) == -1) { + flb_errno(); + return -1; + } + + return 0; +} + +int flb_net_socket_tcp_nodelay(flb_sockfd_t fd) +{ + int on = 1; + int ret; + + ret = setsockopt(fd, SOL_TCP, TCP_NODELAY, &on, sizeof(on)); + if (ret == -1) { + flb_errno(); + return -1; + } + + return 0; +} + +int flb_net_socket_nonblocking(flb_sockfd_t fd) +{ +#ifdef _WIN32 + unsigned long on = 1; + if (ioctlsocket(fd, FIONBIO, &on) != 0) { +#else + if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) == -1) { +#endif + flb_errno(); + return -1; + } + + return 0; +} + +int flb_net_socket_rcv_buffer(flb_sockfd_t fd, int rcvbuf) +{ + if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) != 0) { + flb_errno(); + return -1; + } + + return 0; +} + +int flb_net_socket_blocking(flb_sockfd_t fd) +{ +#ifdef _WIN32 + unsigned long off = 0; + if (ioctlsocket(fd, FIONBIO, &off) != 0) { +#else + if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1) { +#endif + flb_errno(); + return -1; + } + + return 0; +} + +int flb_net_socket_set_rcvtimeout(flb_sockfd_t fd, int timeout_in_seconds) +{ +#ifdef FLB_SYSTEM_WINDOWS + /* WINDOWS */ + DWORD timeout = timeout_in_seconds * 1000; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof timeout) + == -1) { +#else + /* LINUX and MAC OS X */ + struct timeval tv; + tv.tv_sec = timeout_in_seconds; + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv) == -1) { +#endif + flb_errno(); + return -1; + } + + return 0; +} + +/* + * Enable the TCP_FASTOPEN feature for server side implemented in Linux Kernel >= 3.7, + * for more details read here: + * + * TCP Fast Open: expediting web services: http://lwn.net/Articles/508865/ + */ +int flb_net_socket_tcp_fastopen(flb_sockfd_t fd) +{ + int qlen = 5; + return setsockopt(fd, SOL_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)); +} + +flb_sockfd_t flb_net_socket_create(int family, int nonblock) +{ + flb_sockfd_t fd; + + /* create the socket and set the nonblocking flag status */ + fd = socket(family, SOCK_STREAM, 0); + if (fd == -1) { + flb_errno(); + return -1; + } + + if (nonblock) { + flb_net_socket_nonblocking(fd); + } + + return fd; +} + +flb_sockfd_t flb_net_socket_create_udp(int family, int nonblock) +{ + flb_sockfd_t fd; + + /* create the socket and set the nonblocking flag status */ + fd = socket(family, SOCK_DGRAM, 0); + if (fd == -1) { + flb_errno(); + return -1; + } + + if (nonblock) { + flb_net_socket_nonblocking(fd); + } + + return fd; +} + +/* + * Perform TCP connection for a blocking socket. This interface set's the socket + * to non-blocking mode temporary in order to add a timeout to the connection, + * the blocking mode is restored at the end. + */ +static int net_connect_sync(int fd, const struct sockaddr *addr, socklen_t addrlen, + char *host, int port, int connect_timeout) +{ + int ret; + int err; + int socket_errno; + struct pollfd pfd_read; + + /* Set socket to non-blocking mode */ + flb_net_socket_nonblocking(fd); + + /* connect(2) */ + ret = connect(fd, addr, addrlen); + if (ret == -1) { + /* + * An asynchronous connect can return -1, but what is important is the + * socket status, getting a EINPROGRESS is expected, but any other case + * means a failure. + */ +#ifdef FLB_SYSTEM_WINDOWS + socket_errno = flb_socket_error(fd); + err = 0; +#else + socket_errno = errno; + err = flb_socket_error(fd); +#endif + + if (!FLB_EINPROGRESS(socket_errno) || err != 0) { + goto exit_error; + } + + /* The connection is still in progress, implement a socket timeout */ + flb_trace("[net] connection #%i in process to %s:%i", + fd, host, port); + + /* + * Prepare a timeout using poll(2): we could use our own + * event loop mechanism for this, but it will require an + * extra file descriptor, the poll(2) call is straightforward + * for this use case. + */ + + pfd_read.fd = fd; + pfd_read.events = POLLOUT; + ret = poll(&pfd_read, 1, connect_timeout * 1000); + if (ret == 0) { + /* Timeout */ + flb_error("[net] connection #%i timeout after %i seconds to: " + "%s:%i", + fd, connect_timeout, host, port); + goto exit_error; + } + else if (ret < 0) { + /* Generic error */ + flb_errno(); + flb_error("[net] connection #%i failed to: %s:%i", + fd, host, port); + goto exit_error; + } + } + + /* + * No exception, the connection succeeded, return the normal + * non-blocking mode to the socket. + */ + flb_net_socket_blocking(fd); + return 0; + + exit_error: + flb_net_socket_blocking(fd); + return -1; +} + + +/* + * Asynchronous socket connection: this interface might be called from a co-routine, + * so in order to perform a real async connection and get notified back, it needs + * access to the event loop context and the connection context 'upstream connection. + */ +static int net_connect_async(int fd, + const struct sockaddr *addr, socklen_t addrlen, + char *host, int port, int connect_timeout, + void *async_ctx, struct flb_connection *u_conn) +{ + int ret; + int err; + int error = 0; + int socket_errno; + uint32_t mask; + char so_error_buf[256]; + char *str; + struct flb_upstream *u; + + u = u_conn->upstream; + + /* connect(2) */ + ret = connect(fd, addr, addrlen); + if (ret == 0) { + return 0; + } + + /* + * An asynchronous connect can return -1, but what is important is the + * socket status, getting a EINPROGRESS is expected, but any other case + * means a failure. + */ +#ifdef FLB_SYSTEM_WINDOWS + socket_errno = flb_socket_error(fd); + err = 0; +#else + socket_errno = errno; + err = flb_socket_error(fd); +#endif + /* The logic behind this check is that when establishing a connection + * errno should be EINPROGRESS with no additional information in order + * for it to be a healthy attempt. However, when errno is EINPROGRESS + * and an error occurs it could be saved in the so_error socket field + * which has to be accessed through getsockopt(... SO_ERROR ...) so + * in order to preserve that behavior while also properly detecting + * other errno values as error conditions the comparison was changed. + * + * Windows note : flb_socket_error returns either the value returned + * by WSAGetLastError or the value returned by getsockopt(... SO_ERROR ...) + * if WSAGetLastError returns WSAEWOULDBLOCK as per libevents code. + * + * General note : according to the connect syscall man page (not libc) + * there could be a timing issue with checking SO_ERROR here because + * the suggested use involves checking it after a select or poll call + * returns the socket as writable which is not the case here. + */ + + if (!FLB_EINPROGRESS(socket_errno) || err != 0) { + return -1; + } + + /* The connection is still in progress, implement a socket timeout */ + flb_trace("[net] connection #%i in process to %s:%i", + fd, host, port); + + /* Register the connection socket into the main event loop */ + MK_EVENT_ZERO(&u_conn->event); + + ret = mk_event_add(u_conn->evl, + fd, + FLB_ENGINE_EV_THREAD, + MK_EVENT_WRITE, + &u_conn->event); + + u_conn->event.priority = FLB_ENGINE_PRIORITY_CONNECT; + + if (ret == -1) { + /* + * If we failed here there no much that we can do, just + * let the caller know that we failed. + */ + return -1; + } + + u_conn->coroutine = async_ctx; + + /* + * Return the control to the parent caller, we need to wait for + * the event loop to get back to us. + */ + flb_coro_yield(async_ctx, FLB_FALSE); + + /* We want this field to hold NULL at all times unless we are explicitly + * waiting to be resumed. + */ + u_conn->coroutine = NULL; + + /* Save the mask before the event handler do a reset */ + mask = u_conn->event.mask; + + /* + * If the socket has been invalidated (e.g: timeout or shutdown), just + * print a debug message and return. + */ + if (u_conn->fd == -1) { + flb_debug("[net] TCP connection not longer available: %s:%i", + u->tcp_host, u->tcp_port); + return -1; + } + + /* We got a notification, remove the event registered */ + ret = mk_event_del(u_conn->evl, &u_conn->event); + if (ret == -1) { + flb_error("[io] connect event handler error"); + return -1; + } + + if (u_conn->net_error == ETIMEDOUT) { + flb_debug("[net] TCP connection timed out: %s:%i", + u->tcp_host, u->tcp_port); + return -1; + } + + /* Check the connection status */ + if (mask & MK_EVENT_WRITE) { + error = flb_socket_error(u_conn->fd); + + /* Check the exception */ + if (error != 0) { + /* + * The upstream connection might want to override the + * exception (mostly used for local timeouts: ETIMEDOUT. + */ + if (u_conn->net_error > 0) { + error = u_conn->net_error; + } + + /* Connection is broken, not much to do here */ + str = strerror_r(error, so_error_buf, sizeof(so_error_buf)); + flb_error("[net] TCP connection failed: %s:%i (%s)", + u->tcp_host, u->tcp_port, str); + return -1; + } + } + else { + flb_error("[net] TCP connection, unexpected error: %s:%i", + u->tcp_host, u->tcp_port); + return -1; + } + + return 0; +} + +static void flb_net_dns_lookup_context_destroy(struct flb_dns_lookup_context *lookup_context) +{ + mk_list_del(&lookup_context->_head); + ares_destroy(lookup_context->ares_channel); + flb_free(lookup_context); +} + +static void flb_net_dns_lookup_context_drop(struct flb_dns_lookup_context *lookup_context) +{ + if (!lookup_context->dropped) { + lookup_context->dropped = FLB_TRUE; + + mk_list_del(&lookup_context->_head); + mk_list_add(&lookup_context->_head, &lookup_context->dns_ctx->lookups_drop); + + if (lookup_context->udp_timer != NULL && + lookup_context->udp_timer->active) { + flb_sched_timer_invalidate(lookup_context->udp_timer); + + lookup_context->udp_timer = NULL; + } + } +} + +void flb_net_dns_lookup_context_cleanup(struct flb_net_dns *dns_ctx) +{ + struct flb_dns_lookup_context *lookup_context; + struct flb_coro *coroutine; + struct mk_list *head; + struct mk_list *tmp; + + mk_list_foreach_safe(head, tmp, &dns_ctx->lookups_drop) { + lookup_context = mk_list_entry(head, struct flb_dns_lookup_context, _head); + + coroutine = lookup_context->coroutine; + + flb_net_dns_lookup_context_destroy(lookup_context); + + if (coroutine != NULL) { + flb_coro_resume(coroutine); + } + } +} + +static void flb_net_free_translated_addrinfo(struct addrinfo *input) +{ + struct addrinfo *current_record; + struct addrinfo *next_record; + + if (input != NULL) { + next_record = NULL; + + for (current_record = input ; + current_record != NULL ; + current_record = next_record) { + + if (current_record->ai_addr != NULL) { + flb_free(current_record->ai_addr); + } + + next_record = current_record->ai_next; + + flb_free(current_record); + } + } +} + +static void flb_net_append_addrinfo_entry(struct addrinfo **head, + struct addrinfo **tail, + struct addrinfo *entry) +{ + if (*head == NULL) { + *head = entry; + } + else { + (*tail)->ai_next = entry; + } + + *tail = entry; +} + +static struct addrinfo *flb_net_sort_addrinfo_list(struct addrinfo *input, + int preferred_family) +{ + struct addrinfo *preferred_results_head; + struct addrinfo *remainder_results_head; + struct addrinfo *preferred_results_tail; + struct addrinfo *remainder_results_tail; + struct addrinfo *current_record; + struct addrinfo *next_record; + + remainder_results_head = NULL; + preferred_results_head = NULL; + remainder_results_tail = NULL; + preferred_results_tail = NULL; + current_record = NULL; + next_record = NULL; + + for (current_record = input ; + current_record != NULL ; + current_record = next_record) { + next_record = current_record->ai_next; + current_record->ai_next = NULL; + + if (preferred_family == current_record->ai_family) { + flb_net_append_addrinfo_entry(&preferred_results_head, + &preferred_results_tail, + current_record); + } + else + { + flb_net_append_addrinfo_entry(&remainder_results_head, + &remainder_results_tail, + current_record); + } + } + + if (preferred_results_tail != NULL) { + preferred_results_tail->ai_next = remainder_results_head; + } + + if (preferred_results_head == NULL) { + return remainder_results_head; + } + + return preferred_results_head; +} + +static struct addrinfo *flb_net_translate_ares_addrinfo(struct ares_addrinfo *input) +{ + struct addrinfo *previous_output_record; + struct addrinfo *current_output_record; + struct ares_addrinfo_node *current_ares_record; + int failure_detected; + struct addrinfo *output; + + output = NULL; + failure_detected = 0; + current_output_record = NULL; + previous_output_record = NULL; + + if (input != NULL) { + for (current_ares_record = input->nodes ; + current_ares_record != NULL ; + current_ares_record = current_ares_record->ai_next) { + + current_output_record = flb_calloc(1, sizeof(struct addrinfo)); + + if (current_output_record == NULL) { + flb_errno(); + failure_detected = 1; + break; + } + + if (output == NULL) { + output = current_output_record; + } + + current_output_record->ai_flags = current_ares_record->ai_flags; + current_output_record->ai_family = current_ares_record->ai_family; + current_output_record->ai_socktype = current_ares_record->ai_socktype; + current_output_record->ai_protocol = current_ares_record->ai_protocol; + current_output_record->ai_addrlen = current_ares_record->ai_addrlen; + + current_output_record->ai_addr = flb_malloc(current_output_record->ai_addrlen); + + if (current_output_record->ai_addr == NULL) { + flb_errno(); + failure_detected = 1; + break; + } + + memcpy(current_output_record->ai_addr, + current_ares_record->ai_addr, + current_output_record->ai_addrlen); + + if (previous_output_record != NULL) { + previous_output_record->ai_next = current_output_record; + } + + previous_output_record = current_output_record; + } + } + + if (failure_detected) { + if (output != NULL) { + flb_net_free_translated_addrinfo(output); + + output = NULL; + } + } + + return output; +} + + +static void flb_net_getaddrinfo_callback(void *arg, int status, int timeouts, + struct ares_addrinfo *res) +{ + struct flb_dns_lookup_context *lookup_context; + + lookup_context = (struct flb_dns_lookup_context *) arg; + + if (lookup_context->finished || + lookup_context->dropped) { + return; + } + + if (ARES_SUCCESS == status) { + *(lookup_context->result) = flb_net_translate_ares_addrinfo(res); + + if (*(lookup_context->result) == NULL) { + /* Translation fails only when calloc fails. */ + + *(lookup_context->result_code) = ARES_ENOMEM; + } + else { + *(lookup_context->result_code) = ARES_SUCCESS; + } + + ares_freeaddrinfo(res); + } + else { + *(lookup_context->result_code) = status; + } + + lookup_context->finished = 1; +} + +static int flb_net_getaddrinfo_event_handler(void *arg) +{ + struct flb_dns_lookup_context *lookup_context; + + lookup_context = FLB_DNS_LOOKUP_CONTEXT_FOR_EVENT(arg); + + if (lookup_context->finished || + lookup_context->dropped) { + return 0; + } + + ares_process_fd(lookup_context->ares_channel, + lookup_context->response_event.fd, + lookup_context->response_event.fd); + + if (lookup_context->finished) { + flb_net_dns_lookup_context_drop(lookup_context); + } + + return 0; +} + +static void flb_net_getaddrinfo_timeout_handler(struct flb_config *config, void *data) +{ + struct flb_dns_lookup_context *lookup_context; + + (void) config; + + lookup_context = (struct flb_dns_lookup_context *) data; + + if (lookup_context->finished || + lookup_context->dropped) { + return; + } + + *(lookup_context->udp_timeout_detected) = FLB_TRUE; + lookup_context->finished = FLB_TRUE; + lookup_context->udp_timer = NULL; + + /* We deliverately set udp_timer because we don't want flb_net_dns_lookup_context_drop + * to call flb_sched_timer_invalidate on the timer which was already disabled and + * is about to be destroyed after this this callback returns. + */ + + ares_cancel(lookup_context->ares_channel); + + *(lookup_context->result_code) = ARES_ETIMEOUT; + + flb_net_dns_lookup_context_drop(lookup_context); +} + +static ares_socket_t flb_dns_ares_socket(int af, int type, int protocol, void *userdata) +{ + struct flb_dns_lookup_context *lookup_context; + int event_mask; + ares_socket_t sockfd; + int result; + + lookup_context = (struct flb_dns_lookup_context *) userdata; + + if (lookup_context->ares_socket_created) { + /* This context already had a connection established and the code is not ready + * to handle multiple connections so we abort the process. + */ + errno = EACCES; + + return -1; + } + + sockfd = socket(af, type, protocol); + + if (sockfd == -1) { + return -1; + } + + /* According to configure_socket in ares_process.c:970 if we provide our own socket + * functions we need to set the socket up ourselves but the only specific thing we + * need is for the socket to be set to non blocking mode so that's all we do here. + */ + + result = flb_net_socket_nonblocking(sockfd); + + if (result) { + flb_socket_close(sockfd); + + return -1; + } + + lookup_context->ares_socket_type = type; + lookup_context->ares_socket_created = FLB_TRUE; + + lookup_context->response_event.mask = MK_EVENT_EMPTY; + lookup_context->response_event.status = MK_EVENT_NONE; + lookup_context->response_event.data = &lookup_context->response_event; + lookup_context->response_event.handler = flb_net_getaddrinfo_event_handler; + lookup_context->response_event.fd = sockfd; + + event_mask = MK_EVENT_READ; + + if (SOCK_STREAM == type) { + event_mask |= MK_EVENT_WRITE; + } + + result = mk_event_add(lookup_context->event_loop, sockfd, FLB_ENGINE_EV_CUSTOM, + event_mask, &lookup_context->response_event); + lookup_context->response_event.priority = FLB_ENGINE_PRIORITY_DNS; + if (result) { + flb_socket_close(sockfd); + + return -1; + } + + lookup_context->response_event.type = FLB_ENGINE_EV_CUSTOM; + lookup_context->ares_socket_registered = FLB_TRUE; + + return sockfd; +} + +static int flb_dns_ares_close(ares_socket_t sockfd, void *userdata) +{ + struct flb_dns_lookup_context *lookup_context; + int result; + + lookup_context = (struct flb_dns_lookup_context *) userdata; + + if (lookup_context->ares_socket_registered) { + lookup_context->ares_socket_registered = FLB_FALSE; + + mk_event_del(lookup_context->event_loop, &lookup_context->response_event); + } + + result = flb_socket_close(sockfd); + + return result; +} + +static int flb_dns_ares_connect(ares_socket_t sockfd, const struct sockaddr *addr, + ares_socklen_t addrlen, void *userdata) +{ + return connect(sockfd, addr, addrlen); +} + +static ares_ssize_t flb_dns_ares_recvfrom(ares_socket_t sockfd, void *data, + size_t data_len, int flags, + struct sockaddr *from, ares_socklen_t *from_len, + void *userdata) +{ + return recvfrom(sockfd, data, data_len, flags, from, from_len); +} + +static ares_ssize_t flb_dns_ares_send(ares_socket_t sockfd, const struct iovec *vec, + int len, void *userdata) +{ + return writev(sockfd, vec, len); +} + +static struct flb_dns_lookup_context *flb_net_dns_lookup_context_create( + struct flb_net_dns *dns_ctx, + struct mk_event_loop *evl, + struct flb_coro *coroutine, + char dns_mode, + int *result) +{ + struct flb_dns_lookup_context *lookup_context; + int local_result; + int optmask; + struct ares_options opts = {0}; + + local_result = 0; + optmask = 0; + + if (result == NULL) { + result = &local_result; + } + + /* The initialization order here is important since it makes it easier to handle + * failures + */ + lookup_context = flb_calloc(1, sizeof(struct flb_dns_lookup_context)); + + if (!lookup_context) { + flb_errno(); + + *result = ARES_ENOMEM; + + return NULL; + } + + /* c-ares options: Set the transport layer to the desired protocol and + * the number of retries to 2 + */ + + optmask = ARES_OPT_FLAGS; + opts.tries = 2; + + if (dns_mode == FLB_DNS_USE_TCP) { + opts.flags = ARES_FLAG_USEVC; + } + + *result = ares_init_options((ares_channel *) &lookup_context->ares_channel, + &opts, optmask); + + if (*result != ARES_SUCCESS) { + flb_free(lookup_context); + + return NULL; + } + + lookup_context->ares_socket_functions.asocket = flb_dns_ares_socket; + lookup_context->ares_socket_functions.aclose = flb_dns_ares_close; + lookup_context->ares_socket_functions.aconnect = flb_dns_ares_connect; + lookup_context->ares_socket_functions.arecvfrom = flb_dns_ares_recvfrom; + lookup_context->ares_socket_functions.asendv = flb_dns_ares_send; + lookup_context->ares_socket_created = 0; + lookup_context->event_loop = evl; + lookup_context->udp_timer = NULL; + lookup_context->coroutine = coroutine; + lookup_context->finished = 0; + lookup_context->dropped = 0; + lookup_context->dns_ctx = dns_ctx; + + ares_set_socket_functions(lookup_context->ares_channel, + &lookup_context->ares_socket_functions, + lookup_context); + + *result = ARES_SUCCESS; + + mk_list_add(&lookup_context->_head, &dns_ctx->lookups); + + return lookup_context; +} + +int flb_net_getaddrinfo(const char *node, const char *service, struct addrinfo *hints, + struct addrinfo **res, char *dns_mode_textual, int timeout) +{ + int udp_timeout_detected; + struct flb_dns_lookup_context *lookup_context; + int errno_backup; + int result_code; + struct addrinfo *result_data; + struct ares_addrinfo_hints ares_hints; + struct mk_event_loop *event_loop; + struct flb_coro *coroutine; + char dns_mode; + struct flb_net_dns *dns_ctx; + int result; + struct flb_sched *sched; + + errno_backup = errno; + + dns_mode = FLB_DNS_USE_UDP; + + if (dns_mode_textual != NULL) { + dns_mode = toupper(dns_mode_textual[0]); + } + + event_loop = flb_engine_evl_get(); + assert(event_loop != NULL); + + coroutine = flb_coro_get(); + assert(coroutine != NULL); + + dns_ctx = flb_net_dns_ctx_get(); + assert(dns_ctx != NULL); + + lookup_context = flb_net_dns_lookup_context_create(dns_ctx, event_loop, coroutine, + dns_mode, &result); + + if (result != ARES_SUCCESS) { + errno = errno_backup; + return result; + } + + lookup_context->udp_timeout_detected = &udp_timeout_detected; + lookup_context->result_code = &result_code; + lookup_context->result = &result_data; + + /* We think that either the callback or the timeout handler should be executed always + * but just in case that there is a corner case we initialize result_code with an + * error code so in case none of those is invoked (which shouldn't happen) the code + * is not ARES_SUCCESS and thus cause a NULL pointer to be returned. + */ + result_code = ARES_ESERVFAIL; + result_data = NULL; + udp_timeout_detected = 0; + + /* The timeout we get is expressed in seconds so we need to convert it to + * milliseconds + */ + timeout *= 1000; + + /* We need to ensure that our timer won't overlap with the upstream timeout handler. + */ + if (timeout > 3000) { + timeout -= 1000; + } + else { + timeout -= (timeout / 3); + } + + ares_hints.ai_flags = hints->ai_flags; + ares_hints.ai_family = hints->ai_family; + ares_hints.ai_socktype = hints->ai_socktype; + ares_hints.ai_protocol = hints->ai_protocol; + + ares_getaddrinfo(lookup_context->ares_channel, node, service, &ares_hints, + flb_net_getaddrinfo_callback, lookup_context); + + if (!lookup_context->finished) { + if (lookup_context->ares_socket_created) { + if (lookup_context->ares_socket_type == SOCK_DGRAM) { + /* If the socket type created by c-ares is UDP then we need to create our + * own timeout mechanism before yielding and cancel it if things go as + * expected. + */ + + sched = flb_sched_ctx_get(); + assert(sched != NULL); + + result = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_ONESHOT, + timeout, + flb_net_getaddrinfo_timeout_handler, + lookup_context, + &lookup_context->udp_timer); + if (result == -1) { + /* Timer creation failed, it happen because of file descriptor or memory + * exhaustion (ulimits usually) + */ + + result_code = ARES_ENOMEM; + + ares_cancel(lookup_context->ares_channel); + + lookup_context->coroutine = NULL; + + flb_net_dns_lookup_context_drop(lookup_context); + } + else { + flb_coro_yield(coroutine, FLB_FALSE); + } + } + else { + flb_coro_yield(coroutine, FLB_FALSE); + } + } + else { + /* Do we want to do anything special for this condition? */ + } + } + else { + lookup_context->coroutine = NULL; + + flb_net_dns_lookup_context_drop(lookup_context); + } + + if (!result_code) { + *res = result_data; + } + + result = result_code; + errno = errno_backup; + + return result; +} + +int flb_net_bind_address(int fd, char *source_addr) +{ + int ret; + struct addrinfo hint; + struct addrinfo *res = NULL; + struct sockaddr_storage addr; + + memset(&hint, '\0', sizeof hint); + + hint.ai_family = PF_UNSPEC; + hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE; + + ret = getaddrinfo(source_addr, NULL, &hint, &res); + if (ret == -1) { + flb_errno(); + flb_error("[net] cannot read source_address=%s", source_addr); + return -1; + } + + /* Bind the address */ + memcpy(&addr, res->ai_addr, res->ai_addrlen); + freeaddrinfo(res); + ret = bind(fd, (struct sockaddr *) &addr, sizeof(addr)); + if (ret == -1) { + flb_errno(); + flb_error("[net] could not bind source_address=%s", source_addr); + return -1; + } + + return 0; +} + +static void set_ip_family(const char *host, struct addrinfo *hints) +{ + + int ret; + struct in6_addr serveraddr; + + /* check if the given 'host' is a network address, adjust ai_flags */ + ret = inet_pton(AF_INET, host, &serveraddr); + if (ret == 1) { /* valid IPv4 text address ? */ + hints->ai_family = AF_INET; + hints->ai_flags |= AI_NUMERICHOST; + } + else { + ret = inet_pton(AF_INET6, host, &serveraddr); + if (ret == 1) { /* valid IPv6 text address ? */ + hints->ai_family = AF_INET6; + hints->ai_flags |= AI_NUMERICHOST; + } + } +} + +/* Connect to a TCP socket server and returns the file descriptor */ +flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port, + char *source_addr, int connect_timeout, + int is_async, + void *async_ctx, + struct flb_connection *u_conn) +{ + int ret; + int use_async_dns; + char resolver_initial; + flb_sockfd_t fd = -1; + char _port[6]; + char address[41]; + struct addrinfo hints; + struct addrinfo *sorted_res, *res, *rp; + + if (is_async == FLB_TRUE && !u_conn) { + flb_error("[net] invalid async mode with not set upstream connection"); + return -1; + } + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + /* Set hints */ + set_ip_family(host, &hints); + + /* fomart the TCP port */ + snprintf(_port, sizeof(_port), "%lu", port); + + use_async_dns = is_async; + + if (u_conn->net->dns_resolver != NULL) { + resolver_initial = toupper(u_conn->net->dns_resolver[0]); + + if (resolver_initial == FLB_DNS_LEGACY) { + use_async_dns = FLB_FALSE; + } + } + + /* retrieve DNS info */ + if (use_async_dns) { + ret = flb_net_getaddrinfo(host, _port, &hints, &res, + u_conn->net->dns_mode, + connect_timeout); + } + else { + ret = getaddrinfo(host, _port, &hints, &res); + } + + if (ret) { + if (use_async_dns) { + flb_warn("[net] getaddrinfo(host='%s', err=%d): %s", host, ret, ares_strerror(ret)); + } + else { + flb_warn("[net] getaddrinfo(host='%s', err=%d): %s", host, ret, gai_strerror(ret)); + } + + return -1; + } + + if (u_conn->net_error > 0) { + if (u_conn->net_error == ETIMEDOUT) { + flb_warn("[net] timeout detected between DNS lookup and connection attempt"); + } + + if (use_async_dns) { + flb_net_free_translated_addrinfo(res); + } + else { + freeaddrinfo(res); + } + + return -1; + } + + sorted_res = res; + + if (u_conn->net->dns_prefer_ipv4) { + sorted_res = flb_net_sort_addrinfo_list(res, AF_INET); + + if (sorted_res == NULL) { + flb_debug("[net] error sorting getaddrinfo results"); + + if (use_async_dns) { + flb_net_free_translated_addrinfo(res); + } + else { + freeaddrinfo(res); + } + + return -1; + } + } + + /* + * Try to connect: on this iteration we try to connect to the first + * available address. + */ + for (rp = sorted_res; rp != NULL; rp = rp->ai_next) { + if (u_conn->net_error > 0) { + if (u_conn->net_error == ETIMEDOUT) { + flb_warn("[net] timeout detected between connection attempts"); + } + } + + /* create socket */ + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd == -1) { + flb_error("[net] coult not create client socket, retrying"); + continue; + } + + /* asynchronous socket ? */ + if (is_async == FLB_TRUE) { + flb_net_socket_nonblocking(fd); + } + + /* Bind a specific network interface ? */ + if (source_addr != NULL) { + ret = flb_net_bind_address(fd, source_addr); + + if (ret == -1) { + flb_warn("[net] falling back to random interface"); + } + else { + flb_trace("[net] client connect bind address: %s", source_addr); + } + } + + /* Disable Nagle's algorithm */ + flb_net_socket_tcp_nodelay(fd); + + /* Set receive timeout */ + flb_net_socket_set_rcvtimeout(fd, u_conn->net->io_timeout); + + if (u_conn) { + u_conn->fd = fd; + u_conn->event.fd = fd; + } + + flb_connection_set_remote_host(u_conn, rp->ai_addr); + + /* Perform TCP connection */ + if (is_async == FLB_TRUE) { + ret = net_connect_async(fd, rp->ai_addr, rp->ai_addrlen, + (char *) host, port, connect_timeout, + async_ctx, u_conn); + + } + else { + ret = net_connect_sync(fd, rp->ai_addr, rp->ai_addrlen, + (char *) host, port, connect_timeout); + } + + if (u_conn->net_error == ETIMEDOUT) { + /* flb_upstream_conn_timeouts called prepare_destroy_conn which + * closed the file descriptor and removed it from the event so + * we can safely ignore it. + */ + + fd = -1; + + break; + } + + if (ret == -1) { + address[0] = '\0'; + + ret = flb_net_address_to_str(rp->ai_family, rp->ai_addr, + address, sizeof(address)); + + /* If the connection failed, just abort and report the problem */ + flb_debug("[net] socket #%i could not connect to %s:%s", + fd, address, _port); + + if (u_conn) { + u_conn->fd = -1; + u_conn->event.fd = -1; + } + + flb_socket_close(fd); + fd = -1; + + continue; + } + + break; + } + + if (fd == -1) { + flb_debug("[net] could not connect to %s:%s", + host, _port); + } + + if (use_async_dns) { + flb_net_free_translated_addrinfo(res); + } + else { + freeaddrinfo(res); + } + + if (rp == NULL) { + return -1; + } + + return fd; +} + +/* "Connect" to a UDP socket server and returns the file descriptor */ +flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port, + char *source_addr) +{ + int ret; + flb_sockfd_t fd = -1; + char _port[6]; + struct addrinfo hints; + struct addrinfo *res, *rp; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + /* Set hints */ + set_ip_family(host, &hints); + + /* Format UDP port */ + snprintf(_port, sizeof(_port), "%lu", port); + + /* retrieve DNS info */ + ret = getaddrinfo(host, _port, &hints, &res); + if (ret != 0) { + flb_warn("net]: getaddrinfo(host='%s'): %s", + host, gai_strerror(ret)); + return -1; + } + + for (rp = res; rp != NULL; rp = rp->ai_next) { + /* create socket */ + fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (fd == -1) { + flb_error("[net] coult not create client socket, retrying"); + continue; + } + + /* Bind a specific network interface ? */ + if (source_addr != NULL) { + ret = flb_net_bind_address(fd, source_addr); + if (ret == -1) { + flb_warn("[net] falling back to random interface"); + } + else { + flb_trace("[net] client connect bind address: %s", source_addr); + } + } + + /* + * Why do we connect(2) an UDP socket ?, is this useful ?: Yes. Despite + * an UDP socket it's not in a connection state, connecting through the + * API it helps the Kernel to configure the destination address and + * is totally valid, so then you don't need to use sendto(2). + * + * For our use case this is quite helpful, since the caller keeps using + * the same Fluent Bit I/O API to deliver a message. + */ + if (connect(fd, rp->ai_addr, rp->ai_addrlen) == -1) { + flb_error("[net] UDP socket %i could connect to %s:%s", + fd, host, _port); + flb_socket_close(fd); + fd = -1; + break; + } + break; + } + + freeaddrinfo(res); + + if (rp == NULL) { + return -1; + } + + return fd; +} + +/* Connect to a TCP socket server and returns the file descriptor */ +int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port) +{ + int ret; + struct addrinfo hints; + struct addrinfo *res; + char _port[6]; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + snprintf(_port, sizeof(_port), "%lu", port); + ret = getaddrinfo(host, _port, &hints, &res); + if (ret != 0) { + flb_warn("net_tcp_fd_connect: getaddrinfo(host='%s'): %s", + host, gai_strerror(ret)); + return -1; + } + + ret = connect(fd, res->ai_addr, res->ai_addrlen); + freeaddrinfo(res); + + return ret; +} + +flb_sockfd_t flb_net_server(const char *port, const char *listen_addr) +{ + flb_sockfd_t fd = -1; + int ret; + struct addrinfo hints; + struct addrinfo *res, *rp; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + + ret = getaddrinfo(listen_addr, port, &hints, &res); + if (ret != 0) { + flb_warn("net_server: getaddrinfo(listen='%s:%s'): %s", + listen_addr, port, gai_strerror(ret)); + return -1; + } + + for (rp = res; rp != NULL; rp = rp->ai_next) { + fd = flb_net_socket_create(rp->ai_family, 1); + if (fd == -1) { + flb_error("Error creating server socket, retrying"); + continue; + } + + flb_net_socket_tcp_nodelay(fd); + flb_net_socket_reset(fd); + + ret = flb_net_bind(fd, rp->ai_addr, rp->ai_addrlen, 128); + if(ret == -1) { + flb_warn("Cannot listen on %s port %s", listen_addr, port); + flb_socket_close(fd); + continue; + } + break; + } + freeaddrinfo(res); + + if (rp == NULL) { + return -1; + } + + return fd; +} + +flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr) +{ + flb_sockfd_t fd = -1; + int ret; + struct addrinfo hints; + struct addrinfo *res, *rp; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_PASSIVE; + + ret = getaddrinfo(listen_addr, port, &hints, &res); + if (ret != 0) { + flb_warn("net_server_udp: getaddrinfo(listen='%s:%s'): %s", + listen_addr, port, gai_strerror(ret)); + return -1; + } + + for (rp = res; rp != NULL; rp = rp->ai_next) { + fd = flb_net_socket_create_udp(rp->ai_family, 0); + if (fd == -1) { + flb_error("Error creating server socket, retrying"); + continue; + } + + ret = flb_net_bind_udp(fd, rp->ai_addr, rp->ai_addrlen); + if(ret == -1) { + flb_warn("Cannot listen on %s port %s", listen_addr, port); + flb_socket_close(fd); + continue; + } + break; + } + freeaddrinfo(res); + + if (rp == NULL) { + return -1; + } + + return fd; +} + +#ifdef FLB_HAVE_UNIX_SOCKET +flb_sockfd_t flb_net_server_unix(const char *listen_path, + int stream_mode, + int backlog) +{ + size_t address_length; + size_t path_length; + struct sockaddr_un address; + int ret; + flb_sockfd_t fd; + + if (stream_mode) { + fd = flb_net_socket_create(AF_UNIX, FLB_TRUE); + } + else { + fd = flb_net_socket_create_udp(AF_UNIX, FLB_TRUE); + } + + if (fd != -1) { + memset(&address, 0, sizeof(struct sockaddr_un)); + + path_length = strlen(listen_path); + + address_length = offsetof(struct sockaddr_un, sun_path) + + path_length + + 1; + + address.sun_family = AF_UNIX; + + strncpy(address.sun_path, listen_path, sizeof(address.sun_path)); + + if (stream_mode) { + ret = flb_net_bind(fd, + (const struct sockaddr *) &address, + address_length, + backlog); + } + else { + ret = flb_net_bind_udp(fd, + (const struct sockaddr *) &address, + address_length); + } + + if(ret == -1) { + flb_warn("Cannot bind to or listen on %s", listen_path); + + flb_socket_close(fd); + } + } + else { + flb_error("Error creating server socket"); + } + + return fd; +} +#else +flb_sockfd_t flb_net_server_unix(const char *listen_path, + int stream_mode, + int backlog) +{ + flb_error("Unix sockets are not available in this platform"); + + return -1; +} +#endif + +int flb_net_bind(flb_sockfd_t fd, const struct sockaddr *addr, + socklen_t addrlen, int backlog) +{ + int ret; + + ret = bind(fd, addr, addrlen); + if( ret == -1 ) { + flb_error("Error binding socket"); + return ret; + } + + ret = listen(fd, backlog); + if(ret == -1 ) { + flb_error("Error setting up the listener"); + return -1; + } + + return ret; +} + +int flb_net_bind_udp(flb_sockfd_t fd, const struct sockaddr *addr, + socklen_t addrlen) +{ + int ret; + + ret = bind(fd, addr, addrlen); + if( ret == -1 ) { + flb_error("Error binding socket"); + return ret; + } + + return ret; +} + +flb_sockfd_t flb_net_accept(flb_sockfd_t server_fd) +{ + flb_sockfd_t remote_fd; + struct sockaddr sock_addr; + socklen_t socket_size = sizeof(struct sockaddr); + + // return accept(server_fd, &sock_addr, &socket_size); + +#ifdef FLB_HAVE_ACCEPT4 + remote_fd = accept4(server_fd, &sock_addr, &socket_size, + SOCK_NONBLOCK | SOCK_CLOEXEC); +#else + remote_fd = accept(server_fd, &sock_addr, &socket_size); + flb_net_socket_nonblocking(remote_fd); +#endif + + if (remote_fd == -1) { + perror("accept4"); + } + + return remote_fd; +} + +int flb_net_address_to_str(int family, const struct sockaddr *addr, + char *output_buffer, size_t output_buffer_size) +{ + struct sockaddr *proper_addr; + const char *result; + + if (family == AF_INET) { + proper_addr = (struct sockaddr *) &((struct sockaddr_in *) addr)->sin_addr; + } + else if (family == AF_INET6) { + proper_addr = (struct sockaddr *) &((struct sockaddr_in6 *) addr)->sin6_addr; + } + else { + strncpy(output_buffer, + "CONVERSION ERROR 1", + output_buffer_size); + + return -1; + } + + result = inet_ntop(family, proper_addr, output_buffer, output_buffer_size); + + if (result == NULL) { + strncpy(output_buffer, + "CONVERSION ERROR 2", + output_buffer_size); + + return -2; + } + + return 0; +} + +#ifdef FLB_COMPILE_UNUSED_FUNCTIONS +static int net_socket_get_local_address(flb_sockfd_t fd, + struct sockaddr_storage *address) +{ + socklen_t buffer_size; + int result; + + buffer_size = sizeof(struct sockaddr_storage); + + result = getsockname(fd, (struct sockaddr *) &address, &buffer_size); + + if (result == -1) { + return -1; + } + + return 0; +} +#endif + +static int net_socket_get_peer_address(flb_sockfd_t fd, + struct sockaddr_storage *address) +{ + socklen_t buffer_size; + int result; + + buffer_size = sizeof(struct sockaddr_storage); + + result = getpeername(fd, (struct sockaddr *) address, &buffer_size); + + if (result == -1) { + return -1; + } + + return 0; +} + +static unsigned short int net_address_port(struct sockaddr_storage *address) +{ + unsigned short int port; + + if (address->ss_family == AF_INET) { + port = ((struct sockaddr_in *) address)->sin_port; + } + else if (address->ss_family == AF_INET6) { + port = ((struct sockaddr_in6 *) address)->sin6_port; + } + else { + port = 0; + } + + return ntohs(port); +} + +#ifdef FLB_HAVE_UNIX_SOCKET +static int net_address_unix_socket_peer_pid_raw(flb_sockfd_t fd, + struct sockaddr_storage *address, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size) +{ +#if !defined(FLB_SYSTEM_MACOS) && !defined(FLB_SYSTEM_FREEBSD) + unsigned int peer_credentials_size; + struct ucred peer_credentials; +#endif + size_t required_buffer_size; + int result = 0; + + if (address->ss_family != AF_UNIX) { + return -1; + } + + required_buffer_size = 11; /* maximum 32 bit signed integer */ + required_buffer_size += 1; /* string terminator */ + + if (required_buffer_size > output_buffer_size) { + return -1; + } + +#if !defined(FLB_SYSTEM_MACOS) && !defined(FLB_SYSTEM_FREEBSD) + peer_credentials_size = sizeof(struct ucred); + + result = getsockopt(fd, + SOL_SOCKET, + SO_PEERCRED, + &peer_credentials, + &peer_credentials_size); + + if (result != -1) { + *output_data_size = snprintf(output_buffer, + output_buffer_size, + "%ld", + (long) peer_credentials.pid); + } +#else + *output_data_size = snprintf(output_buffer, + output_buffer_size, + FLB_NETWORK_ADDRESS_UNAVAILABLE); +#endif + + return result; +} + +static int net_address_unix_socket_peer_pid_str(flb_sockfd_t fd, + struct sockaddr_storage *address, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size) +{ + size_t required_buffer_size; + size_t peer_pid_length; + char peer_pid[12]; + int result; + + if (address->ss_family != AF_UNIX) { + return -1; + } + + result = net_address_unix_socket_peer_pid_raw(fd, + address, + peer_pid, + sizeof(peer_pid), + &peer_pid_length); + + if (result != 0) { + return -1; + } + + required_buffer_size = strlen(FLB_NETWORK_UNIX_SOCKET_PEER_ADDRESS_TEMPLATE); + required_buffer_size += peer_pid_length; + required_buffer_size -= 2; /* format string specifiers */ + required_buffer_size += 1; /* string terminator */ + + if (required_buffer_size > output_buffer_size) { + *output_data_size = required_buffer_size; + + return -1; + } + + *output_data_size = snprintf(output_buffer, + output_buffer_size, + FLB_NETWORK_UNIX_SOCKET_PEER_ADDRESS_TEMPLATE, + peer_pid); + + return 0; +} +#endif + +size_t flb_network_address_size(struct sockaddr_storage *address) +{ + if (address->ss_family == AF_INET) { + return sizeof(struct sockaddr_in); + } + else if (address->ss_family == AF_INET6) { + return sizeof(struct sockaddr_in6); + } +#ifdef FLB_HAVE_UNIX_SOCKET + else if (address->ss_family == AF_UNIX) { + return sizeof(struct sockaddr_un); + } +#endif + + return 0; +} + +static int net_address_ip_raw(flb_sockfd_t fd, + struct sockaddr_storage *address, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size) +{ + char peer_pid[12]; + char *address_data; + size_t address_size; + int result; + + errno = 0; + + if (address->ss_family == AF_UNSPEC) { + flb_debug("socket_ip_raw: uninitialized address"); + + return -1; + } + if (address->ss_family == AF_INET) { + address_data = ((char *) &((struct sockaddr_in *) address)->sin_addr); + address_size = sizeof(struct in_addr); + } + else if (address->ss_family == AF_INET6) { + address_data = ((char *) &((struct sockaddr_in6 *) address)->sin6_addr); + address_size = sizeof(struct in6_addr); + } +#ifdef FLB_HAVE_UNIX_SOCKET + else if (address->ss_family == AF_UNIX) { + result = net_address_unix_socket_peer_pid_raw(fd, + address, + peer_pid, + sizeof(peer_pid), + &address_size); + + if (result != 0) { + flb_debug("socket_ip_raw: error getting client process pid"); + + return -1; + } + + address_data = peer_pid; + } +#endif + else { + flb_debug("socket_ip_raw: unsupported address type (%i)", + address->ss_family); + + return -1; + } + + if (output_buffer_size < address_size) { + flb_debug("socket_ip_raw: insufficient buffer size (%i < %zu)", + output_buffer_size, address_size); + + return -1; + } + + memcpy(output_buffer, address_data, address_size); + + if (output_data_size != NULL) { + *output_data_size = address_size; + } + + return 0; +} + +static int net_address_ip_str(flb_sockfd_t fd, + struct sockaddr_storage *address, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size) +{ + void *address_data; + int result; + + errno = 0; + + if (address->ss_family == AF_UNSPEC) { + *output_data_size = snprintf(output_buffer, + output_buffer_size, + FLB_NETWORK_ADDRESS_UNAVAILABLE); + + return 0; + } + else if (address->ss_family == AF_INET) { + address_data = (void *) &((struct sockaddr_in *) address)->sin_addr; + } + else if (address->ss_family == AF_INET6) { + address_data = (void *) &((struct sockaddr_in6 *) address)->sin6_addr; + } +#ifdef FLB_HAVE_UNIX_SOCKET + else if (address->ss_family == AF_UNIX) { + result = net_address_unix_socket_peer_pid_str(fd, + address, + output_buffer, + output_buffer_size, + output_data_size); + + if (result != 0) { + flb_debug("socket_ip_str: error getting client process pid"); + } + + return result; + } +#endif + else { + flb_debug("socket_ip_str: unsupported address type (%i)", + address->ss_family); + + return -1; + } + + if ((inet_ntop(address->ss_family, + address_data, + output_buffer, + output_buffer_size)) == NULL) { + flb_debug("socket_ip_str: Can't get the IP text form (%i)", errno); + + return -1; + } + + *output_data_size = strlen(output_buffer); + + return 0; +} + +int flb_net_socket_peer_address(flb_sockfd_t fd, + struct sockaddr_storage *output_buffer) +{ + return net_socket_get_peer_address(fd, output_buffer); +} + +int flb_net_socket_address_info(flb_sockfd_t fd, + struct sockaddr_storage *address, + unsigned short int *port_output_buffer, + char *str_output_buffer, + int str_output_buffer_size, + size_t *str_output_data_size) +{ + int result; + + result = net_address_ip_str(fd, address, + str_output_buffer, + str_output_buffer_size, + str_output_data_size); + + if (result == 0) { + if (port_output_buffer != NULL) { + *port_output_buffer = net_address_port(address); + } + } + + return result; +} + +int flb_net_socket_ip_peer_str(flb_sockfd_t fd, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size, + int *output_address_family) +{ + struct sockaddr_storage address; + int result; + + result = net_socket_get_peer_address(fd, &address); + + if (result != 0) { + return -1; + } + + if (address.ss_family == AF_UNIX) { + + } + + result = net_address_ip_str(fd, &address, + output_buffer, + output_buffer_size, + output_data_size); + + if (result == 0) { + if (output_address_family != NULL) { + *output_address_family = address.ss_family; + } + } + + return result; +} + +int flb_net_socket_peer_ip_raw(flb_sockfd_t fd, + char *output_buffer, + int output_buffer_size, + size_t *output_data_size, + int *output_address_family) +{ + struct sockaddr_storage address; + int result; + + result = net_socket_get_peer_address(fd, &address); + + if (result != 0) { + return -1; + } + + result = net_address_ip_raw(fd, &address, + output_buffer, + output_buffer_size, + output_data_size); + + if (result == 0) { + if (output_address_family != NULL) { + *output_address_family = address.ss_family; + } + } + + return result; +} + +int flb_net_socket_peer_port(flb_sockfd_t fd, + unsigned short int *output_buffer) +{ + struct sockaddr_storage address; + int result; + + result = net_socket_get_peer_address(fd, &address); + + if (result != 0) { + return -1; + } + + *output_buffer = net_address_port(&address); + + return 0; +} + +int flb_net_socket_peer_info(flb_sockfd_t fd, + unsigned short int *port_output_buffer, + struct sockaddr_storage *raw_output_buffer, + char *str_output_buffer, + int str_output_buffer_size, + size_t *str_output_data_size) +{ + struct sockaddr_storage address; + int result; + + result = net_socket_get_peer_address(fd, &address); + + if (result != 0) { + return -1; + } + + memcpy(raw_output_buffer, + &address, + sizeof(struct sockaddr_storage)); + + return flb_net_socket_address_info(fd, + &address, + port_output_buffer, + str_output_buffer, + str_output_buffer_size, + str_output_data_size); +} |