diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c | |
parent | Initial commit. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c new file mode 100644 index 00000000..98359b42 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_header.c @@ -0,0 +1,220 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_header.h" + + + +#define rd_kafka_header_destroy rd_free + +void rd_kafka_headers_destroy(rd_kafka_headers_t *hdrs) { + rd_list_destroy(&hdrs->rkhdrs_list); + rd_free(hdrs); +} + +rd_kafka_headers_t *rd_kafka_headers_new(size_t initial_count) { + rd_kafka_headers_t *hdrs; + + hdrs = rd_malloc(sizeof(*hdrs)); + rd_list_init(&hdrs->rkhdrs_list, (int)initial_count, + rd_kafka_header_destroy); + hdrs->rkhdrs_ser_size = 0; + + return hdrs; +} + +static void *rd_kafka_header_copy(const void *_src, void *opaque) { + rd_kafka_headers_t *hdrs = opaque; + const rd_kafka_header_t *src = (const rd_kafka_header_t *)_src; + + return (void *)rd_kafka_header_add( + hdrs, src->rkhdr_name, src->rkhdr_name_size, src->rkhdr_value, + src->rkhdr_value_size); +} + +rd_kafka_headers_t *rd_kafka_headers_copy(const rd_kafka_headers_t *src) { + rd_kafka_headers_t *dst; + + dst = rd_malloc(sizeof(*dst)); + rd_list_init(&dst->rkhdrs_list, rd_list_cnt(&src->rkhdrs_list), + rd_kafka_header_destroy); + dst->rkhdrs_ser_size = 0; /* Updated by header_copy() */ + rd_list_copy_to(&dst->rkhdrs_list, &src->rkhdrs_list, + rd_kafka_header_copy, dst); + + return dst; +} + + + +rd_kafka_resp_err_t rd_kafka_header_add(rd_kafka_headers_t *hdrs, + const char *name, + ssize_t name_size, + const void *value, + ssize_t value_size) { + rd_kafka_header_t *hdr; + char varint_NameLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; + char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; + + if (name_size == -1) + name_size = strlen(name); + + if (value_size == -1) + value_size = value ? strlen(value) : 0; + else if (!value) + value_size = 0; + + hdr = rd_malloc(sizeof(*hdr) + name_size + 1 + value_size + 1); + hdr->rkhdr_name_size = name_size; + memcpy((void *)hdr->rkhdr_name, name, name_size); + hdr->rkhdr_name[name_size] = '\0'; + + if (likely(value != NULL)) { + hdr->rkhdr_value = hdr->rkhdr_name + name_size + 1; + memcpy((void *)hdr->rkhdr_value, value, value_size); + hdr->rkhdr_value[value_size] = '\0'; + hdr->rkhdr_value_size = value_size; + } else { + hdr->rkhdr_value = NULL; + hdr->rkhdr_value_size = 0; + } + + rd_list_add(&hdrs->rkhdrs_list, hdr); + + /* Calculate serialized size of header */ + hdr->rkhdr_ser_size = name_size + value_size; + hdr->rkhdr_ser_size += rd_uvarint_enc_i64( + varint_NameLen, sizeof(varint_NameLen), name_size); + hdr->rkhdr_ser_size += rd_uvarint_enc_i64( + varint_ValueLen, sizeof(varint_ValueLen), value_size); + hdrs->rkhdrs_ser_size += hdr->rkhdr_ser_size; + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief header_t(name) to char * comparator + */ +static int rd_kafka_header_cmp_str(void *_a, void *_b) { + const rd_kafka_header_t *a = _a; + const char *b = _b; + + return strcmp(a->rkhdr_name, b); +} + +rd_kafka_resp_err_t rd_kafka_header_remove(rd_kafka_headers_t *hdrs, + const char *name) { + size_t ser_size = 0; + rd_kafka_header_t *hdr; + int i; + + RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) { + if (rd_kafka_header_cmp_str(hdr, (void *)name)) + continue; + + ser_size += hdr->rkhdr_ser_size; + rd_list_remove_elem(&hdrs->rkhdrs_list, i); + rd_kafka_header_destroy(hdr); + } + + if (ser_size == 0) + return RD_KAFKA_RESP_ERR__NOENT; + + rd_dassert(hdrs->rkhdrs_ser_size >= ser_size); + hdrs->rkhdrs_ser_size -= ser_size; + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +rd_kafka_resp_err_t rd_kafka_header_get_last(const rd_kafka_headers_t *hdrs, + const char *name, + const void **valuep, + size_t *sizep) { + const rd_kafka_header_t *hdr; + int i; + size_t name_size = strlen(name); + + RD_LIST_FOREACH_REVERSE(hdr, &hdrs->rkhdrs_list, i) { + if (hdr->rkhdr_name_size == name_size && + !strcmp(hdr->rkhdr_name, name)) { + *valuep = hdr->rkhdr_value; + *sizep = hdr->rkhdr_value_size; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + return RD_KAFKA_RESP_ERR__NOENT; +} + + +rd_kafka_resp_err_t rd_kafka_header_get(const rd_kafka_headers_t *hdrs, + size_t idx, + const char *name, + const void **valuep, + size_t *sizep) { + const rd_kafka_header_t *hdr; + int i; + size_t mi = 0; /* index for matching names */ + size_t name_size = strlen(name); + + RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) { + if (hdr->rkhdr_name_size == name_size && + !strcmp(hdr->rkhdr_name, name) && mi++ == idx) { + *valuep = hdr->rkhdr_value; + *sizep = hdr->rkhdr_value_size; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + return RD_KAFKA_RESP_ERR__NOENT; +} + + +rd_kafka_resp_err_t rd_kafka_header_get_all(const rd_kafka_headers_t *hdrs, + size_t idx, + const char **namep, + const void **valuep, + size_t *sizep) { + const rd_kafka_header_t *hdr; + + hdr = rd_list_elem(&hdrs->rkhdrs_list, (int)idx); + if (unlikely(!hdr)) + return RD_KAFKA_RESP_ERR__NOENT; + + *namep = hdr->rkhdr_name; + *valuep = hdr->rkhdr_value; + *sizep = hdr->rkhdr_value_size; + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +size_t rd_kafka_header_cnt(const rd_kafka_headers_t *hdrs) { + return (size_t)rd_list_cnt(&hdrs->rkhdrs_list); +} |