diff options
Diffstat (limited to '')
-rw-r--r-- | fluent-bit/src/flb_http_client.c | 1399 |
1 files changed, 1399 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_http_client.c b/fluent-bit/src/flb_http_client.c new file mode 100644 index 00000000..2d280329 --- /dev/null +++ b/fluent-bit/src/flb_http_client.c @@ -0,0 +1,1399 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This is a very simple HTTP Client interface which aims to provide an + * easy way to issue HTTP requests and handle reponses from the input/output + * plugins. + * + * It scope is: + * + * - Use upstream connections. + * - Support 'retry' in case the HTTP server timeouts a connection. + * - Get return Status, Headers and Body content if found. + * - If Upstream supports keepalive, adjust headers + */ + +#define _GNU_SOURCE +#include <string.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_http_client_debug.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_base64.h> + + + +void flb_http_client_debug(struct flb_http_client *c, + struct flb_callback *cb_ctx) +{ +#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG + if (cb_ctx) { + flb_http_client_debug_enable(c, cb_ctx); + } +#endif +} + +/* + * Removes the port from the host header + */ +int flb_http_strip_port_from_host(struct flb_http_client *c) +{ + struct mk_list *head; + struct flb_kv *kv; + char *out_host; + struct flb_upstream *u; + + u = c->u_conn->upstream; + + if (!c->host) { + if (!u->proxied_host) { + out_host = u->tcp_host; + } else { + out_host = u->proxied_host; + } + } else { + out_host = (char *) c->host; + } + + mk_list_foreach(head, &c->headers) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (strcasecmp("Host", kv->key) == 0) { + flb_sds_destroy(kv->val); + kv->val = NULL; + kv->val = flb_sds_create(out_host); + if (!kv->val) { + flb_errno(); + return -1; + } + return 0; + } + } + + return -1; +} + +int flb_http_allow_duplicated_headers(struct flb_http_client *c, int allow) +{ + if (allow != FLB_TRUE && allow != FLB_FALSE) { + return -1; + } + + c->allow_dup_headers = allow; + return 0; +} + +/* check if there is enough space in the client header buffer */ +static int header_available(struct flb_http_client *c, int bytes) +{ + int available; + + available = c->header_size - c->header_len; + if (available < bytes) { + return -1; + } + + return 0; +} + +/* Try to find a header value in the buffer */ +static int header_lookup(struct flb_http_client *c, + const char *header, int header_len, + const char **out_val, int *out_len) +{ + char *p; + char *crlf; + char *end; + + if (!c->resp.data) { + return FLB_HTTP_MORE; + } + + /* Lookup the beginning of the header */ + p = strcasestr(c->resp.data, header); + end = strstr(c->resp.data, "\r\n\r\n"); + if (!p) { + if (end) { + /* The headers are complete but the header is not there */ + return FLB_HTTP_NOT_FOUND; + } + + /* We need more data */ + return FLB_HTTP_MORE; + } + + /* Exclude matches in the body */ + if (end && p > end) { + return FLB_HTTP_NOT_FOUND; + } + + /* Lookup CRLF (end of line \r\n) */ + crlf = strstr(p, "\r\n"); + if (!crlf) { + return FLB_HTTP_MORE; + } + + p += header_len; + + *out_val = p; + *out_len = (crlf - p); + + return FLB_HTTP_OK; +} + +/* HTTP/1.1: Check if we have a Chunked Transfer Encoding */ +static int check_chunked_encoding(struct flb_http_client *c) +{ + int ret; + int len; + const char *header = NULL; + + ret = header_lookup(c, "Transfer-Encoding: ", 19, + &header, &len); + if (ret == FLB_HTTP_NOT_FOUND) { + /* If the header is missing, this is fine */ + c->resp.chunked_encoding = FLB_FALSE; + return FLB_HTTP_OK; + } + else if (ret == FLB_HTTP_MORE) { + return FLB_HTTP_MORE; + } + + if (strncasecmp(header, "chunked", len) == 0) { + c->resp.chunked_encoding = FLB_TRUE; + } + + return FLB_HTTP_OK; +} + +/* Check response for a 'Content-Length' header */ +static int check_content_length(struct flb_http_client *c) +{ + int ret; + int len; + const char *header; + char tmp[256]; + + if (c->resp.status == 204) { + c->resp.content_length = -1; + return FLB_HTTP_OK; + } + + ret = header_lookup(c, "Content-Length: ", 16, + &header, &len); + if (ret == FLB_HTTP_MORE) { + return FLB_HTTP_MORE; + } + else if (ret == FLB_HTTP_NOT_FOUND) { + return FLB_HTTP_NOT_FOUND; + } + + if (len > sizeof(tmp) - 1) { + /* Value too long */ + return FLB_HTTP_ERROR; + } + + /* Copy to temporary buffer */ + memcpy(tmp, header, len); + tmp[len] = '\0'; + + c->resp.content_length = atoi(tmp); + return FLB_HTTP_OK; +} + +/* Check response for a 'Connection' header */ +static int check_connection(struct flb_http_client *c) +{ + int ret; + int len; + const char *header; + char *buf; + + ret = header_lookup(c, "Connection: ", 12, + &header, &len); + if (ret == FLB_HTTP_NOT_FOUND) { + return FLB_HTTP_NOT_FOUND; + } + else if (ret == FLB_HTTP_MORE) { + return FLB_HTTP_MORE; + } + + buf = flb_malloc(len + 1); + if (!buf) { + flb_errno(); + return -1; + } + + memcpy(buf, header, len); + buf[len] = '\0'; + + if (strncasecmp(buf, "close", 5) == 0) { + c->resp.connection_close = FLB_TRUE; + } + else if (strcasestr(buf, "keep-alive")) { + c->resp.connection_close = FLB_FALSE; + } + flb_free(buf); + return FLB_HTTP_OK; + +} + +static inline void consume_bytes(char *buf, int bytes, int length) +{ + memmove(buf, buf + bytes, length - bytes); +} + +static int process_chunked_data(struct flb_http_client *c) +{ + long len; + long drop; + long val; + char *p; + char tmp[32]; + struct flb_http_response *r = &c->resp; + + chunk_start: + p = strstr(r->chunk_processed_end, "\r\n"); + if (!p) { + return FLB_HTTP_MORE; + } + + /* Hexa string length */ + len = (p - r->chunk_processed_end); + if ((len > sizeof(tmp) - 1) || len == 0) { + return FLB_HTTP_ERROR; + } + p += 2; + + /* Copy hexa string to temporary buffer */ + memcpy(tmp, r->chunk_processed_end, len); + tmp[len] = '\0'; + + /* Convert hexa string to decimal */ + errno = 0; + val = strtol(tmp, NULL, 16); + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) + || (errno != 0 && val == 0)) { + flb_errno(); + return FLB_HTTP_ERROR; + } + if (val < 0) { + return FLB_HTTP_ERROR; + } + /* + * 'val' contains the expected number of bytes, check current lengths + * and do buffer adjustments. + * + * we do val + 2 because the chunk always ends with \r\n + */ + val += 2; + + /* Number of bytes after the Chunk header */ + len = r->data_len - (p - r->data); + if (len < val) { + return FLB_HTTP_MORE; + } + + /* From the current chunk we expect it ends with \r\n */ + if (p[val -2] != '\r' || p[val - 1] != '\n') { + return FLB_HTTP_ERROR; + } + + /* + * At this point we are just fine, the chunk is valid, next steps: + * + * 1. check possible last chunk + * 2. drop chunk header from the buffer + * 3. remove chunk ending \r\n + */ + + /* 1. Validate ending chunk */ + if (val - 2 == 0) { + /* + * For an ending chunk we expect: + * + * 0\r\n + * \r\n + * + * so at least we need 5 bytes in the buffer + */ + len = r->data_len - (r->chunk_processed_end - r->data); + if (len < 5) { + return FLB_HTTP_MORE; + } + + if (r->chunk_processed_end[3] != '\r' || + r->chunk_processed_end[4] != '\n') { + return FLB_HTTP_ERROR; + } + } + + /* 2. Drop chunk header */ + drop = (p - r->chunk_processed_end); + len = r->data_len - (r->chunk_processed_end - r->data); + consume_bytes(r->chunk_processed_end, drop, len); + r->data_len -= drop; + r->data[r->data_len] = '\0'; + + /* 3. Remove chunk ending \r\n */ + drop = 2; + r->chunk_processed_end += labs(val - 2); + len = r->data_len - (r->chunk_processed_end - r->data); + consume_bytes(r->chunk_processed_end, drop, len); + r->data_len -= drop; + + /* Always append a NULL byte */ + r->data[r->data_len] = '\0'; + + /* Is this the last chunk ? */ + if ((val - 2 == 0)) { + /* Update payload size */ + r->payload_size = r->data_len - (r->headers_end - r->data); + return FLB_HTTP_OK; + } + + /* If we have some remaining bytes, start over */ + len = r->data_len - (r->chunk_processed_end - r->data); + if (len > 0) { + goto chunk_start; + } + + return FLB_HTTP_MORE; +} + +static int process_data(struct flb_http_client *c) +{ + int ret; + char code[4]; + char *tmp; + + if (c->resp.data_len < 15) { + /* we need more data */ + return FLB_HTTP_MORE; + } + + /* HTTP response status */ + if (c->resp.status <= 0) { + memcpy(code, c->resp.data + 9, 3); + code[3] = '\0'; + c->resp.status = atoi(code); + if (c->resp.status < 100 || c->resp.status > 599) { + return FLB_HTTP_ERROR; + } + } + + /* Try to lookup content length */ + if (c->resp.content_length == -1 && c->resp.chunked_encoding == FLB_FALSE) { + ret = check_content_length(c); + if (ret == FLB_HTTP_ERROR) { + return FLB_HTTP_ERROR; + } + } + + /* Chunked encoding for HTTP/1.1 (no content length of course) */ + if ((c->flags & FLB_HTTP_11) && c->resp.content_length == -1) { + if (c->resp.chunked_encoding == FLB_FALSE) { + ret = check_chunked_encoding(c); + if (ret == FLB_HTTP_ERROR) { + return FLB_HTTP_ERROR; + } + } + } + + if (!c->resp.headers_end) { + tmp = strstr(c->resp.data, "\r\n\r\n"); + if (tmp) { + c->resp.headers_end = tmp + 4; + if (c->resp.chunked_encoding == FLB_TRUE) { + c->resp.chunk_processed_end = c->resp.headers_end; + } + + /* Mark the payload */ + if ((tmp - c->resp.data + 4) < c->resp.data_len) { + c->resp.payload = tmp += 4; + c->resp.payload_size = (c->resp.data_len - (tmp - c->resp.data)); + } + } + else { + return FLB_HTTP_MORE; + } + } + + /* Re-check if an ending exists, if so process payload if required */ + if (c->resp.headers_end) { + /* Mark the payload */ + if (!c->resp.payload && + c->resp.headers_end - c->resp.data < c->resp.data_len) { + c->resp.payload = c->resp.headers_end; + c->resp.payload_size = (c->resp.data_len - (c->resp.headers_end - c->resp.data)); + } + + if (c->resp.content_length >= 0) { + c->resp.payload_size = c->resp.data_len; + c->resp.payload_size -= (c->resp.headers_end - c->resp.data); + if (c->resp.payload_size >= c->resp.content_length) { + return FLB_HTTP_OK; + } + } + else if (c->resp.chunked_encoding == FLB_TRUE) { + ret = process_chunked_data(c); + if (ret == FLB_HTTP_ERROR) { + return FLB_HTTP_ERROR; + } + else if (ret == FLB_HTTP_OK) { + return FLB_HTTP_OK; + } + } + else { + return FLB_HTTP_OK; + } + } + else if (c->resp.headers_end && c->resp.content_length <= 0) { + return FLB_HTTP_OK; + } + + return FLB_HTTP_MORE; +} + +#if defined FLB_HAVE_TESTS_OSSFUZZ +int fuzz_process_data(struct flb_http_client *c); +int fuzz_process_data(struct flb_http_client *c) { + return process_data(c); +} + +int fuzz_check_connection(struct flb_http_client *c); +int fuzz_check_connection(struct flb_http_client *c) { + return check_connection(c); +} + +#endif + +static int proxy_parse(const char *proxy, struct flb_http_client *c) +{ + int len; + int port; + int off = 0; + const char *s; + const char *e; + const char *host; + + len = strlen(proxy); + if (len < 7) { + return -1; + } + + /* Protocol lookup */ + if (strncmp(proxy, "http://", 7) == 0) { + port = 80; + off = 7; + c->proxy.type = FLB_HTTP_PROXY_HTTP; + } + else if (strncmp(proxy, "https://", 8) == 0) { + port = 443; + off = 8; + c->proxy.type = FLB_HTTP_PROXY_HTTPS; + } + else { + return -1; + } + + /* Separate host/ip from port if any */ + s = proxy + off; + if (*s == '[') { + /* IPv6 address (RFC 3986) */ + e = strchr(++s, ']'); + if (!e) { + return -1; + } + host = strndup(s, e - s); + s = e + 1; + } else { + e = s; + while (!(*e == '\0' || *e == ':' || *e == '/')) { + ++e; + } + if (e == s) { + return -1; + } + host = strndup(s, e - s); + s = e; + } + if (*s == ':') { + port = atoi(++s); + } + + flb_trace("[http_client] proxy type=%i host=%s port=%i", + c->proxy.type, host, port); + + c->proxy.host = host; + c->proxy.port = port; + + return 0; +} + +static int add_host_and_content_length(struct flb_http_client *c) +{ + int len; + flb_sds_t tmp; + flb_sds_t host; + char *out_host; + int out_port; + size_t size; + struct flb_upstream *u = c->u_conn->upstream; + + if (!c->host) { + if (u->proxied_host) { + out_host = u->proxied_host; + } + else { + out_host = u->tcp_host; + } + } + else { + out_host = (char *) c->host; + } + + len = strlen(out_host); + host = flb_sds_create_size(len + 32); + if (!host) { + flb_error("[http_client] cannot create temporal buffer"); + return -1; + } + + if (c->port == 0) { + if (u->proxied_port != 0 ) { + out_port = u->proxied_port; + } + else { + out_port = u->tcp_port; + } + } + else { + out_port = c->port; + } + + if (c->flags & FLB_IO_TLS && out_port == 443) { + tmp = flb_sds_copy(host, out_host, strlen(out_host)); + } + else { + tmp = flb_sds_printf(&host, "%s:%i", out_host, out_port); + } + + if (!tmp) { + flb_sds_destroy(host); + flb_error("[http_client] cannot compose temporary host header"); + return -1; + } + host = tmp; + tmp = NULL; + + flb_http_add_header(c, "Host", 4, host, flb_sds_len(host)); + flb_sds_destroy(host); + + /* Content-Length */ + if (c->body_len >= 0) { + size = 32; + tmp = flb_malloc(size); + if (!tmp) { + flb_errno(); + return -1; + } + len = snprintf(tmp, size - 1, "%i", c->body_len); + flb_http_add_header(c, "Content-Length", 14, tmp, len); + flb_free(tmp); + } + + return 0; +} + +struct flb_http_client *flb_http_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + int ret; + char *p; + char *buf = NULL; + char *str_method = NULL; + char *fmt_plain = \ + "%s %s HTTP/1.%i\r\n"; + char *fmt_proxy = \ + "%s http://%s:%i%s HTTP/1.%i\r\n" + "Proxy-Connection: KeepAlive\r\n"; + // TODO: IPv6 should have the format of [ip]:port + char *fmt_connect = \ + "%s %s:%i HTTP/1.%i\r\n" + "Proxy-Connection: KeepAlive\r\n"; + + struct flb_http_client *c; + + switch (method) { + case FLB_HTTP_GET: + str_method = "GET"; + break; + case FLB_HTTP_POST: + str_method = "POST"; + break; + case FLB_HTTP_PUT: + str_method = "PUT"; + break; + case FLB_HTTP_HEAD: + str_method = "HEAD"; + break; + case FLB_HTTP_CONNECT: + str_method = "CONNECT"; + break; + case FLB_HTTP_PATCH: + str_method = "PATCH"; + break; + }; + + buf = flb_calloc(1, FLB_HTTP_BUF_SIZE); + if (!buf) { + flb_errno(); + return NULL; + } + + /* FIXME: handler for HTTPS proxy */ + if (proxy) { + flb_debug("[http_client] using http_proxy %s for header", proxy); + ret = snprintf(buf, FLB_HTTP_BUF_SIZE, + fmt_proxy, + str_method, + host, + port, + uri, + flags & FLB_HTTP_10 ? 0 : 1); + } + else if (method == FLB_HTTP_CONNECT) { + flb_debug("[http_client] using HTTP CONNECT for proxy: proxy host %s, proxy port %i", host, port); + ret = snprintf(buf, FLB_HTTP_BUF_SIZE, + fmt_connect, + str_method, + host, + port, + flags & FLB_HTTP_10 ? 0 : 1); + } + else { + flb_debug("[http_client] not using http_proxy for header"); + ret = snprintf(buf, FLB_HTTP_BUF_SIZE, + fmt_plain, + str_method, + uri, + flags & FLB_HTTP_10 ? 0 : 1); + } + + if (ret == -1) { + flb_errno(); + flb_free(buf); + return NULL; + } + + c = flb_calloc(1, sizeof(struct flb_http_client)); + if (!c) { + flb_free(buf); + return NULL; + } + + c->u_conn = u_conn; + c->method = method; + c->uri = uri; + c->host = host; + c->port = port; + c->header_buf = buf; + c->header_size = FLB_HTTP_BUF_SIZE; + c->header_len = ret; + c->flags = flags; + c->allow_dup_headers = FLB_TRUE; + mk_list_init(&c->headers); + + /* Check if we have a query string */ + p = strchr(uri, '?'); + if (p) { + p++; + c->query_string = p; + } + + /* Is Upstream connection using keepalive mode ? */ + if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { + c->flags |= FLB_HTTP_KA; + } + + /* Response */ + c->resp.content_length = -1; + c->resp.connection_close = -1; + + if ((flags & FLB_HTTP_10) == 0) { + c->flags |= FLB_HTTP_11; + } + + if (body && body_len > 0) { + c->body_buf = body; + c->body_len = body_len; + } + + add_host_and_content_length(c); + + /* Check proxy data */ + if (proxy) { + flb_debug("[http_client] Using http_proxy: %s", proxy); + ret = proxy_parse(proxy, c); + if (ret != 0) { + flb_debug("[http_client] Something wrong with the http_proxy parsing"); + flb_http_client_destroy(c); + return NULL; + } + } + + /* 'Read' buffer size */ + c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); + if (!c->resp.data) { + flb_errno(); + flb_http_client_destroy(c); + return NULL; + } + c->resp.data[0] = '\0'; + c->resp.data_len = 0; + c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; + c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; + + return c; +} + +/* + * By default the HTTP client have a fixed buffer to read a response for a + * simple request. But in certain situations the caller might expect a + * larger response that exceed the buffer limit. + * + * This function allows to set a maximum buffer size for the client + * response where: + * + * 1. size = 0 no limit, read as much as possible. + * 2. size = N: specific limit, upon reach limit discard data (default: 4KB) + */ +int flb_http_buffer_size(struct flb_http_client *c, size_t size) +{ + if (size < c->resp.data_size_max && size != 0) { + flb_error("[http] requested buffer size %lu (bytes) needs to be greater than " + "minimum size allowed %lu (bytes)", + size, c->resp.data_size_max); + return -1; + } + + c->resp.data_size_max = size; + return 0; +} + +size_t flb_http_buffer_available(struct flb_http_client *c) +{ + return (c->resp.data_size - c->resp.data_len); +} + +/* + * Increase the read buffer size based on the limits set by default or manually + * through the flb_http_buffer_size() function. + * + * The parameter 'size' is the amount of extra memory requested. + */ +int flb_http_buffer_increase(struct flb_http_client *c, size_t size, + size_t *out_size) +{ + int off_payload = 0; + int off_headers_end = 0; + int off_chunk_processed_end = 0; + char *tmp; + size_t new_size; + size_t allocated; + + *out_size = 0; + new_size = c->resp.data_size + size; + + /* Limit exceeded, adjust */ + if (c->resp.data_size_max != 0) { + if (new_size > c->resp.data_size_max) { + new_size = c->resp.data_size_max; + if (new_size <= c->resp.data_size) { + /* Can't expand the buffer any further. */ + return -1; + } + } + } + + + if (c->resp.headers_end) { + off_headers_end = c->resp.headers_end - c->resp.data; + } + if (c->resp.chunk_processed_end) { + off_chunk_processed_end = c->resp.chunk_processed_end - c->resp.data; + } + + /* + * The payload is a reference to a position of 'data' buffer, + * we need to adjust the pointer after a memory buffer size change. + */ + if (c->resp.payload_size > 0) { + off_payload = c->resp.payload - c->resp.data; + } + + tmp = flb_realloc(c->resp.data, new_size); + if (!tmp) { + flb_errno(); + return -1; + } + else { + allocated = new_size - c->resp.data_size; + c->resp.data = tmp; + c->resp.data_size = new_size; + + if (off_headers_end > 0) { + c->resp.headers_end = c->resp.data + off_headers_end; + } + if (off_chunk_processed_end > 0) { + c->resp.chunk_processed_end = c->resp.data + off_chunk_processed_end; + } + if (off_payload > 0) { + c->resp.payload = c->resp.data + off_payload; + } + } + + *out_size = allocated; + return 0; +} + + +/* Append a custom HTTP header to the request */ +int flb_http_add_header(struct flb_http_client *c, + const char *key, size_t key_len, + const char *val, size_t val_len) +{ + struct flb_kv *kv; + struct mk_list *tmp; + struct mk_list *head; + + if (key_len < 1 || val_len < 1) { + return -1; + } + + /* Check any previous header to avoid duplicates */ + if (c->allow_dup_headers == FLB_FALSE) { + mk_list_foreach_safe(head, tmp, &c->headers) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (flb_sds_casecmp(kv->key, key, key_len) == 0) { + /* the header already exists, remove it */ + flb_kv_item_destroy(kv); + break; + } + } + } + + /* register new header in the temporal kv list */ + kv = flb_kv_item_create_len(&c->headers, + (char *) key, key_len, (char *) val, val_len); + if (!kv) { + return -1; + } + + return 0; +} + +/* + * flb_http_get_header looks up a first value of request header. + * The return value should be destroyed after using. + * The return value is NULL, if the value is not found. + */ +flb_sds_t flb_http_get_header(struct flb_http_client *c, + const char *key, size_t key_len) +{ + flb_sds_t ret_str; + struct flb_kv *kv; + struct mk_list *head = NULL; + struct mk_list *tmp = NULL; + + mk_list_foreach_safe(head, tmp, &c->headers) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (flb_sds_casecmp(kv->key, key, key_len) == 0) { + ret_str = flb_sds_create(kv->val); + return ret_str; + } + } + + return NULL; +} + +static int http_header_push(struct flb_http_client *c, struct flb_kv *header) +{ + char *tmp; + const char *key; + const char *val; + size_t key_len; + size_t val_len; + size_t required; + size_t new_size; + + key = header->key; + key_len = flb_sds_len(header->key); + val = header->val; + val_len = flb_sds_len(header->val); + + /* + * The new header will need enough space in the buffer: + * + * key : length of the key + * separator: ': ' (2 bytes) + * val : length of the key value + * CRLF : '\r\n' (2 bytes) + */ + required = key_len + 2 + val_len + 2; + + if (header_available(c, required) != 0) { + if (required < 512) { + new_size = c->header_size + 512; + } + else { + new_size = c->header_size + required; + } + tmp = flb_realloc(c->header_buf, new_size); + if (!tmp) { + flb_errno(); + return -1; + } + c->header_buf = tmp; + c->header_size = new_size; + } + + /* append the header key */ + memcpy(c->header_buf + c->header_len, + key, key_len); + c->header_len += key_len; + + /* append the separator */ + c->header_buf[c->header_len++] = ':'; + c->header_buf[c->header_len++] = ' '; + + /* append the header value */ + memcpy(c->header_buf + c->header_len, + val, val_len); + c->header_len += val_len; + + /* Append the ending header CRLF */ + c->header_buf[c->header_len++] = '\r'; + c->header_buf[c->header_len++] = '\n'; + + return 0; +} + +static int http_headers_compose(struct flb_http_client *c) +{ + int ret; + struct mk_list *head; + struct flb_kv *header; + + /* Push header list to one buffer */ + mk_list_foreach(head, &c->headers) { + header = mk_list_entry(head, struct flb_kv, _head); + ret = http_header_push(c, header); + if (ret != 0) { + flb_error("[http_client] cannot compose request headers"); + return -1; + } + } + + return 0; +} + +static void http_headers_destroy(struct flb_http_client *c) +{ + flb_kv_release(&c->headers); +} + +int flb_http_set_keepalive(struct flb_http_client *c) +{ + /* check if 'keepalive' mode is enabled in the Upstream connection */ + if (flb_stream_is_keepalive(c->u_conn->stream)) { + return -1; + } + + /* append header */ + return flb_http_add_header(c, + FLB_HTTP_HEADER_CONNECTION, + sizeof(FLB_HTTP_HEADER_CONNECTION) - 1, + FLB_HTTP_HEADER_KA, + sizeof(FLB_HTTP_HEADER_KA) - 1); +} + +/* Adds a header specifying that the payload is compressed with gzip */ +int flb_http_set_content_encoding_gzip(struct flb_http_client *c) +{ + int ret; + + ret = flb_http_add_header(c, + FLB_HTTP_HEADER_CONTENT_ENCODING, + sizeof(FLB_HTTP_HEADER_CONTENT_ENCODING) - 1, + "gzip", 4); + return ret; +} + +int flb_http_set_callback_context(struct flb_http_client *c, + struct flb_callback *cb_ctx) +{ + c->cb_ctx = cb_ctx; + return 0; +} + +int flb_http_add_auth_header(struct flb_http_client *c, + const char *user, const char *passwd, const char *header) { + int ret; + int len_u; + int len_p; + int len_h; + int len_out; + char tmp[1024]; + char *p; + size_t b64_len; + + /* + * We allow a max of 255 bytes for user and password (255 each), meaning + * we need at least: + * + * 'Basic base64(user : passwd)' => ~688 bytes + * + */ + + len_u = strlen(user); + + if (passwd) { + len_p = strlen(passwd); + } + else { + len_p = 0; + } + + p = flb_malloc(len_u + len_p + 2); + if (!p) { + flb_errno(); + return -1; + } + + memcpy(p, user, len_u); + p[len_u] = ':'; + len_out = len_u + 1; + + if (passwd) { + memcpy(p + len_out, passwd, len_p); + len_out += len_p; + } + p[len_out] = '\0'; + + memcpy(tmp, "Basic ", 6); + ret = flb_base64_encode((unsigned char *) tmp + 6, sizeof(tmp) - 7, &b64_len, + (unsigned char *) p, len_out); + if (ret != 0) { + flb_free(p); + return -1; + } + + flb_free(p); + b64_len += 6; + + len_h = strlen(header); + ret = flb_http_add_header(c, + header, + len_h, + tmp, b64_len); + return ret; +} + +int flb_http_basic_auth(struct flb_http_client *c, + const char *user, const char *passwd) +{ + return flb_http_add_auth_header(c, user, passwd, FLB_HTTP_HEADER_AUTH); +} + +int flb_http_proxy_auth(struct flb_http_client *c, + const char *user, const char *passwd) +{ + return flb_http_add_auth_header(c, user, passwd, FLB_HTTP_HEADER_PROXY_AUTH); +} + +int flb_http_bearer_auth(struct flb_http_client *c, const char *token) +{ + flb_sds_t header_buffer; + flb_sds_t header_line; + int result; + + result = -1; + + if (token == NULL) { + token = ""; + + /* Shouldn't we log this and return instead of sending + * a malformed value? + */ + } + + header_buffer = flb_sds_create_size(strlen(token) + 64); + + if (header_buffer == NULL) { + return -1; + } + + header_line = flb_sds_printf(&header_buffer, "Bearer %s", token); + + if (header_line != NULL) { + result = flb_http_add_header(c, + FLB_HTTP_HEADER_AUTH, + strlen(FLB_HTTP_HEADER_AUTH), + header_line, + flb_sds_len(header_line)); + } + + flb_sds_destroy(header_buffer); + + return result; +} + + +int flb_http_do(struct flb_http_client *c, size_t *bytes) +{ + int ret; + int r_bytes; + int crlf = 2; + int new_size; + ssize_t available; + size_t out_size; + size_t bytes_header = 0; + size_t bytes_body = 0; + char *tmp; + + /* Append pending headers */ + ret = http_headers_compose(c); + if (ret == -1) { + return -1; + } + + /* check enough space for the ending CRLF */ + if (header_available(c, crlf) != 0) { + new_size = c->header_size + 2; + tmp = flb_realloc(c->header_buf, new_size); + if (!tmp) { + flb_errno(); + return -1; + } + c->header_buf = tmp; + c->header_size = new_size; + } + + /* Append the ending header CRLF */ + c->header_buf[c->header_len++] = '\r'; + c->header_buf[c->header_len++] = '\n'; + +#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG + /* debug: request_headers callback */ + flb_http_client_debug_cb(c, "_debug.http.request_headers"); + + /* debug: request_payload callback */ + if (c->body_len > 0) { + flb_http_client_debug_cb(c, "_debug.http.request_payload"); + } +#endif + + /* Write the header */ + ret = flb_io_net_write(c->u_conn, + c->header_buf, c->header_len, + &bytes_header); + if (ret == -1) { + /* errno might be changed from the original call */ + if (errno != 0) { + flb_errno(); + } + return -1; + } + + if (c->body_len > 0) { + ret = flb_io_net_write(c->u_conn, + c->body_buf, c->body_len, + &bytes_body); + if (ret == -1) { + flb_errno(); + return -1; + } + } + + /* number of sent bytes */ + *bytes = (bytes_header + bytes_body); + + /* Read the server response, we need at least 19 bytes */ + c->resp.data_len = 0; + while (1) { + available = flb_http_buffer_available(c) - 1; + if (available <= 1) { + /* + * If there is no more space available on our buffer, try to + * increase it. + */ + ret = flb_http_buffer_increase(c, FLB_HTTP_DATA_CHUNK, + &out_size); + if (ret == -1) { + /* + * We could not allocate more space, let the caller handle + * this. + */ + flb_warn("[http_client] cannot increase buffer: current=%zu " + "requested=%zu max=%zu", c->resp.data_size, + c->resp.data_size + FLB_HTTP_DATA_CHUNK, + c->resp.data_size_max); + flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + return 0; + } + available = flb_http_buffer_available(c) - 1; + } + + r_bytes = flb_io_net_read(c->u_conn, + c->resp.data + c->resp.data_len, + available); + if (r_bytes <= 0) { + if (c->flags & FLB_HTTP_10) { + break; + } + } + + /* Always append a NULL byte */ + if (r_bytes >= 0) { + c->resp.data_len += r_bytes; + c->resp.data[c->resp.data_len] = '\0'; + + ret = process_data(c); + if (ret == FLB_HTTP_ERROR) { + flb_warn("[http_client] malformed HTTP response from %s:%i on " + "connection #%i", + c->u_conn->upstream->tcp_host, + c->u_conn->upstream->tcp_port, + c->u_conn->fd); + return -1; + } + else if (ret == FLB_HTTP_OK) { + break; + } + else if (ret == FLB_HTTP_MORE) { + continue; + } + } + else { + flb_error("[http_client] broken connection to %s:%i ?", + c->u_conn->upstream->tcp_host, + c->u_conn->upstream->tcp_port); + return -1; + } + } + + /* Check 'Connection' response header */ + ret = check_connection(c); + if (ret == FLB_HTTP_OK) { + /* + * If the server replied that the connection will be closed + * and our Upstream connection is in keepalive mode, we must + * inactivate the connection. + */ + if (c->resp.connection_close == FLB_TRUE) { + /* Do not recycle the connection (no more keepalive) */ + flb_upstream_conn_recycle(c->u_conn, FLB_FALSE); + flb_debug("[http_client] server %s:%i will close connection #%i", + c->u_conn->upstream->tcp_host, + c->u_conn->upstream->tcp_port, + c->u_conn->fd); + } + } + +#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG + flb_http_client_debug_cb(c, "_debug.http.response_headers"); + if (c->resp.payload_size > 0) { + flb_http_client_debug_cb(c, "_debug.http.response_payload"); + } +#endif + + return 0; +} + +/* + * flb_http_client_proxy_connect opens a tunnel to a proxy server via + * http `CONNECT` method. This is needed for https traffic through a + * http proxy. + * More: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT + */ +int flb_http_client_proxy_connect(struct flb_connection *u_conn) +{ + struct flb_upstream *u = u_conn->upstream; + struct flb_http_client *c; + size_t b_sent; + int ret = -1; + + /* Don't pass proxy when using FLB_HTTP_CONNECT */ + flb_debug("[upstream] establishing http tunneling to proxy: host %s port %d", u->tcp_host, u->tcp_port); + c = flb_http_client(u_conn, FLB_HTTP_CONNECT, "", NULL, + 0, u->proxied_host, u->proxied_port, NULL, 0); + + /* Setup proxy's username and password */ + if (u->proxy_username && u->proxy_password) { + flb_debug("[upstream] proxy uses username %s password %s", u->proxy_username, u->proxy_password); + flb_http_proxy_auth(c, u->proxy_username, u->proxy_password); + } + + flb_http_buffer_size(c, 4192); + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + + /* Validate HTTP response */ + if (ret != 0) { + flb_error("[upstream] error in flb_establish_proxy: %d", ret); + ret = -1; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_debug("[upstream] proxy returned %d", c->resp.status); + if (c->resp.status == 200) { + ret = 0; + } + else { + flb_error("flb_establish_proxy error: %s", c->resp.payload); + ret = -1; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + + return ret; +} + +void flb_http_client_destroy(struct flb_http_client *c) +{ + http_headers_destroy(c); + flb_free(c->resp.data); + flb_free(c->header_buf); + flb_free((void *)c->proxy.host); + flb_free(c); +} |