summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c511
1 files changed, 511 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c
new file mode 100644
index 00000000..7457a7fb
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdhttp.c
@@ -0,0 +1,511 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2021 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * @name HTTP client
+ *
+ */
+
+#include "rdkafka_int.h"
+#include "rdunittest.h"
+
+#include <stdarg.h>
+
+#include <curl/curl.h>
+#include "rdhttp.h"
+
+/** Maximum response size, increase as necessary. */
+#define RD_HTTP_RESPONSE_SIZE_MAX 1024 * 1024 * 500 /* 500kb */
+
+
+void rd_http_error_destroy(rd_http_error_t *herr) {
+ rd_free(herr);
+}
+
+static rd_http_error_t *rd_http_error_new(int code, const char *fmt, ...)
+ RD_FORMAT(printf, 2, 3);
+static rd_http_error_t *rd_http_error_new(int code, const char *fmt, ...) {
+ size_t len = 0;
+ rd_http_error_t *herr;
+ va_list ap;
+
+ va_start(ap, fmt);
+
+ if (fmt && *fmt) {
+ va_list ap2;
+ va_copy(ap2, ap);
+ len = rd_vsnprintf(NULL, 0, fmt, ap2);
+ va_end(ap2);
+ }
+
+ /* Use single allocation for both herr and the error string */
+ herr = rd_malloc(sizeof(*herr) + len + 1);
+ herr->code = code;
+ herr->errstr = herr->data;
+
+ if (len > 0)
+ rd_vsnprintf(herr->errstr, len + 1, fmt, ap);
+ else
+ herr->errstr[0] = '\0';
+
+ va_end(ap);
+
+ return herr;
+}
+
+/**
+ * @brief Same as rd_http_error_new() but reads the error string from the
+ * provided buffer.
+ */
+static rd_http_error_t *rd_http_error_new_from_buf(int code,
+ const rd_buf_t *rbuf) {
+ rd_http_error_t *herr;
+ rd_slice_t slice;
+ size_t len = rd_buf_len(rbuf);
+
+ if (len == 0)
+ return rd_http_error_new(
+ code, "Server did not provide an error string");
+
+
+ /* Use single allocation for both herr and the error string */
+ herr = rd_malloc(sizeof(*herr) + len + 1);
+ herr->code = code;
+ herr->errstr = herr->data;
+ rd_slice_init_full(&slice, rbuf);
+ rd_slice_read(&slice, herr->errstr, len);
+ herr->errstr[len] = '\0';
+
+ return herr;
+}
+
+void rd_http_req_destroy(rd_http_req_t *hreq) {
+ RD_IF_FREE(hreq->hreq_curl, curl_easy_cleanup);
+ RD_IF_FREE(hreq->hreq_buf, rd_buf_destroy_free);
+}
+
+
+/**
+ * @brief Curl writefunction. Writes the bytes passed from curl
+ * to the hreq's buffer.
+ */
+static size_t
+rd_http_req_write_cb(char *ptr, size_t size, size_t nmemb, void *userdata) {
+ rd_http_req_t *hreq = (rd_http_req_t *)userdata;
+
+ if (unlikely(rd_buf_len(hreq->hreq_buf) + nmemb >
+ RD_HTTP_RESPONSE_SIZE_MAX))
+ return 0; /* FIXME: Set some overflow flag or rely on curl? */
+
+ rd_buf_write(hreq->hreq_buf, ptr, nmemb);
+
+ return nmemb;
+}
+
+rd_http_error_t *rd_http_req_init(rd_http_req_t *hreq, const char *url) {
+
+ memset(hreq, 0, sizeof(*hreq));
+
+ hreq->hreq_curl = curl_easy_init();
+ if (!hreq->hreq_curl)
+ return rd_http_error_new(-1, "Failed to create curl handle");
+
+ hreq->hreq_buf = rd_buf_new(1, 1024);
+
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_URL, url);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_PROTOCOLS,
+ CURLPROTO_HTTP | CURLPROTO_HTTPS);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_MAXREDIRS, 16);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_TIMEOUT, 30);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_ERRORBUFFER,
+ hreq->hreq_curl_errstr);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_NOSIGNAL, 1);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_WRITEFUNCTION,
+ rd_http_req_write_cb);
+ curl_easy_setopt(hreq->hreq_curl, CURLOPT_WRITEDATA, (void *)hreq);
+
+ return NULL;
+}
+
+/**
+ * @brief Synchronously (blockingly) perform the HTTP operation.
+ */
+rd_http_error_t *rd_http_req_perform_sync(rd_http_req_t *hreq) {
+ CURLcode res;
+ long code = 0;
+
+ res = curl_easy_perform(hreq->hreq_curl);
+ if (unlikely(res != CURLE_OK))
+ return rd_http_error_new(-1, "%s", hreq->hreq_curl_errstr);
+
+ curl_easy_getinfo(hreq->hreq_curl, CURLINFO_RESPONSE_CODE, &code);
+ hreq->hreq_code = (int)code;
+ if (hreq->hreq_code >= 400)
+ return rd_http_error_new_from_buf(hreq->hreq_code,
+ hreq->hreq_buf);
+
+ return NULL;
+}
+
+
+int rd_http_req_get_code(const rd_http_req_t *hreq) {
+ return hreq->hreq_code;
+}
+
+const char *rd_http_req_get_content_type(rd_http_req_t *hreq) {
+ const char *content_type = NULL;
+
+ if (curl_easy_getinfo(hreq->hreq_curl, CURLINFO_CONTENT_TYPE,
+ &content_type))
+ return NULL;
+
+ return content_type;
+}
+
+
+/**
+ * @brief Perform a blocking HTTP(S) request to \p url.
+ *
+ * Returns the response (even if there's a HTTP error code returned)
+ * in \p *rbufp.
+ *
+ * Returns NULL on success (HTTP response code < 400), or an error
+ * object on transport or HTTP error - this error object must be destroyed
+ * by calling rd_http_error_destroy(). In case of HTTP error the \p *rbufp
+ * may be filled with the error response.
+ */
+rd_http_error_t *rd_http_get(const char *url, rd_buf_t **rbufp) {
+ rd_http_req_t hreq;
+ rd_http_error_t *herr;
+
+ *rbufp = NULL;
+
+ herr = rd_http_req_init(&hreq, url);
+ if (unlikely(herr != NULL))
+ return herr;
+
+ herr = rd_http_req_perform_sync(&hreq);
+ if (herr) {
+ rd_http_req_destroy(&hreq);
+ return herr;
+ }
+
+ *rbufp = hreq.hreq_buf;
+ hreq.hreq_buf = NULL;
+
+ return NULL;
+}
+
+
+/**
+ * @brief Extract the JSON object from \p hreq and return it in \p *jsonp.
+ *
+ * @returns Returns NULL on success, or an JSON parsing error - this
+ * error object must be destroyed by calling rd_http_error_destroy().
+ */
+rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp) {
+ size_t len;
+ char *raw_json;
+ const char *end = NULL;
+ rd_slice_t slice;
+ rd_http_error_t *herr = NULL;
+
+ /* cJSON requires the entire input to parse in contiguous memory. */
+ rd_slice_init_full(&slice, hreq->hreq_buf);
+ len = rd_buf_len(hreq->hreq_buf);
+
+ raw_json = rd_malloc(len + 1);
+ rd_slice_read(&slice, raw_json, len);
+ raw_json[len] = '\0';
+
+ /* Parse JSON */
+ *jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);
+
+ if (!*jsonp)
+ herr = rd_http_error_new(hreq->hreq_code,
+ "Failed to parse JSON response "
+ "at %" PRIusz "/%" PRIusz,
+ (size_t)(end - raw_json), len);
+ rd_free(raw_json);
+ return herr;
+}
+
+
+/**
+ * @brief Check if the error returned from HTTP(S) is temporary or not.
+ *
+ * @returns If the \p error_code is temporary, return rd_true,
+ * otherwise return rd_false.
+ *
+ * @locality Any thread.
+ */
+static rd_bool_t rd_http_is_failure_temporary(int error_code) {
+ switch (error_code) {
+ case 408: /**< Request timeout */
+ case 425: /**< Too early */
+ case 500: /**< Internal server error */
+ case 502: /**< Bad gateway */
+ case 503: /**< Service unavailable */
+ case 504: /**< Gateway timeout */
+ return rd_true;
+
+ default:
+ return rd_false;
+ }
+}
+
+
+/**
+ * @brief Perform a blocking HTTP(S) request to \p url with
+ * HTTP(S) headers and data with \p timeout_s.
+ * If the HTTP(S) request fails, will retry another \p retries times
+ * with multiplying backoff \p retry_ms.
+ *
+ * @returns The result will be returned in \p *jsonp.
+ * Returns NULL on success (HTTP response code < 400), or an error
+ * object on transport, HTTP error or a JSON parsing error - this
+ * error object must be destroyed by calling rd_http_error_destroy().
+ *
+ * @locality Any thread.
+ */
+rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
+ const char *url,
+ const struct curl_slist *headers,
+ const char *post_fields,
+ size_t post_fields_size,
+ int timeout_s,
+ int retries,
+ int retry_ms,
+ cJSON **jsonp) {
+ rd_http_error_t *herr;
+ rd_http_req_t hreq;
+ int i;
+ size_t len;
+ const char *content_type;
+
+ herr = rd_http_req_init(&hreq, url);
+ if (unlikely(herr != NULL))
+ return herr;
+
+ curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, headers);
+ curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout_s);
+
+ curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDSIZE,
+ post_fields_size);
+ curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDS, post_fields);
+
+ for (i = 0; i <= retries; i++) {
+ if (rd_kafka_terminating(rk)) {
+ rd_http_req_destroy(&hreq);
+ return rd_http_error_new(-1, "Terminating");
+ }
+
+ herr = rd_http_req_perform_sync(&hreq);
+ len = rd_buf_len(hreq.hreq_buf);
+
+ if (!herr) {
+ if (len > 0)
+ break; /* Success */
+ /* Empty response */
+ rd_http_req_destroy(&hreq);
+ return NULL;
+ }
+ /* Retry if HTTP(S) request returns temporary error and there
+ * are remaining retries, else fail. */
+ if (i == retries || !rd_http_is_failure_temporary(herr->code)) {
+ rd_http_req_destroy(&hreq);
+ return herr;
+ }
+
+ /* Retry */
+ rd_http_error_destroy(herr);
+ rd_usleep(retry_ms * 1000 * (i + 1), &rk->rk_terminate);
+ }
+
+ content_type = rd_http_req_get_content_type(&hreq);
+
+ if (!content_type || rd_strncasecmp(content_type, "application/json",
+ strlen("application/json"))) {
+ if (!herr)
+ herr = rd_http_error_new(
+ hreq.hreq_code, "Response is not JSON encoded: %s",
+ content_type ? content_type : "(n/a)");
+ rd_http_req_destroy(&hreq);
+ return herr;
+ }
+
+ herr = rd_http_parse_json(&hreq, jsonp);
+
+ rd_http_req_destroy(&hreq);
+
+ return herr;
+}
+
+
+/**
+ * @brief Same as rd_http_get() but requires a JSON response.
+ * The response is parsed and a JSON object is returned in \p *jsonp.
+ *
+ * Same error semantics as rd_http_get().
+ */
+rd_http_error_t *rd_http_get_json(const char *url, cJSON **jsonp) {
+ rd_http_req_t hreq;
+ rd_http_error_t *herr;
+ rd_slice_t slice;
+ size_t len;
+ const char *content_type;
+ char *raw_json;
+ const char *end;
+
+ *jsonp = NULL;
+
+ herr = rd_http_req_init(&hreq, url);
+ if (unlikely(herr != NULL))
+ return herr;
+
+ // FIXME: send Accept: json.. header?
+
+ herr = rd_http_req_perform_sync(&hreq);
+ len = rd_buf_len(hreq.hreq_buf);
+ if (herr && len == 0) {
+ rd_http_req_destroy(&hreq);
+ return herr;
+ }
+
+ if (len == 0) {
+ /* Empty response: create empty JSON object */
+ *jsonp = cJSON_CreateObject();
+ rd_http_req_destroy(&hreq);
+ return NULL;
+ }
+
+ content_type = rd_http_req_get_content_type(&hreq);
+
+ if (!content_type || rd_strncasecmp(content_type, "application/json",
+ strlen("application/json"))) {
+ if (!herr)
+ herr = rd_http_error_new(
+ hreq.hreq_code, "Response is not JSON encoded: %s",
+ content_type ? content_type : "(n/a)");
+ rd_http_req_destroy(&hreq);
+ return herr;
+ }
+
+ /* cJSON requires the entire input to parse in contiguous memory. */
+ rd_slice_init_full(&slice, hreq.hreq_buf);
+ raw_json = rd_malloc(len + 1);
+ rd_slice_read(&slice, raw_json, len);
+ raw_json[len] = '\0';
+
+ /* Parse JSON */
+ end = NULL;
+ *jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);
+ if (!*jsonp && !herr)
+ herr = rd_http_error_new(hreq.hreq_code,
+ "Failed to parse JSON response "
+ "at %" PRIusz "/%" PRIusz,
+ (size_t)(end - raw_json), len);
+
+ rd_free(raw_json);
+ rd_http_req_destroy(&hreq);
+
+ return herr;
+}
+
+
+void rd_http_global_init(void) {
+ curl_global_init(CURL_GLOBAL_DEFAULT);
+}
+
+
+/**
+ * @brief Unittest. Requires a (local) webserver to be set with env var
+ * RD_UT_HTTP_URL=http://localhost:1234/some-path
+ *
+ * This server must return a JSON object or array containing at least one
+ * object on the main URL with a 2xx response code,
+ * and 4xx response on $RD_UT_HTTP_URL/error (with whatever type of body).
+ */
+
+int unittest_http(void) {
+ const char *base_url = rd_getenv("RD_UT_HTTP_URL", NULL);
+ char *error_url;
+ size_t error_url_size;
+ cJSON *json, *jval;
+ rd_http_error_t *herr;
+ rd_bool_t empty;
+
+ if (!base_url || !*base_url)
+ RD_UT_SKIP("RD_UT_HTTP_URL environment variable not set");
+
+ RD_UT_BEGIN();
+
+ error_url_size = strlen(base_url) + strlen("/error") + 1;
+ error_url = rd_alloca(error_url_size);
+ rd_snprintf(error_url, error_url_size, "%s/error", base_url);
+
+ /* Try the base url first, parse its JSON and extract a key-value. */
+ json = NULL;
+ herr = rd_http_get_json(base_url, &json);
+ RD_UT_ASSERT(!herr, "Expected get_json(%s) to succeed, got: %s",
+ base_url, herr->errstr);
+
+ empty = rd_true;
+ cJSON_ArrayForEach(jval, json) {
+ empty = rd_false;
+ break;
+ }
+ RD_UT_ASSERT(!empty, "Expected non-empty JSON response from %s",
+ base_url);
+ RD_UT_SAY(
+ "URL %s returned no error and a non-empty "
+ "JSON object/array as expected",
+ base_url);
+ cJSON_Delete(json);
+
+
+ /* Try the error URL, verify error code. */
+ json = NULL;
+ herr = rd_http_get_json(error_url, &json);
+ RD_UT_ASSERT(herr != NULL, "Expected get_json(%s) to fail", error_url);
+ RD_UT_ASSERT(herr->code >= 400,
+ "Expected get_json(%s) error code >= "
+ "400, got %d",
+ error_url, herr->code);
+ RD_UT_SAY(
+ "Error URL %s returned code %d, errstr \"%s\" "
+ "and %s JSON object as expected",
+ error_url, herr->code, herr->errstr, json ? "a" : "no");
+ /* Check if there's a JSON document returned */
+ if (json)
+ cJSON_Delete(json);
+ rd_http_error_destroy(herr);
+
+ RD_UT_PASS();
+}