summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_http_client.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--fluent-bit/src/flb_http_client.c1399
1 files changed, 1399 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_http_client.c b/fluent-bit/src/flb_http_client.c
new file mode 100644
index 00000000..2d280329
--- /dev/null
+++ b/fluent-bit/src/flb_http_client.c
@@ -0,0 +1,1399 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is a very simple HTTP Client interface which aims to provide an
+ * easy way to issue HTTP requests and handle reponses from the input/output
+ * plugins.
+ *
+ * It scope is:
+ *
+ * - Use upstream connections.
+ * - Support 'retry' in case the HTTP server timeouts a connection.
+ * - Get return Status, Headers and Body content if found.
+ * - If Upstream supports keepalive, adjust headers
+ */
+
+#define _GNU_SOURCE
+#include <string.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_http_client_debug.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_base64.h>
+
+
+
+void flb_http_client_debug(struct flb_http_client *c,
+ struct flb_callback *cb_ctx)
+{
+#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
+ if (cb_ctx) {
+ flb_http_client_debug_enable(c, cb_ctx);
+ }
+#endif
+}
+
+/*
+ * Removes the port from the host header
+ */
+int flb_http_strip_port_from_host(struct flb_http_client *c)
+{
+ struct mk_list *head;
+ struct flb_kv *kv;
+ char *out_host;
+ struct flb_upstream *u;
+
+ u = c->u_conn->upstream;
+
+ if (!c->host) {
+ if (!u->proxied_host) {
+ out_host = u->tcp_host;
+ } else {
+ out_host = u->proxied_host;
+ }
+ } else {
+ out_host = (char *) c->host;
+ }
+
+ mk_list_foreach(head, &c->headers) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ if (strcasecmp("Host", kv->key) == 0) {
+ flb_sds_destroy(kv->val);
+ kv->val = NULL;
+ kv->val = flb_sds_create(out_host);
+ if (!kv->val) {
+ flb_errno();
+ return -1;
+ }
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
+int flb_http_allow_duplicated_headers(struct flb_http_client *c, int allow)
+{
+ if (allow != FLB_TRUE && allow != FLB_FALSE) {
+ return -1;
+ }
+
+ c->allow_dup_headers = allow;
+ return 0;
+}
+
+/* check if there is enough space in the client header buffer */
+static int header_available(struct flb_http_client *c, int bytes)
+{
+ int available;
+
+ available = c->header_size - c->header_len;
+ if (available < bytes) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Try to find a header value in the buffer */
+static int header_lookup(struct flb_http_client *c,
+ const char *header, int header_len,
+ const char **out_val, int *out_len)
+{
+ char *p;
+ char *crlf;
+ char *end;
+
+ if (!c->resp.data) {
+ return FLB_HTTP_MORE;
+ }
+
+ /* Lookup the beginning of the header */
+ p = strcasestr(c->resp.data, header);
+ end = strstr(c->resp.data, "\r\n\r\n");
+ if (!p) {
+ if (end) {
+ /* The headers are complete but the header is not there */
+ return FLB_HTTP_NOT_FOUND;
+ }
+
+ /* We need more data */
+ return FLB_HTTP_MORE;
+ }
+
+ /* Exclude matches in the body */
+ if (end && p > end) {
+ return FLB_HTTP_NOT_FOUND;
+ }
+
+ /* Lookup CRLF (end of line \r\n) */
+ crlf = strstr(p, "\r\n");
+ if (!crlf) {
+ return FLB_HTTP_MORE;
+ }
+
+ p += header_len;
+
+ *out_val = p;
+ *out_len = (crlf - p);
+
+ return FLB_HTTP_OK;
+}
+
+/* HTTP/1.1: Check if we have a Chunked Transfer Encoding */
+static int check_chunked_encoding(struct flb_http_client *c)
+{
+ int ret;
+ int len;
+ const char *header = NULL;
+
+ ret = header_lookup(c, "Transfer-Encoding: ", 19,
+ &header, &len);
+ if (ret == FLB_HTTP_NOT_FOUND) {
+ /* If the header is missing, this is fine */
+ c->resp.chunked_encoding = FLB_FALSE;
+ return FLB_HTTP_OK;
+ }
+ else if (ret == FLB_HTTP_MORE) {
+ return FLB_HTTP_MORE;
+ }
+
+ if (strncasecmp(header, "chunked", len) == 0) {
+ c->resp.chunked_encoding = FLB_TRUE;
+ }
+
+ return FLB_HTTP_OK;
+}
+
+/* Check response for a 'Content-Length' header */
+static int check_content_length(struct flb_http_client *c)
+{
+ int ret;
+ int len;
+ const char *header;
+ char tmp[256];
+
+ if (c->resp.status == 204) {
+ c->resp.content_length = -1;
+ return FLB_HTTP_OK;
+ }
+
+ ret = header_lookup(c, "Content-Length: ", 16,
+ &header, &len);
+ if (ret == FLB_HTTP_MORE) {
+ return FLB_HTTP_MORE;
+ }
+ else if (ret == FLB_HTTP_NOT_FOUND) {
+ return FLB_HTTP_NOT_FOUND;
+ }
+
+ if (len > sizeof(tmp) - 1) {
+ /* Value too long */
+ return FLB_HTTP_ERROR;
+ }
+
+ /* Copy to temporary buffer */
+ memcpy(tmp, header, len);
+ tmp[len] = '\0';
+
+ c->resp.content_length = atoi(tmp);
+ return FLB_HTTP_OK;
+}
+
+/* Check response for a 'Connection' header */
+static int check_connection(struct flb_http_client *c)
+{
+ int ret;
+ int len;
+ const char *header;
+ char *buf;
+
+ ret = header_lookup(c, "Connection: ", 12,
+ &header, &len);
+ if (ret == FLB_HTTP_NOT_FOUND) {
+ return FLB_HTTP_NOT_FOUND;
+ }
+ else if (ret == FLB_HTTP_MORE) {
+ return FLB_HTTP_MORE;
+ }
+
+ buf = flb_malloc(len + 1);
+ if (!buf) {
+ flb_errno();
+ return -1;
+ }
+
+ memcpy(buf, header, len);
+ buf[len] = '\0';
+
+ if (strncasecmp(buf, "close", 5) == 0) {
+ c->resp.connection_close = FLB_TRUE;
+ }
+ else if (strcasestr(buf, "keep-alive")) {
+ c->resp.connection_close = FLB_FALSE;
+ }
+ flb_free(buf);
+ return FLB_HTTP_OK;
+
+}
+
+static inline void consume_bytes(char *buf, int bytes, int length)
+{
+ memmove(buf, buf + bytes, length - bytes);
+}
+
+static int process_chunked_data(struct flb_http_client *c)
+{
+ long len;
+ long drop;
+ long val;
+ char *p;
+ char tmp[32];
+ struct flb_http_response *r = &c->resp;
+
+ chunk_start:
+ p = strstr(r->chunk_processed_end, "\r\n");
+ if (!p) {
+ return FLB_HTTP_MORE;
+ }
+
+ /* Hexa string length */
+ len = (p - r->chunk_processed_end);
+ if ((len > sizeof(tmp) - 1) || len == 0) {
+ return FLB_HTTP_ERROR;
+ }
+ p += 2;
+
+ /* Copy hexa string to temporary buffer */
+ memcpy(tmp, r->chunk_processed_end, len);
+ tmp[len] = '\0';
+
+ /* Convert hexa string to decimal */
+ errno = 0;
+ val = strtol(tmp, NULL, 16);
+ if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN))
+ || (errno != 0 && val == 0)) {
+ flb_errno();
+ return FLB_HTTP_ERROR;
+ }
+ if (val < 0) {
+ return FLB_HTTP_ERROR;
+ }
+ /*
+ * 'val' contains the expected number of bytes, check current lengths
+ * and do buffer adjustments.
+ *
+ * we do val + 2 because the chunk always ends with \r\n
+ */
+ val += 2;
+
+ /* Number of bytes after the Chunk header */
+ len = r->data_len - (p - r->data);
+ if (len < val) {
+ return FLB_HTTP_MORE;
+ }
+
+ /* From the current chunk we expect it ends with \r\n */
+ if (p[val -2] != '\r' || p[val - 1] != '\n') {
+ return FLB_HTTP_ERROR;
+ }
+
+ /*
+ * At this point we are just fine, the chunk is valid, next steps:
+ *
+ * 1. check possible last chunk
+ * 2. drop chunk header from the buffer
+ * 3. remove chunk ending \r\n
+ */
+
+ /* 1. Validate ending chunk */
+ if (val - 2 == 0) {
+ /*
+ * For an ending chunk we expect:
+ *
+ * 0\r\n
+ * \r\n
+ *
+ * so at least we need 5 bytes in the buffer
+ */
+ len = r->data_len - (r->chunk_processed_end - r->data);
+ if (len < 5) {
+ return FLB_HTTP_MORE;
+ }
+
+ if (r->chunk_processed_end[3] != '\r' ||
+ r->chunk_processed_end[4] != '\n') {
+ return FLB_HTTP_ERROR;
+ }
+ }
+
+ /* 2. Drop chunk header */
+ drop = (p - r->chunk_processed_end);
+ len = r->data_len - (r->chunk_processed_end - r->data);
+ consume_bytes(r->chunk_processed_end, drop, len);
+ r->data_len -= drop;
+ r->data[r->data_len] = '\0';
+
+ /* 3. Remove chunk ending \r\n */
+ drop = 2;
+ r->chunk_processed_end += labs(val - 2);
+ len = r->data_len - (r->chunk_processed_end - r->data);
+ consume_bytes(r->chunk_processed_end, drop, len);
+ r->data_len -= drop;
+
+ /* Always append a NULL byte */
+ r->data[r->data_len] = '\0';
+
+ /* Is this the last chunk ? */
+ if ((val - 2 == 0)) {
+ /* Update payload size */
+ r->payload_size = r->data_len - (r->headers_end - r->data);
+ return FLB_HTTP_OK;
+ }
+
+ /* If we have some remaining bytes, start over */
+ len = r->data_len - (r->chunk_processed_end - r->data);
+ if (len > 0) {
+ goto chunk_start;
+ }
+
+ return FLB_HTTP_MORE;
+}
+
+static int process_data(struct flb_http_client *c)
+{
+ int ret;
+ char code[4];
+ char *tmp;
+
+ if (c->resp.data_len < 15) {
+ /* we need more data */
+ return FLB_HTTP_MORE;
+ }
+
+ /* HTTP response status */
+ if (c->resp.status <= 0) {
+ memcpy(code, c->resp.data + 9, 3);
+ code[3] = '\0';
+ c->resp.status = atoi(code);
+ if (c->resp.status < 100 || c->resp.status > 599) {
+ return FLB_HTTP_ERROR;
+ }
+ }
+
+ /* Try to lookup content length */
+ if (c->resp.content_length == -1 && c->resp.chunked_encoding == FLB_FALSE) {
+ ret = check_content_length(c);
+ if (ret == FLB_HTTP_ERROR) {
+ return FLB_HTTP_ERROR;
+ }
+ }
+
+ /* Chunked encoding for HTTP/1.1 (no content length of course) */
+ if ((c->flags & FLB_HTTP_11) && c->resp.content_length == -1) {
+ if (c->resp.chunked_encoding == FLB_FALSE) {
+ ret = check_chunked_encoding(c);
+ if (ret == FLB_HTTP_ERROR) {
+ return FLB_HTTP_ERROR;
+ }
+ }
+ }
+
+ if (!c->resp.headers_end) {
+ tmp = strstr(c->resp.data, "\r\n\r\n");
+ if (tmp) {
+ c->resp.headers_end = tmp + 4;
+ if (c->resp.chunked_encoding == FLB_TRUE) {
+ c->resp.chunk_processed_end = c->resp.headers_end;
+ }
+
+ /* Mark the payload */
+ if ((tmp - c->resp.data + 4) < c->resp.data_len) {
+ c->resp.payload = tmp += 4;
+ c->resp.payload_size = (c->resp.data_len - (tmp - c->resp.data));
+ }
+ }
+ else {
+ return FLB_HTTP_MORE;
+ }
+ }
+
+ /* Re-check if an ending exists, if so process payload if required */
+ if (c->resp.headers_end) {
+ /* Mark the payload */
+ if (!c->resp.payload &&
+ c->resp.headers_end - c->resp.data < c->resp.data_len) {
+ c->resp.payload = c->resp.headers_end;
+ c->resp.payload_size = (c->resp.data_len - (c->resp.headers_end - c->resp.data));
+ }
+
+ if (c->resp.content_length >= 0) {
+ c->resp.payload_size = c->resp.data_len;
+ c->resp.payload_size -= (c->resp.headers_end - c->resp.data);
+ if (c->resp.payload_size >= c->resp.content_length) {
+ return FLB_HTTP_OK;
+ }
+ }
+ else if (c->resp.chunked_encoding == FLB_TRUE) {
+ ret = process_chunked_data(c);
+ if (ret == FLB_HTTP_ERROR) {
+ return FLB_HTTP_ERROR;
+ }
+ else if (ret == FLB_HTTP_OK) {
+ return FLB_HTTP_OK;
+ }
+ }
+ else {
+ return FLB_HTTP_OK;
+ }
+ }
+ else if (c->resp.headers_end && c->resp.content_length <= 0) {
+ return FLB_HTTP_OK;
+ }
+
+ return FLB_HTTP_MORE;
+}
+
+#if defined FLB_HAVE_TESTS_OSSFUZZ
+int fuzz_process_data(struct flb_http_client *c);
+int fuzz_process_data(struct flb_http_client *c) {
+ return process_data(c);
+}
+
+int fuzz_check_connection(struct flb_http_client *c);
+int fuzz_check_connection(struct flb_http_client *c) {
+ return check_connection(c);
+}
+
+#endif
+
+static int proxy_parse(const char *proxy, struct flb_http_client *c)
+{
+ int len;
+ int port;
+ int off = 0;
+ const char *s;
+ const char *e;
+ const char *host;
+
+ len = strlen(proxy);
+ if (len < 7) {
+ return -1;
+ }
+
+ /* Protocol lookup */
+ if (strncmp(proxy, "http://", 7) == 0) {
+ port = 80;
+ off = 7;
+ c->proxy.type = FLB_HTTP_PROXY_HTTP;
+ }
+ else if (strncmp(proxy, "https://", 8) == 0) {
+ port = 443;
+ off = 8;
+ c->proxy.type = FLB_HTTP_PROXY_HTTPS;
+ }
+ else {
+ return -1;
+ }
+
+ /* Separate host/ip from port if any */
+ s = proxy + off;
+ if (*s == '[') {
+ /* IPv6 address (RFC 3986) */
+ e = strchr(++s, ']');
+ if (!e) {
+ return -1;
+ }
+ host = strndup(s, e - s);
+ s = e + 1;
+ } else {
+ e = s;
+ while (!(*e == '\0' || *e == ':' || *e == '/')) {
+ ++e;
+ }
+ if (e == s) {
+ return -1;
+ }
+ host = strndup(s, e - s);
+ s = e;
+ }
+ if (*s == ':') {
+ port = atoi(++s);
+ }
+
+ flb_trace("[http_client] proxy type=%i host=%s port=%i",
+ c->proxy.type, host, port);
+
+ c->proxy.host = host;
+ c->proxy.port = port;
+
+ return 0;
+}
+
+static int add_host_and_content_length(struct flb_http_client *c)
+{
+ int len;
+ flb_sds_t tmp;
+ flb_sds_t host;
+ char *out_host;
+ int out_port;
+ size_t size;
+ struct flb_upstream *u = c->u_conn->upstream;
+
+ if (!c->host) {
+ if (u->proxied_host) {
+ out_host = u->proxied_host;
+ }
+ else {
+ out_host = u->tcp_host;
+ }
+ }
+ else {
+ out_host = (char *) c->host;
+ }
+
+ len = strlen(out_host);
+ host = flb_sds_create_size(len + 32);
+ if (!host) {
+ flb_error("[http_client] cannot create temporal buffer");
+ return -1;
+ }
+
+ if (c->port == 0) {
+ if (u->proxied_port != 0 ) {
+ out_port = u->proxied_port;
+ }
+ else {
+ out_port = u->tcp_port;
+ }
+ }
+ else {
+ out_port = c->port;
+ }
+
+ if (c->flags & FLB_IO_TLS && out_port == 443) {
+ tmp = flb_sds_copy(host, out_host, strlen(out_host));
+ }
+ else {
+ tmp = flb_sds_printf(&host, "%s:%i", out_host, out_port);
+ }
+
+ if (!tmp) {
+ flb_sds_destroy(host);
+ flb_error("[http_client] cannot compose temporary host header");
+ return -1;
+ }
+ host = tmp;
+ tmp = NULL;
+
+ flb_http_add_header(c, "Host", 4, host, flb_sds_len(host));
+ flb_sds_destroy(host);
+
+ /* Content-Length */
+ if (c->body_len >= 0) {
+ size = 32;
+ tmp = flb_malloc(size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ len = snprintf(tmp, size - 1, "%i", c->body_len);
+ flb_http_add_header(c, "Content-Length", 14, tmp, len);
+ flb_free(tmp);
+ }
+
+ return 0;
+}
+
+struct flb_http_client *flb_http_client(struct flb_connection *u_conn,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ const char *host, int port,
+ const char *proxy, int flags)
+{
+ int ret;
+ char *p;
+ char *buf = NULL;
+ char *str_method = NULL;
+ char *fmt_plain = \
+ "%s %s HTTP/1.%i\r\n";
+ char *fmt_proxy = \
+ "%s http://%s:%i%s HTTP/1.%i\r\n"
+ "Proxy-Connection: KeepAlive\r\n";
+ // TODO: IPv6 should have the format of [ip]:port
+ char *fmt_connect = \
+ "%s %s:%i HTTP/1.%i\r\n"
+ "Proxy-Connection: KeepAlive\r\n";
+
+ struct flb_http_client *c;
+
+ switch (method) {
+ case FLB_HTTP_GET:
+ str_method = "GET";
+ break;
+ case FLB_HTTP_POST:
+ str_method = "POST";
+ break;
+ case FLB_HTTP_PUT:
+ str_method = "PUT";
+ break;
+ case FLB_HTTP_HEAD:
+ str_method = "HEAD";
+ break;
+ case FLB_HTTP_CONNECT:
+ str_method = "CONNECT";
+ break;
+ case FLB_HTTP_PATCH:
+ str_method = "PATCH";
+ break;
+ };
+
+ buf = flb_calloc(1, FLB_HTTP_BUF_SIZE);
+ if (!buf) {
+ flb_errno();
+ return NULL;
+ }
+
+ /* FIXME: handler for HTTPS proxy */
+ if (proxy) {
+ flb_debug("[http_client] using http_proxy %s for header", proxy);
+ ret = snprintf(buf, FLB_HTTP_BUF_SIZE,
+ fmt_proxy,
+ str_method,
+ host,
+ port,
+ uri,
+ flags & FLB_HTTP_10 ? 0 : 1);
+ }
+ else if (method == FLB_HTTP_CONNECT) {
+ flb_debug("[http_client] using HTTP CONNECT for proxy: proxy host %s, proxy port %i", host, port);
+ ret = snprintf(buf, FLB_HTTP_BUF_SIZE,
+ fmt_connect,
+ str_method,
+ host,
+ port,
+ flags & FLB_HTTP_10 ? 0 : 1);
+ }
+ else {
+ flb_debug("[http_client] not using http_proxy for header");
+ ret = snprintf(buf, FLB_HTTP_BUF_SIZE,
+ fmt_plain,
+ str_method,
+ uri,
+ flags & FLB_HTTP_10 ? 0 : 1);
+ }
+
+ if (ret == -1) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+
+ c = flb_calloc(1, sizeof(struct flb_http_client));
+ if (!c) {
+ flb_free(buf);
+ return NULL;
+ }
+
+ c->u_conn = u_conn;
+ c->method = method;
+ c->uri = uri;
+ c->host = host;
+ c->port = port;
+ c->header_buf = buf;
+ c->header_size = FLB_HTTP_BUF_SIZE;
+ c->header_len = ret;
+ c->flags = flags;
+ c->allow_dup_headers = FLB_TRUE;
+ mk_list_init(&c->headers);
+
+ /* Check if we have a query string */
+ p = strchr(uri, '?');
+ if (p) {
+ p++;
+ c->query_string = p;
+ }
+
+ /* Is Upstream connection using keepalive mode ? */
+ if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) {
+ c->flags |= FLB_HTTP_KA;
+ }
+
+ /* Response */
+ c->resp.content_length = -1;
+ c->resp.connection_close = -1;
+
+ if ((flags & FLB_HTTP_10) == 0) {
+ c->flags |= FLB_HTTP_11;
+ }
+
+ if (body && body_len > 0) {
+ c->body_buf = body;
+ c->body_len = body_len;
+ }
+
+ add_host_and_content_length(c);
+
+ /* Check proxy data */
+ if (proxy) {
+ flb_debug("[http_client] Using http_proxy: %s", proxy);
+ ret = proxy_parse(proxy, c);
+ if (ret != 0) {
+ flb_debug("[http_client] Something wrong with the http_proxy parsing");
+ flb_http_client_destroy(c);
+ return NULL;
+ }
+ }
+
+ /* 'Read' buffer size */
+ c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX);
+ if (!c->resp.data) {
+ flb_errno();
+ flb_http_client_destroy(c);
+ return NULL;
+ }
+ c->resp.data[0] = '\0';
+ c->resp.data_len = 0;
+ c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX;
+ c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX;
+
+ return c;
+}
+
+/*
+ * By default the HTTP client have a fixed buffer to read a response for a
+ * simple request. But in certain situations the caller might expect a
+ * larger response that exceed the buffer limit.
+ *
+ * This function allows to set a maximum buffer size for the client
+ * response where:
+ *
+ * 1. size = 0 no limit, read as much as possible.
+ * 2. size = N: specific limit, upon reach limit discard data (default: 4KB)
+ */
+int flb_http_buffer_size(struct flb_http_client *c, size_t size)
+{
+ if (size < c->resp.data_size_max && size != 0) {
+ flb_error("[http] requested buffer size %lu (bytes) needs to be greater than "
+ "minimum size allowed %lu (bytes)",
+ size, c->resp.data_size_max);
+ return -1;
+ }
+
+ c->resp.data_size_max = size;
+ return 0;
+}
+
+size_t flb_http_buffer_available(struct flb_http_client *c)
+{
+ return (c->resp.data_size - c->resp.data_len);
+}
+
+/*
+ * Increase the read buffer size based on the limits set by default or manually
+ * through the flb_http_buffer_size() function.
+ *
+ * The parameter 'size' is the amount of extra memory requested.
+ */
+int flb_http_buffer_increase(struct flb_http_client *c, size_t size,
+ size_t *out_size)
+{
+ int off_payload = 0;
+ int off_headers_end = 0;
+ int off_chunk_processed_end = 0;
+ char *tmp;
+ size_t new_size;
+ size_t allocated;
+
+ *out_size = 0;
+ new_size = c->resp.data_size + size;
+
+ /* Limit exceeded, adjust */
+ if (c->resp.data_size_max != 0) {
+ if (new_size > c->resp.data_size_max) {
+ new_size = c->resp.data_size_max;
+ if (new_size <= c->resp.data_size) {
+ /* Can't expand the buffer any further. */
+ return -1;
+ }
+ }
+ }
+
+
+ if (c->resp.headers_end) {
+ off_headers_end = c->resp.headers_end - c->resp.data;
+ }
+ if (c->resp.chunk_processed_end) {
+ off_chunk_processed_end = c->resp.chunk_processed_end - c->resp.data;
+ }
+
+ /*
+ * The payload is a reference to a position of 'data' buffer,
+ * we need to adjust the pointer after a memory buffer size change.
+ */
+ if (c->resp.payload_size > 0) {
+ off_payload = c->resp.payload - c->resp.data;
+ }
+
+ tmp = flb_realloc(c->resp.data, new_size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ else {
+ allocated = new_size - c->resp.data_size;
+ c->resp.data = tmp;
+ c->resp.data_size = new_size;
+
+ if (off_headers_end > 0) {
+ c->resp.headers_end = c->resp.data + off_headers_end;
+ }
+ if (off_chunk_processed_end > 0) {
+ c->resp.chunk_processed_end = c->resp.data + off_chunk_processed_end;
+ }
+ if (off_payload > 0) {
+ c->resp.payload = c->resp.data + off_payload;
+ }
+ }
+
+ *out_size = allocated;
+ return 0;
+}
+
+
+/* Append a custom HTTP header to the request */
+int flb_http_add_header(struct flb_http_client *c,
+ const char *key, size_t key_len,
+ const char *val, size_t val_len)
+{
+ struct flb_kv *kv;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ if (key_len < 1 || val_len < 1) {
+ return -1;
+ }
+
+ /* Check any previous header to avoid duplicates */
+ if (c->allow_dup_headers == FLB_FALSE) {
+ mk_list_foreach_safe(head, tmp, &c->headers) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ if (flb_sds_casecmp(kv->key, key, key_len) == 0) {
+ /* the header already exists, remove it */
+ flb_kv_item_destroy(kv);
+ break;
+ }
+ }
+ }
+
+ /* register new header in the temporal kv list */
+ kv = flb_kv_item_create_len(&c->headers,
+ (char *) key, key_len, (char *) val, val_len);
+ if (!kv) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * flb_http_get_header looks up a first value of request header.
+ * The return value should be destroyed after using.
+ * The return value is NULL, if the value is not found.
+ */
+flb_sds_t flb_http_get_header(struct flb_http_client *c,
+ const char *key, size_t key_len)
+{
+ flb_sds_t ret_str;
+ struct flb_kv *kv;
+ struct mk_list *head = NULL;
+ struct mk_list *tmp = NULL;
+
+ mk_list_foreach_safe(head, tmp, &c->headers) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+ if (flb_sds_casecmp(kv->key, key, key_len) == 0) {
+ ret_str = flb_sds_create(kv->val);
+ return ret_str;
+ }
+ }
+
+ return NULL;
+}
+
+static int http_header_push(struct flb_http_client *c, struct flb_kv *header)
+{
+ char *tmp;
+ const char *key;
+ const char *val;
+ size_t key_len;
+ size_t val_len;
+ size_t required;
+ size_t new_size;
+
+ key = header->key;
+ key_len = flb_sds_len(header->key);
+ val = header->val;
+ val_len = flb_sds_len(header->val);
+
+ /*
+ * The new header will need enough space in the buffer:
+ *
+ * key : length of the key
+ * separator: ': ' (2 bytes)
+ * val : length of the key value
+ * CRLF : '\r\n' (2 bytes)
+ */
+ required = key_len + 2 + val_len + 2;
+
+ if (header_available(c, required) != 0) {
+ if (required < 512) {
+ new_size = c->header_size + 512;
+ }
+ else {
+ new_size = c->header_size + required;
+ }
+ tmp = flb_realloc(c->header_buf, new_size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ c->header_buf = tmp;
+ c->header_size = new_size;
+ }
+
+ /* append the header key */
+ memcpy(c->header_buf + c->header_len,
+ key, key_len);
+ c->header_len += key_len;
+
+ /* append the separator */
+ c->header_buf[c->header_len++] = ':';
+ c->header_buf[c->header_len++] = ' ';
+
+ /* append the header value */
+ memcpy(c->header_buf + c->header_len,
+ val, val_len);
+ c->header_len += val_len;
+
+ /* Append the ending header CRLF */
+ c->header_buf[c->header_len++] = '\r';
+ c->header_buf[c->header_len++] = '\n';
+
+ return 0;
+}
+
+static int http_headers_compose(struct flb_http_client *c)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_kv *header;
+
+ /* Push header list to one buffer */
+ mk_list_foreach(head, &c->headers) {
+ header = mk_list_entry(head, struct flb_kv, _head);
+ ret = http_header_push(c, header);
+ if (ret != 0) {
+ flb_error("[http_client] cannot compose request headers");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void http_headers_destroy(struct flb_http_client *c)
+{
+ flb_kv_release(&c->headers);
+}
+
+int flb_http_set_keepalive(struct flb_http_client *c)
+{
+ /* check if 'keepalive' mode is enabled in the Upstream connection */
+ if (flb_stream_is_keepalive(c->u_conn->stream)) {
+ return -1;
+ }
+
+ /* append header */
+ return flb_http_add_header(c,
+ FLB_HTTP_HEADER_CONNECTION,
+ sizeof(FLB_HTTP_HEADER_CONNECTION) - 1,
+ FLB_HTTP_HEADER_KA,
+ sizeof(FLB_HTTP_HEADER_KA) - 1);
+}
+
+/* Adds a header specifying that the payload is compressed with gzip */
+int flb_http_set_content_encoding_gzip(struct flb_http_client *c)
+{
+ int ret;
+
+ ret = flb_http_add_header(c,
+ FLB_HTTP_HEADER_CONTENT_ENCODING,
+ sizeof(FLB_HTTP_HEADER_CONTENT_ENCODING) - 1,
+ "gzip", 4);
+ return ret;
+}
+
+int flb_http_set_callback_context(struct flb_http_client *c,
+ struct flb_callback *cb_ctx)
+{
+ c->cb_ctx = cb_ctx;
+ return 0;
+}
+
+int flb_http_add_auth_header(struct flb_http_client *c,
+ const char *user, const char *passwd, const char *header) {
+ int ret;
+ int len_u;
+ int len_p;
+ int len_h;
+ int len_out;
+ char tmp[1024];
+ char *p;
+ size_t b64_len;
+
+ /*
+ * We allow a max of 255 bytes for user and password (255 each), meaning
+ * we need at least:
+ *
+ * 'Basic base64(user : passwd)' => ~688 bytes
+ *
+ */
+
+ len_u = strlen(user);
+
+ if (passwd) {
+ len_p = strlen(passwd);
+ }
+ else {
+ len_p = 0;
+ }
+
+ p = flb_malloc(len_u + len_p + 2);
+ if (!p) {
+ flb_errno();
+ return -1;
+ }
+
+ memcpy(p, user, len_u);
+ p[len_u] = ':';
+ len_out = len_u + 1;
+
+ if (passwd) {
+ memcpy(p + len_out, passwd, len_p);
+ len_out += len_p;
+ }
+ p[len_out] = '\0';
+
+ memcpy(tmp, "Basic ", 6);
+ ret = flb_base64_encode((unsigned char *) tmp + 6, sizeof(tmp) - 7, &b64_len,
+ (unsigned char *) p, len_out);
+ if (ret != 0) {
+ flb_free(p);
+ return -1;
+ }
+
+ flb_free(p);
+ b64_len += 6;
+
+ len_h = strlen(header);
+ ret = flb_http_add_header(c,
+ header,
+ len_h,
+ tmp, b64_len);
+ return ret;
+}
+
+int flb_http_basic_auth(struct flb_http_client *c,
+ const char *user, const char *passwd)
+{
+ return flb_http_add_auth_header(c, user, passwd, FLB_HTTP_HEADER_AUTH);
+}
+
+int flb_http_proxy_auth(struct flb_http_client *c,
+ const char *user, const char *passwd)
+{
+ return flb_http_add_auth_header(c, user, passwd, FLB_HTTP_HEADER_PROXY_AUTH);
+}
+
+int flb_http_bearer_auth(struct flb_http_client *c, const char *token)
+{
+ flb_sds_t header_buffer;
+ flb_sds_t header_line;
+ int result;
+
+ result = -1;
+
+ if (token == NULL) {
+ token = "";
+
+ /* Shouldn't we log this and return instead of sending
+ * a malformed value?
+ */
+ }
+
+ header_buffer = flb_sds_create_size(strlen(token) + 64);
+
+ if (header_buffer == NULL) {
+ return -1;
+ }
+
+ header_line = flb_sds_printf(&header_buffer, "Bearer %s", token);
+
+ if (header_line != NULL) {
+ result = flb_http_add_header(c,
+ FLB_HTTP_HEADER_AUTH,
+ strlen(FLB_HTTP_HEADER_AUTH),
+ header_line,
+ flb_sds_len(header_line));
+ }
+
+ flb_sds_destroy(header_buffer);
+
+ return result;
+}
+
+
+int flb_http_do(struct flb_http_client *c, size_t *bytes)
+{
+ int ret;
+ int r_bytes;
+ int crlf = 2;
+ int new_size;
+ ssize_t available;
+ size_t out_size;
+ size_t bytes_header = 0;
+ size_t bytes_body = 0;
+ char *tmp;
+
+ /* Append pending headers */
+ ret = http_headers_compose(c);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* check enough space for the ending CRLF */
+ if (header_available(c, crlf) != 0) {
+ new_size = c->header_size + 2;
+ tmp = flb_realloc(c->header_buf, new_size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ c->header_buf = tmp;
+ c->header_size = new_size;
+ }
+
+ /* Append the ending header CRLF */
+ c->header_buf[c->header_len++] = '\r';
+ c->header_buf[c->header_len++] = '\n';
+
+#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
+ /* debug: request_headers callback */
+ flb_http_client_debug_cb(c, "_debug.http.request_headers");
+
+ /* debug: request_payload callback */
+ if (c->body_len > 0) {
+ flb_http_client_debug_cb(c, "_debug.http.request_payload");
+ }
+#endif
+
+ /* Write the header */
+ ret = flb_io_net_write(c->u_conn,
+ c->header_buf, c->header_len,
+ &bytes_header);
+ if (ret == -1) {
+ /* errno might be changed from the original call */
+ if (errno != 0) {
+ flb_errno();
+ }
+ return -1;
+ }
+
+ if (c->body_len > 0) {
+ ret = flb_io_net_write(c->u_conn,
+ c->body_buf, c->body_len,
+ &bytes_body);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ /* number of sent bytes */
+ *bytes = (bytes_header + bytes_body);
+
+ /* Read the server response, we need at least 19 bytes */
+ c->resp.data_len = 0;
+ while (1) {
+ available = flb_http_buffer_available(c) - 1;
+ if (available <= 1) {
+ /*
+ * If there is no more space available on our buffer, try to
+ * increase it.
+ */
+ ret = flb_http_buffer_increase(c, FLB_HTTP_DATA_CHUNK,
+ &out_size);
+ if (ret == -1) {
+ /*
+ * We could not allocate more space, let the caller handle
+ * this.
+ */
+ flb_warn("[http_client] cannot increase buffer: current=%zu "
+ "requested=%zu max=%zu", c->resp.data_size,
+ c->resp.data_size + FLB_HTTP_DATA_CHUNK,
+ c->resp.data_size_max);
+ flb_upstream_conn_recycle(c->u_conn, FLB_FALSE);
+ return 0;
+ }
+ available = flb_http_buffer_available(c) - 1;
+ }
+
+ r_bytes = flb_io_net_read(c->u_conn,
+ c->resp.data + c->resp.data_len,
+ available);
+ if (r_bytes <= 0) {
+ if (c->flags & FLB_HTTP_10) {
+ break;
+ }
+ }
+
+ /* Always append a NULL byte */
+ if (r_bytes >= 0) {
+ c->resp.data_len += r_bytes;
+ c->resp.data[c->resp.data_len] = '\0';
+
+ ret = process_data(c);
+ if (ret == FLB_HTTP_ERROR) {
+ flb_warn("[http_client] malformed HTTP response from %s:%i on "
+ "connection #%i",
+ c->u_conn->upstream->tcp_host,
+ c->u_conn->upstream->tcp_port,
+ c->u_conn->fd);
+ return -1;
+ }
+ else if (ret == FLB_HTTP_OK) {
+ break;
+ }
+ else if (ret == FLB_HTTP_MORE) {
+ continue;
+ }
+ }
+ else {
+ flb_error("[http_client] broken connection to %s:%i ?",
+ c->u_conn->upstream->tcp_host,
+ c->u_conn->upstream->tcp_port);
+ return -1;
+ }
+ }
+
+ /* Check 'Connection' response header */
+ ret = check_connection(c);
+ if (ret == FLB_HTTP_OK) {
+ /*
+ * If the server replied that the connection will be closed
+ * and our Upstream connection is in keepalive mode, we must
+ * inactivate the connection.
+ */
+ if (c->resp.connection_close == FLB_TRUE) {
+ /* Do not recycle the connection (no more keepalive) */
+ flb_upstream_conn_recycle(c->u_conn, FLB_FALSE);
+ flb_debug("[http_client] server %s:%i will close connection #%i",
+ c->u_conn->upstream->tcp_host,
+ c->u_conn->upstream->tcp_port,
+ c->u_conn->fd);
+ }
+ }
+
+#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
+ flb_http_client_debug_cb(c, "_debug.http.response_headers");
+ if (c->resp.payload_size > 0) {
+ flb_http_client_debug_cb(c, "_debug.http.response_payload");
+ }
+#endif
+
+ return 0;
+}
+
+/*
+ * flb_http_client_proxy_connect opens a tunnel to a proxy server via
+ * http `CONNECT` method. This is needed for https traffic through a
+ * http proxy.
+ * More: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT
+ */
+int flb_http_client_proxy_connect(struct flb_connection *u_conn)
+{
+ struct flb_upstream *u = u_conn->upstream;
+ struct flb_http_client *c;
+ size_t b_sent;
+ int ret = -1;
+
+ /* Don't pass proxy when using FLB_HTTP_CONNECT */
+ flb_debug("[upstream] establishing http tunneling to proxy: host %s port %d", u->tcp_host, u->tcp_port);
+ c = flb_http_client(u_conn, FLB_HTTP_CONNECT, "", NULL,
+ 0, u->proxied_host, u->proxied_port, NULL, 0);
+
+ /* Setup proxy's username and password */
+ if (u->proxy_username && u->proxy_password) {
+ flb_debug("[upstream] proxy uses username %s password %s", u->proxy_username, u->proxy_password);
+ flb_http_proxy_auth(c, u->proxy_username, u->proxy_password);
+ }
+
+ flb_http_buffer_size(c, 4192);
+
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+
+ /* Send HTTP request */
+ ret = flb_http_do(c, &b_sent);
+
+ /* Validate HTTP response */
+ if (ret != 0) {
+ flb_error("[upstream] error in flb_establish_proxy: %d", ret);
+ ret = -1;
+ }
+ else {
+ /* The request was issued successfully, validate the 'error' field */
+ flb_debug("[upstream] proxy returned %d", c->resp.status);
+ if (c->resp.status == 200) {
+ ret = 0;
+ }
+ else {
+ flb_error("flb_establish_proxy error: %s", c->resp.payload);
+ ret = -1;
+ }
+ }
+
+ /* Cleanup */
+ flb_http_client_destroy(c);
+
+ return ret;
+}
+
+void flb_http_client_destroy(struct flb_http_client *c)
+{
+ http_headers_destroy(c);
+ flb_free(c->resp.data);
+ flb_free(c->header_buf);
+ flb_free((void *)c->proxy.host);
+ flb_free(c);
+}