diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_event.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_event.c | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_event.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_event.c new file mode 100644 index 00000000..ffd1a178 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_event.c @@ -0,0 +1,426 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2016 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_event.h" +#include "rd.h" + +rd_kafka_event_type_t rd_kafka_event_type(const rd_kafka_event_t *rkev) { + return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE; +} + +const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { + switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE) { + case RD_KAFKA_EVENT_NONE: + return "(NONE)"; + case RD_KAFKA_EVENT_DR: + return "DeliveryReport"; + case RD_KAFKA_EVENT_FETCH: + return "Fetch"; + case RD_KAFKA_EVENT_LOG: + return "Log"; + case RD_KAFKA_EVENT_ERROR: + return "Error"; + case RD_KAFKA_EVENT_REBALANCE: + return "Rebalance"; + case RD_KAFKA_EVENT_OFFSET_COMMIT: + return "OffsetCommit"; + case RD_KAFKA_EVENT_STATS: + return "Stats"; + case RD_KAFKA_EVENT_CREATETOPICS_RESULT: + return "CreateTopicsResult"; + case RD_KAFKA_EVENT_DELETETOPICS_RESULT: + return "DeleteTopicsResult"; + case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT: + return "CreatePartitionsResult"; + case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT: + return "AlterConfigsResult"; + case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT: + return "DescribeConfigsResult"; + case RD_KAFKA_EVENT_DELETERECORDS_RESULT: + return "DeleteRecordsResult"; + case RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT: + return "ListConsumerGroupsResult"; + case RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT: + return "DescribeConsumerGroupsResult"; + case RD_KAFKA_EVENT_DELETEGROUPS_RESULT: + return "DeleteGroupsResult"; + case RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT: + return "DeleteConsumerGroupOffsetsResult"; + case RD_KAFKA_EVENT_CREATEACLS_RESULT: + return "CreateAclsResult"; + case RD_KAFKA_EVENT_DESCRIBEACLS_RESULT: + return "DescribeAclsResult"; + case RD_KAFKA_EVENT_DELETEACLS_RESULT: + return "DeleteAclsResult"; + case RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT: + return "AlterConsumerGroupOffsetsResult"; + case RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT: + return "ListConsumerGroupOffsetsResult"; + case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: + return "SaslOAuthBearerTokenRefresh"; + default: + return "?unknown?"; + } +} + + + +void rd_kafka_event_destroy(rd_kafka_event_t *rkev) { + if (unlikely(!rkev)) + return; + rd_kafka_op_destroy(rkev); +} + + +/** + * @returns the next message from the event's message queue. + * @remark messages will be freed automatically when event is destroyed, + * application MUST NOT call rd_kafka_message_destroy() + */ +const rd_kafka_message_t *rd_kafka_event_message_next(rd_kafka_event_t *rkev) { + rd_kafka_op_t *rko = rkev; + rd_kafka_msg_t *rkm; + rd_kafka_msgq_t *rkmq, *rkmq2; + rd_kafka_message_t *rkmessage; + + switch (rkev->rko_type) { + case RD_KAFKA_OP_DR: + rkmq = &rko->rko_u.dr.msgq; + rkmq2 = &rko->rko_u.dr.msgq2; + break; + + case RD_KAFKA_OP_FETCH: + /* Just one message */ + if (rko->rko_u.fetch.evidx++ > 0) + return NULL; + + rkmessage = rd_kafka_message_get(rko); + if (unlikely(!rkmessage)) + return NULL; + + /* Store offset, etc. */ + rd_kafka_fetch_op_app_prepare(NULL, rko); + + return rkmessage; + + + default: + return NULL; + } + + if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)))) + return NULL; + + rd_kafka_msgq_deq(rkmq, rkm, 1); + + /* Put rkm on secondary message queue which will be purged later. */ + rd_kafka_msgq_enq(rkmq2, rkm); + + return rd_kafka_message_get_from_rkm(rko, rkm); +} + + +size_t rd_kafka_event_message_array(rd_kafka_event_t *rkev, + const rd_kafka_message_t **rkmessages, + size_t size) { + size_t cnt = 0; + const rd_kafka_message_t *rkmessage; + + while (cnt < size && (rkmessage = rd_kafka_event_message_next(rkev))) + rkmessages[cnt++] = rkmessage; + + return cnt; +} + + +size_t rd_kafka_event_message_count(rd_kafka_event_t *rkev) { + switch (rkev->rko_evtype) { + case RD_KAFKA_EVENT_DR: + return (size_t)rkev->rko_u.dr.msgq.rkmq_msg_cnt; + case RD_KAFKA_EVENT_FETCH: + return 1; + default: + return 0; + } +} + + +const char *rd_kafka_event_config_string(rd_kafka_event_t *rkev) { + switch (rkev->rko_evtype) { +#if WITH_SASL_OAUTHBEARER + case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: + return rkev->rko_rk->rk_conf.sasl.oauthbearer_config; +#endif + default: + return NULL; + } +} + +rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t *rkev) { + return rkev->rko_err; +} + +const char *rd_kafka_event_error_string(rd_kafka_event_t *rkev) { + switch (rkev->rko_type) { + case RD_KAFKA_OP_ERR: + case RD_KAFKA_OP_CONSUMER_ERR: + if (rkev->rko_u.err.errstr) + return rkev->rko_u.err.errstr; + break; + case RD_KAFKA_OP_ADMIN_RESULT: + if (rkev->rko_u.admin_result.errstr) + return rkev->rko_u.admin_result.errstr; + break; + default: + break; + } + + return rd_kafka_err2str(rkev->rko_err); +} + +int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev) { + return rkev->rko_u.err.fatal; +} + + +void *rd_kafka_event_opaque(rd_kafka_event_t *rkev) { + switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK) { + case RD_KAFKA_OP_OFFSET_COMMIT: + return rkev->rko_u.offset_commit.opaque; + case RD_KAFKA_OP_ADMIN_RESULT: + return rkev->rko_u.admin_result.opaque; + default: + return NULL; + } +} + + +int rd_kafka_event_log(rd_kafka_event_t *rkev, + const char **fac, + const char **str, + int *level) { + if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG)) + return -1; + + if (likely(fac != NULL)) + *fac = rkev->rko_u.log.fac; + if (likely(str != NULL)) + *str = rkev->rko_u.log.str; + if (likely(level != NULL)) + *level = rkev->rko_u.log.level; + + return 0; +} + +int rd_kafka_event_debug_contexts(rd_kafka_event_t *rkev, + char *dst, + size_t dstsize) { + static const char *names[] = { + "generic", "broker", "topic", "metadata", "feature", + "queue", "msg", "protocol", "cgrp", "security", + "fetch", "interceptor", "plugin", "consumer", "admin", + "eos", "mock", NULL}; + if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG)) + return -1; + rd_flags2str(dst, dstsize, names, rkev->rko_u.log.ctx); + return 0; +} + +const char *rd_kafka_event_stats(rd_kafka_event_t *rkev) { + return rkev->rko_u.stats.json; +} + +rd_kafka_topic_partition_list_t * +rd_kafka_event_topic_partition_list(rd_kafka_event_t *rkev) { + switch (rkev->rko_evtype) { + case RD_KAFKA_EVENT_REBALANCE: + return rkev->rko_u.rebalance.partitions; + case RD_KAFKA_EVENT_OFFSET_COMMIT: + return rkev->rko_u.offset_commit.partitions; + default: + return NULL; + } +} + + +rd_kafka_topic_partition_t * +rd_kafka_event_topic_partition(rd_kafka_event_t *rkev) { + rd_kafka_topic_partition_t *rktpar; + + if (unlikely(!rkev->rko_rktp)) + return NULL; + + rktpar = rd_kafka_topic_partition_new_from_rktp(rkev->rko_rktp); + + switch (rkev->rko_type) { + case RD_KAFKA_OP_ERR: + case RD_KAFKA_OP_CONSUMER_ERR: + rktpar->offset = rkev->rko_u.err.offset; + break; + default: + break; + } + + rktpar->err = rkev->rko_err; + + return rktpar; +} + + + +const rd_kafka_CreateTopics_result_t * +rd_kafka_event_CreateTopics_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_CREATETOPICS_RESULT) + return NULL; + else + return (const rd_kafka_CreateTopics_result_t *)rkev; +} + + +const rd_kafka_DeleteTopics_result_t * +rd_kafka_event_DeleteTopics_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DELETETOPICS_RESULT) + return NULL; + else + return (const rd_kafka_DeleteTopics_result_t *)rkev; +} + + +const rd_kafka_CreatePartitions_result_t * +rd_kafka_event_CreatePartitions_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) + return NULL; + else + return (const rd_kafka_CreatePartitions_result_t *)rkev; +} + + +const rd_kafka_AlterConfigs_result_t * +rd_kafka_event_AlterConfigs_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) + return NULL; + else + return (const rd_kafka_AlterConfigs_result_t *)rkev; +} + + +const rd_kafka_DescribeConfigs_result_t * +rd_kafka_event_DescribeConfigs_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) + return NULL; + else + return (const rd_kafka_DescribeConfigs_result_t *)rkev; +} + +const rd_kafka_DeleteRecords_result_t * +rd_kafka_event_DeleteRecords_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DELETERECORDS_RESULT) + return NULL; + else + return (const rd_kafka_DeleteRecords_result_t *)rkev; +} + +const rd_kafka_ListConsumerGroups_result_t * +rd_kafka_event_ListConsumerGroups_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) + return NULL; + else + return (const rd_kafka_ListConsumerGroups_result_t *)rkev; +} + +const rd_kafka_DescribeConsumerGroups_result_t * +rd_kafka_event_DescribeConsumerGroups_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) + return NULL; + else + return (const rd_kafka_DescribeConsumerGroups_result_t *)rkev; +} + +const rd_kafka_DeleteGroups_result_t * +rd_kafka_event_DeleteGroups_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DELETEGROUPS_RESULT) + return NULL; + else + return (const rd_kafka_DeleteGroups_result_t *)rkev; +} + +const rd_kafka_DeleteConsumerGroupOffsets_result_t * +rd_kafka_event_DeleteConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != + RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) + return NULL; + else + return ( + const rd_kafka_DeleteConsumerGroupOffsets_result_t *)rkev; +} + +const rd_kafka_CreateAcls_result_t * +rd_kafka_event_CreateAcls_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_CREATEACLS_RESULT) + return NULL; + else + return (const rd_kafka_CreateAcls_result_t *)rkev; +} + +const rd_kafka_DescribeAcls_result_t * +rd_kafka_event_DescribeAcls_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBEACLS_RESULT) + return NULL; + else + return (const rd_kafka_DescribeAcls_result_t *)rkev; +} + +const rd_kafka_DeleteAcls_result_t * +rd_kafka_event_DeleteAcls_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_DELETEACLS_RESULT) + return NULL; + else + return (const rd_kafka_DeleteAcls_result_t *)rkev; +} + +const rd_kafka_AlterConsumerGroupOffsets_result_t * +rd_kafka_event_AlterConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT) + return NULL; + else + return ( + const rd_kafka_AlterConsumerGroupOffsets_result_t *)rkev; +} + +const rd_kafka_ListConsumerGroupOffsets_result_t * +rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT) + return NULL; + else + return (const rd_kafka_ListConsumerGroupOffsets_result_t *)rkev; +} |