summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c278
1 files changed, 278 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c
new file mode 100644
index 000000000..080589358
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_subscription.c
@@ -0,0 +1,278 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * This is the high level consumer API which is mutually exclusive
+ * with the old legacy simple consumer.
+ * Only one of these interfaces may be used on a given rd_kafka_t handle.
+ */
+
+#include "rdkafka_int.h"
+
+
+rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ return rd_kafka_op_err_destroy(
+ rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_SUBSCRIBE));
+}
+
+
+/** @returns 1 if the topic is invalid (bad regex, empty), else 0 if valid. */
+static size_t _invalid_topic_cb(const rd_kafka_topic_partition_t *rktpar,
+ void *opaque) {
+ rd_regex_t *re;
+ char errstr[1];
+
+ if (!*rktpar->topic)
+ return 1;
+
+ if (*rktpar->topic != '^')
+ return 0;
+
+ if (!(re = rd_regex_comp(rktpar->topic, errstr, sizeof(errstr))))
+ return 1;
+
+ rd_regex_destroy(re);
+
+ return 0;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_subscribe(rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *topics) {
+
+ rd_kafka_op_t *rko;
+ rd_kafka_cgrp_t *rkcg;
+ rd_kafka_topic_partition_list_t *topics_cpy;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ /* Validate topics */
+ if (topics->cnt == 0 || rd_kafka_topic_partition_list_sum(
+ topics, _invalid_topic_cb, NULL) > 0)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ topics_cpy = rd_kafka_topic_partition_list_copy(topics);
+ if (rd_kafka_topic_partition_list_has_duplicates(
+ topics_cpy, rd_true /*ignore partition field*/)) {
+ rd_kafka_topic_partition_list_destroy(topics_cpy);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_SUBSCRIBE);
+ rko->rko_u.subscribe.topics = topics_cpy;
+
+ return rd_kafka_op_err_destroy(
+ rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
+}
+
+
+rd_kafka_error_t *
+rd_kafka_assign0(rd_kafka_t *rk,
+ rd_kafka_assign_method_t assign_method,
+ const rd_kafka_topic_partition_list_t *partitions) {
+ rd_kafka_op_t *rko;
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
+ "Requires a consumer with group.id "
+ "configured");
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_ASSIGN);
+
+ rko->rko_u.assign.method = assign_method;
+
+ if (partitions)
+ rko->rko_u.assign.partitions =
+ rd_kafka_topic_partition_list_copy(partitions);
+
+ return rd_kafka_op_error_destroy(
+ rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_assign(rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *partitions) {
+ rd_kafka_error_t *error;
+ rd_kafka_resp_err_t err;
+
+ error = rd_kafka_assign0(rk, RD_KAFKA_ASSIGN_METHOD_ASSIGN, partitions);
+
+ if (!error)
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ else {
+ err = rd_kafka_error_code(error);
+ rd_kafka_error_destroy(error);
+ }
+
+ return err;
+}
+
+
+rd_kafka_error_t *
+rd_kafka_incremental_assign(rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *partitions) {
+ if (!partitions)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "partitions must not be NULL");
+
+ return rd_kafka_assign0(rk, RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN,
+ partitions);
+}
+
+
+rd_kafka_error_t *rd_kafka_incremental_unassign(
+ rd_kafka_t *rk,
+ const rd_kafka_topic_partition_list_t *partitions) {
+ if (!partitions)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "partitions must not be NULL");
+
+ return rd_kafka_assign0(rk, RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN,
+ partitions);
+}
+
+
+int rd_kafka_assignment_lost(rd_kafka_t *rk) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return 0;
+
+ return rd_kafka_cgrp_assignment_is_lost(rkcg) == rd_true;
+}
+
+
+const char *rd_kafka_rebalance_protocol(rd_kafka_t *rk) {
+ rd_kafka_op_t *rko;
+ rd_kafka_cgrp_t *rkcg;
+ const char *result;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return NULL;
+
+ rko = rd_kafka_op_req2(rkcg->rkcg_ops,
+ RD_KAFKA_OP_GET_REBALANCE_PROTOCOL);
+
+ if (!rko)
+ return NULL;
+ else if (rko->rko_err) {
+ rd_kafka_op_destroy(rko);
+ return NULL;
+ }
+
+ result = rko->rko_u.rebalance_protocol.str;
+
+ rd_kafka_op_destroy(rko);
+
+ return result;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_assignment(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t **partitions) {
+ rd_kafka_op_t *rko;
+ rd_kafka_resp_err_t err;
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_ASSIGNMENT);
+ if (!rko)
+ return RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ err = rko->rko_err;
+
+ *partitions = rko->rko_u.assign.partitions;
+ rko->rko_u.assign.partitions = NULL;
+ rd_kafka_op_destroy(rko);
+
+ if (!*partitions && !err) {
+ /* Create an empty list for convenience of the caller */
+ *partitions = rd_kafka_topic_partition_list_new(0);
+ }
+
+ return err;
+}
+
+rd_kafka_resp_err_t
+rd_kafka_subscription(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t **topics) {
+ rd_kafka_op_t *rko;
+ rd_kafka_resp_err_t err;
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_SUBSCRIPTION);
+ if (!rko)
+ return RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ err = rko->rko_err;
+
+ *topics = rko->rko_u.subscribe.topics;
+ rko->rko_u.subscribe.topics = NULL;
+ rd_kafka_op_destroy(rko);
+
+ if (!*topics && !err) {
+ /* Create an empty list for convenience of the caller */
+ *topics = rd_kafka_topic_partition_list_new(0);
+ }
+
+ return err;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_pause_partitions(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions) {
+ return rd_kafka_toppars_pause_resume(rk, rd_true /*pause*/, RD_SYNC,
+ RD_KAFKA_TOPPAR_F_APP_PAUSE,
+ partitions);
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_resume_partitions(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions) {
+ return rd_kafka_toppars_pause_resume(rk, rd_false /*resume*/, RD_SYNC,
+ RD_KAFKA_TOPPAR_F_APP_PAUSE,
+ partitions);
+}