diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_aux.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_aux.c | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_aux.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_aux.c new file mode 100644 index 000000000..753f03d67 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_aux.c @@ -0,0 +1,278 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2018 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "rdkafka_int.h" +#include "rdkafka_aux.h" +#include "rdkafka_error.h" + +rd_kafka_resp_err_t +rd_kafka_topic_result_error(const rd_kafka_topic_result_t *topicres) { + return topicres->err; +} + +const char * +rd_kafka_topic_result_error_string(const rd_kafka_topic_result_t *topicres) { + return topicres->errstr; +} + +const char * +rd_kafka_topic_result_name(const rd_kafka_topic_result_t *topicres) { + return topicres->topic; +} + +/** + * @brief Create new topic_result (single allocation). + * + * @param topic Topic string, if topic_size is != -1 it does not have to + * be nul-terminated. + * @param topic_size Size of topic, or -1 to perform automatic strlen() + * @param err Error code + * @param errstr Optional error string. + * + * All input arguments are copied. + */ + +rd_kafka_topic_result_t *rd_kafka_topic_result_new(const char *topic, + ssize_t topic_size, + rd_kafka_resp_err_t err, + const char *errstr) { + size_t tlen = topic_size != -1 ? (size_t)topic_size : strlen(topic); + size_t elen = errstr ? strlen(errstr) + 1 : 0; + rd_kafka_topic_result_t *terr; + + terr = rd_malloc(sizeof(*terr) + tlen + 1 + elen); + + terr->err = err; + + terr->topic = terr->data; + memcpy(terr->topic, topic, tlen); + terr->topic[tlen] = '\0'; + + if (errstr) { + terr->errstr = terr->topic + tlen + 1; + memcpy(terr->errstr, errstr, elen); + } else { + terr->errstr = NULL; + } + + return terr; +} + + +/** + * @brief Destroy topic_result + */ +void rd_kafka_topic_result_destroy(rd_kafka_topic_result_t *terr) { + rd_free(terr); +} + +/** + * @brief Destroy-variant suitable for rd_list free_cb use. + */ +void rd_kafka_topic_result_free(void *ptr) { + rd_kafka_topic_result_destroy((rd_kafka_topic_result_t *)ptr); +} + +const rd_kafka_error_t * +rd_kafka_group_result_error(const rd_kafka_group_result_t *groupres) { + return groupres->error; +} + +const char * +rd_kafka_group_result_name(const rd_kafka_group_result_t *groupres) { + return groupres->group; +} + +const rd_kafka_topic_partition_list_t * +rd_kafka_group_result_partitions(const rd_kafka_group_result_t *groupres) { + return groupres->partitions; +} + +rd_kafka_group_result_t * +rd_kafka_group_result_copy(const rd_kafka_group_result_t *groupres) { + return rd_kafka_group_result_new( + groupres->group, -1, groupres->partitions, + groupres->error ? rd_kafka_error_copy(groupres->error) : NULL); +} + +/** + * @brief Same as rd_kafka_group_result_copy() but suitable for + * rd_list_copy(). The \p opaque is ignored. + */ +void *rd_kafka_group_result_copy_opaque(const void *src_groupres, + void *opaque) { + return rd_kafka_group_result_copy(src_groupres); +} + + +/** + * @brief Create new group_result (single allocation). + * + * @param group Group string, if group_size is != -1 it does not have to + * be nul-terminated. + * @param group_size Size of group, or -1 to perform automatic strlen() + * @param error Error object, or NULL on success. Takes ownership of \p error. + * + * All input arguments are copied. + */ + +rd_kafka_group_result_t * +rd_kafka_group_result_new(const char *group, + ssize_t group_size, + const rd_kafka_topic_partition_list_t *partitions, + rd_kafka_error_t *error) { + size_t glen = group_size != -1 ? (size_t)group_size : strlen(group); + rd_kafka_group_result_t *groupres; + + groupres = rd_calloc(1, sizeof(*groupres) + glen + 1); + + + groupres->group = groupres->data; + memcpy(groupres->group, group, glen); + groupres->group[glen] = '\0'; + + if (partitions) + groupres->partitions = + rd_kafka_topic_partition_list_copy(partitions); + + groupres->error = error; + + return groupres; +} + + +/** + * @brief Destroy group_result + */ +void rd_kafka_group_result_destroy(rd_kafka_group_result_t *groupres) { + if (groupres->partitions) + rd_kafka_topic_partition_list_destroy(groupres->partitions); + if (groupres->error) + rd_kafka_error_destroy(groupres->error); + rd_free(groupres); +} + +/** + * @brief Destroy-variant suitable for rd_list free_cb use. + */ +void rd_kafka_group_result_free(void *ptr) { + rd_kafka_group_result_destroy((rd_kafka_group_result_t *)ptr); +} + + +const rd_kafka_error_t * +rd_kafka_acl_result_error(const rd_kafka_acl_result_t *aclres) { + return aclres->error; +} + +/** + * @brief Allocates and return an acl result, takes ownership of \p error + * (unless NULL). + * + * @returns The new acl result. + */ +rd_kafka_acl_result_t *rd_kafka_acl_result_new(rd_kafka_error_t *error) { + rd_kafka_acl_result_t *acl_res; + + acl_res = rd_calloc(1, sizeof(*acl_res)); + + acl_res->error = error; + + return acl_res; +} + +/** + * @brief Destroy acl_result + */ +void rd_kafka_acl_result_destroy(rd_kafka_acl_result_t *acl_res) { + if (acl_res->error) + rd_kafka_error_destroy(acl_res->error); + rd_free(acl_res); +} + +/** + * @brief Destroy-variant suitable for rd_list free_cb use. + */ +void rd_kafka_acl_result_free(void *ptr) { + rd_kafka_acl_result_destroy((rd_kafka_acl_result_t *)ptr); +} + + +/** + * @brief Create a new Node object. + * + * @param id The node id. + * @param host The node host. + * @param port The node port. + * @param rack_id (optional) The node rack id. + * @return A new allocated Node object. + * Use rd_kafka_Node_destroy() to free when done. + */ +rd_kafka_Node_t *rd_kafka_Node_new(int id, + const char *host, + uint16_t port, + const char *rack_id) { + rd_kafka_Node_t *ret = rd_calloc(1, sizeof(*ret)); + ret->id = id; + ret->port = port; + ret->host = rd_strdup(host); + if (rack_id != NULL) + ret->rack_id = rd_strdup(rack_id); + return ret; +} + +/** + * @brief Copy \p src Node object + * + * @param src The Node to copy. + * @return A new allocated Node object. + * Use rd_kafka_Node_destroy() to free when done. + */ +rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src) { + return rd_kafka_Node_new(src->id, src->host, src->port, src->rack_id); +} + +void rd_kafka_Node_destroy(rd_kafka_Node_t *node) { + rd_free(node->host); + if (node->rack_id) + rd_free(node->rack_id); + rd_free(node); +} + +int rd_kafka_Node_id(const rd_kafka_Node_t *node) { + return node->id; +} + +const char *rd_kafka_Node_host(const rd_kafka_Node_t *node) { + return node->host; +} + +uint16_t rd_kafka_Node_port(const rd_kafka_Node_t *node) { + return node->port; +} |