Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c
new file mode 100644
index 000000000..1e715cfba
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c
@@ -0,0 +1,142 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL PLAIN support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
+/**
+ * @brief Handle received frame from broker.
+ */
+static int rd_kafka_sasl_plain_recv(struct rd_kafka_transport_s *rktrans,
+ const void *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size) {
+ if (size)
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLPLAIN",
+ "Received non-empty SASL PLAIN (builtin) "
+ "response from broker (%" PRIusz " bytes)",
+ size);
+
+ rd_kafka_sasl_auth_done(rktrans);
+
+ return 0;
+}
+
+
+/**
+ * @brief Initialize and start SASL PLAIN (builtin) authentication.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_sasl_plain_client_new(rd_kafka_transport_t *rktrans,
+ const char *hostname,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ rd_kafka_t *rk = rkb->rkb_rk;
+ /* [authzid] UTF8NUL authcid UTF8NUL passwd */
+ char *buf;
+ int of = 0;
+ int zidlen = 0;
+ int cidlen, pwlen;
+
+ mtx_lock(&rk->rk_conf.sasl.lock);
+
+ cidlen = rk->rk_conf.sasl.username
+ ? (int)strlen(rk->rk_conf.sasl.username)
+ : 0;
+ pwlen = rk->rk_conf.sasl.password
+ ? (int)strlen(rk->rk_conf.sasl.password)
+ : 0;
+
+ buf = rd_alloca(zidlen + 1 + cidlen + 1 + pwlen + 1);
+
+ /* authzid: none (empty) */
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* authcid */
+ memcpy(&buf[of], rk->rk_conf.sasl.username, cidlen);
+ of += cidlen;
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* passwd */
+ memcpy(&buf[of], rk->rk_conf.sasl.password, pwlen);
+ of += pwlen;
+ mtx_unlock(&rk->rk_conf.sasl.lock);
+
+ rd_rkb_dbg(rkb, SECURITY, "SASLPLAIN",
+ "Sending SASL PLAIN (builtin) authentication token");
+
+ if (rd_kafka_sasl_send(rktrans, buf, of, errstr, errstr_size))
+ return -1;
+
+ /* PLAIN is appearantly done here, but we still need to make sure
+ * the PLAIN frame is sent and we get a response back (empty) */
+ rktrans->rktrans_sasl.complete = 1;
+ return 0;
+}
+
+
+/**
+ * @brief Validate PLAIN config
+ */
+static int rd_kafka_sasl_plain_conf_validate(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ rd_bool_t both_set;
+
+ mtx_lock(&rk->rk_conf.sasl.lock);
+ both_set = rk->rk_conf.sasl.username && rk->rk_conf.sasl.password;
+ mtx_unlock(&rk->rk_conf.sasl.lock);
+
+ if (!both_set) {
+ rd_snprintf(errstr, errstr_size,
+ "sasl.username and sasl.password must be set");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider = {
+ .name = "PLAIN (builtin)",
+ .client_new = rd_kafka_sasl_plain_client_new,
+ .recv = rd_kafka_sasl_plain_recv,
+ .conf_validate = rd_kafka_sasl_plain_conf_validate}; |
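Review bot: In rd_kafka_sasl_plain_client_new the PLAIN token, which carries sasl.password in clear, is built in an rd_alloca() buffer (presumably stack-allocated) whose size comes from unbounded strlen() results, and it is left in place on both return paths. I would cap cidlen and pwlen before the allocation and wipe the buffer once the send call has returned, for example `int r = rd_kafka_sasl_send(rktrans, buf, of, errstr, errstr_size);` `explicit_bzero(buf, of);` `if (r) return -1;` (explicit_bzero where the platform provides it, since a plain memset before return may be optimized away). The length computation tolerates a NULL username or password, but the two memcpy calls then read from those same pointers, so a comment such as `/* conf_validate guarantees username and password are non-NULL */` above the first memcpy would make the assumption explicit. The path suggests a vendored librdkafka copy, so this is probably better raised upstream than patched locally.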