Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_transport.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_transport.c  1295
1 file changed, 1295 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_transport.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_transport.c
new file mode 100644
index 000000000..ae5895b29
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_transport.c
@@ -0,0 +1,1295 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifdef _WIN32
+#pragma comment(lib, "ws2_32.lib")
+#endif
+
+#define __need_IOV_MAX
+
+#define _DARWIN_C_SOURCE /* MSG_DONTWAIT */
+
+#include "rdkafka_int.h"
+#include "rdaddr.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_interceptor.h"
+
+#include <errno.h>
+
+/* AIX doesn't have MSG_DONTWAIT */
+#ifndef MSG_DONTWAIT
+#define MSG_DONTWAIT MSG_NONBLOCK
+#endif
+
+#if WITH_SSL
+#include "rdkafka_ssl.h"
+#endif
+
+/**< Current thread's rd_kafka_transport_t instance.
+ * This pointer is set up when calling any OpenSSL APIs that might
+ * trigger SSL callbacks, and is used to retrieve the SSL object's
+ * corresponding rd_kafka_transport_t instance.
+ * There is a set/get_ex_data() API in OpenSSL, but it requires storing
+ * a unique index somewhere, which we can't do without having a singleton
+ * object, so instead we cut out the middle man and store the
+ * rd_kafka_transport_t pointer directly in the thread-local memory. */
+RD_TLS rd_kafka_transport_t *rd_kafka_curr_transport;
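+
+/* Example (sketch, assuming the usual OpenSSL entry points; the real
+ * call sites live in rdkafka_ssl.c): the thread-local is pointed at the
+ * active transport right before calling into OpenSSL and read back from
+ * within OpenSSL-triggered callbacks:
+ *
+ *   rd_kafka_curr_transport = rktrans;
+ *   r = SSL_read(rktrans->rktrans_ssl, buf, (int)len);
+ *
+ *   ...and inside a callback invoked by that call:
+ *
+ *   rd_kafka_transport_t *rktrans = rd_kafka_curr_transport;
+ */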
+
+
+static int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);
+
+
+/**
+ * Low-level socket close
+ */
+static void rd_kafka_transport_close0(rd_kafka_t *rk, rd_socket_t s) {
+ if (rk->rk_conf.closesocket_cb)
+ rk->rk_conf.closesocket_cb((int)s, rk->rk_conf.opaque);
+ else
+ rd_socket_close(s);
+}
+
+/**
+ * Close and destroy a transport handle
+ */
+void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
+#if WITH_SSL
+ rd_kafka_curr_transport = rktrans;
+ if (rktrans->rktrans_ssl)
+ rd_kafka_transport_ssl_close(rktrans);
+#endif
+
+ rd_kafka_sasl_close(rktrans);
+
+ if (rktrans->rktrans_recv_buf)
+ rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
+
+#ifdef _WIN32
+ WSACloseEvent(rktrans->rktrans_wsaevent);
+#endif
+
+ if (rktrans->rktrans_s != -1)
+ rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
+ rktrans->rktrans_s);
+
+ rd_free(rktrans);
+}
+
+/**
+ * @brief shutdown(2) a transport's underlying socket.
+ *
+ * This will prohibit further sends and receives.
+ * rd_kafka_transport_close() must still be called to close the socket.
+ */
+void rd_kafka_transport_shutdown(rd_kafka_transport_t *rktrans) {
+ shutdown(rktrans->rktrans_s,
+#ifdef _WIN32
+ SD_BOTH
+#else
+ SHUT_RDWR
+#endif
+ );
+}
+
+
+#ifndef _WIN32
+/**
+ * @brief sendmsg() abstraction, converting a list of segments to iovecs.
+ * @remark should only be called if the number of segments is > 1.
+ */
+static ssize_t rd_kafka_transport_socket_sendmsg(rd_kafka_transport_t *rktrans,
+ rd_slice_t *slice,
+ char *errstr,
+ size_t errstr_size) {
+ struct iovec iov[IOV_MAX];
+ struct msghdr msg = {.msg_iov = iov};
+ size_t iovlen;
+ ssize_t r;
+ size_t r2;
+
+ rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
+ /* FIXME: Measure the effects of this */
+ rktrans->rktrans_sndbuf_size);
+ msg.msg_iovlen = (int)iovlen;
+
+#ifdef __sun
+ /* See recvmsg() comment. Setting it here to be safe. */
+ rd_socket_errno = EAGAIN;
+#endif
+
+ r = sendmsg(rktrans->rktrans_s, &msg,
+ MSG_DONTWAIT
+#ifdef MSG_NOSIGNAL
+ | MSG_NOSIGNAL
+#endif
+ );
+
+ if (r == -1) {
+ if (rd_socket_errno == EAGAIN)
+ return 0;
+ rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno));
+ return -1;
+ }
+
+ /* Update buffer read position */
+ r2 = rd_slice_read(slice, NULL, (size_t)r);
+ rd_assert((size_t)r == r2 &&
+ *"BUG: wrote more bytes than available in slice");
+
+ return r;
+}
+#endif
+
+
+/**
+ * @brief Plain send() abstraction
+ */
+static ssize_t rd_kafka_transport_socket_send0(rd_kafka_transport_t *rktrans,
+ rd_slice_t *slice,
+ char *errstr,
+ size_t errstr_size) {
+ ssize_t sum = 0;
+ const void *p;
+ size_t rlen;
+
+ while ((rlen = rd_slice_peeker(slice, &p))) {
+ ssize_t r;
+ size_t r2;
+
+ r = send(rktrans->rktrans_s, p,
+#ifdef _WIN32
+ (int)rlen, (int)0
+#else
+ rlen, 0
+#endif
+ );
+
+#ifdef _WIN32
+ if (unlikely(r == RD_SOCKET_ERROR)) {
+ if (sum > 0 || rd_socket_errno == WSAEWOULDBLOCK) {
+ rktrans->rktrans_blocked = rd_true;
+ return sum;
+ } else {
+ rd_snprintf(
+ errstr, errstr_size, "%s",
+ rd_socket_strerror(rd_socket_errno));
+ return -1;
+ }
+ }
+
+ rktrans->rktrans_blocked = rd_false;
+#else
+ if (unlikely(r <= 0)) {
+ if (r == 0 || rd_socket_errno == EAGAIN)
+ return 0;
+ rd_snprintf(errstr, errstr_size, "%s",
+ rd_socket_strerror(rd_socket_errno));
+ return -1;
+ }
+#endif
+
+ /* Update buffer read position */
+ r2 = rd_slice_read(slice, NULL, (size_t)r);
+ rd_assert((size_t)r == r2 &&
+ *"BUG: wrote more bytes than available in slice");
+
+
+ sum += r;
+
+ /* FIXME: remove this and try again immediately and let
+ * the next write() call fail instead? */
+ if ((size_t)r < rlen)
+ break;
+ }
+
+ return sum;
+}
+
+
+static ssize_t rd_kafka_transport_socket_send(rd_kafka_transport_t *rktrans,
+ rd_slice_t *slice,
+ char *errstr,
+ size_t errstr_size) {
+#ifndef _WIN32
+ /* FIXME: Use sendmsg() with iovecs if there's more than one segment
+ * remaining, otherwise (or if platform does not have sendmsg)
+ * use plain send(). */
+ return rd_kafka_transport_socket_sendmsg(rktrans, slice, errstr,
+ errstr_size);
+#endif
+ return rd_kafka_transport_socket_send0(rktrans, slice, errstr,
+ errstr_size);
+}
+
+
+
+#ifndef _WIN32
+/**
+ * @brief recvmsg() abstraction, converting a list of segments to iovecs.
+ * @remark should only be called if the number of segments is > 1.
+ */
+static ssize_t rd_kafka_transport_socket_recvmsg(rd_kafka_transport_t *rktrans,
+ rd_buf_t *rbuf,
+ char *errstr,
+ size_t errstr_size) {
+ ssize_t r;
+ struct iovec iov[IOV_MAX];
+ struct msghdr msg = {.msg_iov = iov};
+ size_t iovlen;
+
+ rd_buf_get_write_iov(rbuf, msg.msg_iov, &iovlen, IOV_MAX,
+ /* FIXME: Measure the effects of this */
+ rktrans->rktrans_rcvbuf_size);
+ msg.msg_iovlen = (int)iovlen;
+
+#ifdef __sun
+ /* SunOS doesn't seem to set errno when recvmsg() fails
+ * due to no data and MSG_DONTWAIT is set. */
+ rd_socket_errno = EAGAIN;
+#endif
+ r = recvmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT);
+ if (unlikely(r <= 0)) {
+ if (r == -1 && rd_socket_errno == EAGAIN)
+ return 0;
+ else if (r == 0 || (r == -1 && rd_socket_errno == ECONNRESET)) {
+ /* Receive 0 after POLLIN event means
+ * connection closed. */
+ rd_snprintf(errstr, errstr_size, "Disconnected");
+ return -1;
+ } else if (r == -1) {
+ rd_snprintf(errstr, errstr_size, "%s",
+ rd_strerror(errno));
+ return -1;
+ }
+ }
+
+ /* Update buffer write position */
+ rd_buf_write(rbuf, NULL, (size_t)r);
+
+ return r;
+}
+#endif
+
+
+/**
+ * @brief Plain recv()
+ */
+static ssize_t rd_kafka_transport_socket_recv0(rd_kafka_transport_t *rktrans,
+ rd_buf_t *rbuf,
+ char *errstr,
+ size_t errstr_size) {
+ ssize_t sum = 0;
+ void *p;
+ size_t len;
+
+ while ((len = rd_buf_get_writable(rbuf, &p))) {
+ ssize_t r;
+
+ r = recv(rktrans->rktrans_s, p,
+#ifdef _WIN32
+ (int)
+#endif
+ len,
+ 0);
+
+ if (unlikely(r == RD_SOCKET_ERROR)) {
+ if (rd_socket_errno == EAGAIN
+#ifdef _WIN32
+ || rd_socket_errno == WSAEWOULDBLOCK
+#endif
+ )
+ return sum;
+ else {
+ rd_snprintf(
+ errstr, errstr_size, "%s",
+ rd_socket_strerror(rd_socket_errno));
+ return -1;
+ }
+ } else if (unlikely(r == 0)) {
+ /* Receive 0 after POLLIN event means
+ * connection closed. */
+ rd_snprintf(errstr, errstr_size, "Disconnected");
+ return -1;
+ }
+
+ /* Update buffer write position */
+ rd_buf_write(rbuf, NULL, (size_t)r);
+
+ sum += r;
+
+ /* FIXME: remove this and try again immediately and let
+ * the next recv() call fail instead? */
+ if ((size_t)r < len)
+ break;
+ }
+ return sum;
+}
+
+
+static ssize_t rd_kafka_transport_socket_recv(rd_kafka_transport_t *rktrans,
+ rd_buf_t *buf,
+ char *errstr,
+ size_t errstr_size) {
+#ifndef _WIN32
+ return rd_kafka_transport_socket_recvmsg(rktrans, buf, errstr,
+ errstr_size);
+#endif
+ return rd_kafka_transport_socket_recv0(rktrans, buf, errstr,
+ errstr_size);
+}
+
+
+
+/**
+ * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..).
+ * From this state we either hand control back to the broker code,
+ * or if authentication is configured we enter the AUTH state.
+ */
+void rd_kafka_transport_connect_done(rd_kafka_transport_t *rktrans,
+ char *errstr) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+ rd_kafka_curr_transport = rktrans;
+
+ rd_kafka_broker_connect_done(rkb, errstr);
+}
+
+
+
+ssize_t rd_kafka_transport_send(rd_kafka_transport_t *rktrans,
+ rd_slice_t *slice,
+ char *errstr,
+ size_t errstr_size) {
+ ssize_t r;
+#if WITH_SSL
+ if (rktrans->rktrans_ssl) {
+ rd_kafka_curr_transport = rktrans;
+ r = rd_kafka_transport_ssl_send(rktrans, slice, errstr,
+ errstr_size);
+ } else
+#endif
+ r = rd_kafka_transport_socket_send(rktrans, slice, errstr,
+ errstr_size);
+
+ return r;
+}
+
+
+ssize_t rd_kafka_transport_recv(rd_kafka_transport_t *rktrans,
+ rd_buf_t *rbuf,
+ char *errstr,
+ size_t errstr_size) {
+ ssize_t r;
+
+#if WITH_SSL
+ if (rktrans->rktrans_ssl) {
+ rd_kafka_curr_transport = rktrans;
+ r = rd_kafka_transport_ssl_recv(rktrans, rbuf, errstr,
+ errstr_size);
+ } else
+#endif
+ r = rd_kafka_transport_socket_recv(rktrans, rbuf, errstr,
+ errstr_size);
+
+ return r;
+}
+
+
+
+/**
+ * @brief Notify transport layer of full request sent.
+ */
+void rd_kafka_transport_request_sent(rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *rkbuf) {
+ rd_kafka_transport_t *rktrans = rkb->rkb_transport;
+
+ /* Call on_request_sent interceptors */
+ rd_kafka_interceptors_on_request_sent(
+ rkb->rkb_rk, (int)rktrans->rktrans_s, rkb->rkb_name,
+ rkb->rkb_nodeid, rkbuf->rkbuf_reqhdr.ApiKey,
+ rkbuf->rkbuf_reqhdr.ApiVersion, rkbuf->rkbuf_corrid,
+ rd_slice_size(&rkbuf->rkbuf_reader));
+}
+
+
+
+/**
+ * Length framed receive handling.
+ * Currently only supports the following framing:
+ *   [int32_t:big_endian_length_of_payload][payload]
+ * (a byte-level sketch of this framing follows the function body)
+ *
+ * To be used on POLLIN event, will return:
+ * -1: on fatal error (errstr will be updated, *rkbufp remains unset)
+ * 0: still waiting for data (*rkbufp remains unset)
+ * 1: data complete, (buffer returned in *rkbufp)
+ */
+int rd_kafka_transport_framed_recv(rd_kafka_transport_t *rktrans,
+ rd_kafka_buf_t **rkbufp,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf;
+ ssize_t r;
+ const int log_decode_errors = LOG_ERR;
+
+ /* States:
+ * !rktrans_recv_buf: initial state; set up buf to receive header.
+ * rkbuf_totlen == 0: awaiting header
+ * rkbuf_totlen > 0: awaiting payload
+ */
+
+ if (!rkbuf) {
+ rkbuf = rd_kafka_buf_new(1, 4 /*length field's length*/);
+ /* Set up buffer reader for the length field */
+ rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4);
+ rktrans->rktrans_recv_buf = rkbuf;
+ }
+
+
+ r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf, errstr,
+ errstr_size);
+ if (r == 0)
+ return 0;
+ else if (r == -1)
+ return -1;
+
+ if (rkbuf->rkbuf_totlen == 0) {
+ /* Frame length not known yet. */
+ int32_t frame_len;
+
+ if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) {
+ /* Wait for entire frame header. */
+ return 0;
+ }
+
+ /* Initialize reader */
+ rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4);
+
+                /* Read header: payload length */
+ rd_kafka_buf_read_i32(rkbuf, &frame_len);
+
+ if (frame_len < 0 ||
+ frame_len > rktrans->rktrans_rkb->rkb_rk->rk_conf
+ .recv_max_msg_size) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid frame size %" PRId32, frame_len);
+ return -1;
+ }
+
+ rkbuf->rkbuf_totlen = 4 + frame_len;
+ if (frame_len == 0) {
+ /* Payload is empty, we're done. */
+ rktrans->rktrans_recv_buf = NULL;
+ *rkbufp = rkbuf;
+ return 1;
+ }
+
+                /* Allocate memory to hold entire frame payload in contiguous
+ * memory. */
+ rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len);
+
+                /* Try reading directly; there is probably more data available. */
+ return rd_kafka_transport_framed_recv(rktrans, rkbufp, errstr,
+ errstr_size);
+ }
+
+ if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) {
+ /* Payload is complete. */
+ rktrans->rktrans_recv_buf = NULL;
+ *rkbufp = rkbuf;
+ return 1;
+ }
+
+ /* Wait for more data */
+ return 0;
+
+err_parse:
+ rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s",
+ rd_kafka_err2str(rkbuf->rkbuf_err));
+ return -1;
+}
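+
+/* Framing example (illustrative sketch only): a frame carrying the
+ * 3-byte payload "abc" is the big-endian payload length followed by
+ * the payload itself:
+ *
+ *   char frame[4 + 3];
+ *   int32_t be_len = (int32_t)htonl(3);      // length of payload only
+ *   memcpy(frame, &be_len, 4);               // 00 00 00 03
+ *   memcpy(frame + 4, "abc", 3);             // 61 62 63
+ *
+ * rd_kafka_transport_framed_recv() above first reads those four length
+ * bytes, sets rkbuf_totlen = 4 + frame_len, and then keeps reading
+ * until the buffer's write position reaches that total. */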
+
+
+/**
+ * @brief Final socket setup after a connection has been established
+ */
+void rd_kafka_transport_post_connect_setup(rd_kafka_transport_t *rktrans) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ unsigned int slen;
+
+        /* Set socket send & receive buffer sizes if configured */
+ if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) {
+ if (setsockopt(
+ rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
+ (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size,
+ sizeof(rkb->rkb_rk->rk_conf.socket_sndbuf_size)) ==
+ RD_SOCKET_ERROR)
+ rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
+ "Failed to set socket send "
+ "buffer size to %i: %s",
+ rkb->rkb_rk->rk_conf.socket_sndbuf_size,
+ rd_socket_strerror(rd_socket_errno));
+ }
+
+ if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) {
+ if (setsockopt(
+ rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
+ (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
+ sizeof(rkb->rkb_rk->rk_conf.socket_rcvbuf_size)) ==
+ RD_SOCKET_ERROR)
+ rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
+ "Failed to set socket receive "
+ "buffer size to %i: %s",
+ rkb->rkb_rk->rk_conf.socket_rcvbuf_size,
+ rd_socket_strerror(rd_socket_errno));
+ }
+
+ /* Get send and receive buffer sizes to allow limiting
+ * the total number of bytes passed with iovecs to sendmsg()
+ * and recvmsg(). */
+ slen = sizeof(rktrans->rktrans_rcvbuf_size);
+ if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF,
+ (void *)&rktrans->rktrans_rcvbuf_size,
+ &slen) == RD_SOCKET_ERROR) {
+ rd_rkb_log(rkb, LOG_WARNING, "RCVBUF",
+ "Failed to get socket receive "
+ "buffer size: %s: assuming 1MB",
+ rd_socket_strerror(rd_socket_errno));
+ rktrans->rktrans_rcvbuf_size = 1024 * 1024;
+ } else if (rktrans->rktrans_rcvbuf_size < 1024 * 64)
+ rktrans->rktrans_rcvbuf_size =
+ 1024 * 64; /* Use at least 64KB */
+
+ slen = sizeof(rktrans->rktrans_sndbuf_size);
+ if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF,
+ (void *)&rktrans->rktrans_sndbuf_size,
+ &slen) == RD_SOCKET_ERROR) {
+                rd_rkb_log(rkb, LOG_WARNING, "SNDBUF",
+ "Failed to get socket send "
+ "buffer size: %s: assuming 1MB",
+ rd_socket_strerror(rd_socket_errno));
+ rktrans->rktrans_sndbuf_size = 1024 * 1024;
+ } else if (rktrans->rktrans_sndbuf_size < 1024 * 64)
+ rktrans->rktrans_sndbuf_size =
+ 1024 * 64; /* Use at least 64KB */
+
+
+#ifdef TCP_NODELAY
+ if (rkb->rkb_rk->rk_conf.socket_nagle_disable) {
+ int one = 1;
+ if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY,
+ (void *)&one, sizeof(one)) == RD_SOCKET_ERROR)
+ rd_rkb_log(rkb, LOG_WARNING, "NAGLE",
+ "Failed to disable Nagle (TCP_NODELAY) "
+ "on socket: %s",
+ rd_socket_strerror(rd_socket_errno));
+ }
+#endif
+}
+
+
+/**
+ * TCP connection established.
+ * Set up socket options, SSL, etc.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_transport_connected(rd_kafka_transport_t *rktrans) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+ rd_rkb_dbg(
+ rkb, BROKER, "CONNECT", "Connected to %s",
+ rd_sockaddr2str(rkb->rkb_addr_last,
+ RD_SOCKADDR2STR_F_PORT | RD_SOCKADDR2STR_F_FAMILY));
+
+ rd_kafka_transport_post_connect_setup(rktrans);
+
+#if WITH_SSL
+ if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL ||
+ rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) {
+ char errstr[512];
+
+ rd_kafka_broker_lock(rkb);
+ rd_kafka_broker_set_state(rkb,
+ RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE);
+ rd_kafka_broker_unlock(rkb);
+
+ /* Set up SSL connection.
+                 * This is also an asynchronous operation so don't
+ * propagate to broker_connect_done() just yet. */
+ if (rd_kafka_transport_ssl_connect(rkb, rktrans, errstr,
+ sizeof(errstr)) == -1) {
+ rd_kafka_transport_connect_done(rktrans, errstr);
+ return;
+ }
+ return;
+ }
+#endif
+
+ /* Propagate connect success */
+ rd_kafka_transport_connect_done(rktrans, NULL);
+}
+
+
+
+/**
+ * @brief Get the kernel SO_ERROR in \p errp for the given transport.
+ * @returns 0 if getsockopt() was successful (and \p errp can be trusted),
+ * else -1 in which case \p errp 's value is undefined.
+ */
+static int rd_kafka_transport_get_socket_error(rd_kafka_transport_t *rktrans,
+ int *errp) {
+ socklen_t intlen = sizeof(*errp);
+
+ if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_ERROR, (void *)errp,
+ &intlen) == -1) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR",
+ "Failed to get socket error: %s",
+ rd_socket_strerror(rd_socket_errno));
+ return -1;
+ }
+
+ return 0;
+}
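+
+/* Usage sketch (this mirrors the CONNECT case in
+ * rd_kafka_transport_io_event() below): completing a non-blocking
+ * connect() means waiting for writability and then asking the kernel
+ * for the deferred connect() result:
+ *
+ *   int so_err;
+ *   if (rd_kafka_transport_get_socket_error(rktrans, &so_err) == -1)
+ *           ; // could not read the status: treat as transport error
+ *   else if (so_err != 0)
+ *           ; // connect() failed, so_err is an errno-style value
+ *   else
+ *           ; // connect() succeeded
+ */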
+
+
+/**
+ * IO event handler.
+ *
+ * @param socket_errstr An optional (else NULL) error string from the
+ *                      socket layer.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_transport_io_event(rd_kafka_transport_t *rktrans,
+ int events,
+ const char *socket_errstr) {
+ char errstr[512];
+ int r;
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+ switch (rkb->rkb_state) {
+ case RD_KAFKA_BROKER_STATE_CONNECT:
+ /* Asynchronous connect finished, read status. */
+ if (!(events & (POLLOUT | POLLERR | POLLHUP)))
+ return;
+
+ if (socket_errstr)
+ rd_kafka_broker_fail(
+ rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
+ "Connect to %s failed: %s",
+ rd_sockaddr2str(rkb->rkb_addr_last,
+ RD_SOCKADDR2STR_F_PORT |
+ RD_SOCKADDR2STR_F_FAMILY),
+ socket_errstr);
+ else if (rd_kafka_transport_get_socket_error(rktrans, &r) ==
+ -1) {
+ rd_kafka_broker_fail(
+ rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
+ "Connect to %s failed: "
+ "unable to get status from "
+ "socket %d: %s",
+ rd_sockaddr2str(rkb->rkb_addr_last,
+ RD_SOCKADDR2STR_F_PORT |
+ RD_SOCKADDR2STR_F_FAMILY),
+ rktrans->rktrans_s, rd_strerror(rd_socket_errno));
+ } else if (r != 0) {
+ /* Connect failed */
+ rd_snprintf(
+ errstr, sizeof(errstr), "Connect to %s failed: %s",
+ rd_sockaddr2str(rkb->rkb_addr_last,
+ RD_SOCKADDR2STR_F_PORT |
+ RD_SOCKADDR2STR_F_FAMILY),
+ rd_strerror(r));
+
+ rd_kafka_transport_connect_done(rktrans, errstr);
+ } else {
+ /* Connect succeeded */
+ rd_kafka_transport_connected(rktrans);
+ }
+ break;
+
+ case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:
+#if WITH_SSL
+ rd_assert(rktrans->rktrans_ssl);
+
+ /* Currently setting up SSL connection:
+ * perform handshake. */
+ r = rd_kafka_transport_ssl_handshake(rktrans);
+
+ if (r == 0 /* handshake still in progress */ &&
+ (events & POLLHUP)) {
+ rd_kafka_broker_conn_closed(
+ rkb, RD_KAFKA_RESP_ERR__TRANSPORT, "Disconnected");
+ return;
+ }
+
+#else
+ RD_NOTREACHED();
+#endif
+ break;
+
+ case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:
+ /* SASL authentication.
+ * Prior to broker version v1.0.0 this is performed
+ * directly on the socket without Kafka framing. */
+ if (rd_kafka_sasl_io_event(rktrans, events, errstr,
+ sizeof(errstr)) == -1) {
+ rd_kafka_broker_fail(
+ rkb, LOG_ERR, RD_KAFKA_RESP_ERR__AUTHENTICATION,
+ "SASL authentication failure: %s", errstr);
+ return;
+ }
+
+ if (events & POLLHUP) {
+ rd_kafka_broker_fail(rkb, LOG_ERR,
+ RD_KAFKA_RESP_ERR__AUTHENTICATION,
+ "Disconnected");
+
+ return;
+ }
+
+ break;
+
+ case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
+ case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
+ case RD_KAFKA_BROKER_STATE_AUTH_REQ:
+ case RD_KAFKA_BROKER_STATE_UP:
+ case RD_KAFKA_BROKER_STATE_UPDATE:
+
+ if (events & POLLIN) {
+ while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
+ rd_kafka_recv(rkb) > 0)
+ ;
+
+ /* If connection went down: bail out early */
+ if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN)
+ return;
+ }
+
+ if (events & POLLHUP) {
+ rd_kafka_broker_conn_closed(
+ rkb, RD_KAFKA_RESP_ERR__TRANSPORT, "Disconnected");
+ return;
+ }
+
+ if (events & POLLOUT) {
+ while (rd_kafka_send(rkb) > 0)
+ ;
+ }
+ break;
+
+ case RD_KAFKA_BROKER_STATE_INIT:
+ case RD_KAFKA_BROKER_STATE_DOWN:
+ case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
+ rd_kafka_assert(rkb->rkb_rk, !*"bad state");
+ }
+}
+
+
+
+#ifdef _WIN32
+/**
+ * @brief Convert WSA FD_.. events to POLL.. events.
+ */
+static RD_INLINE int rd_kafka_transport_wsa2events(long wevents) {
+ int events = 0;
+
+ if (unlikely(wevents == 0))
+ return 0;
+
+ if (wevents & FD_READ)
+ events |= POLLIN;
+ if (wevents & (FD_WRITE | FD_CONNECT))
+ events |= POLLOUT;
+ if (wevents & FD_CLOSE)
+ events |= POLLHUP;
+
+ rd_dassert(events != 0);
+
+ return events;
+}
+
+/**
+ * @brief Convert POLL.. events to WSA FD_.. events.
+ */
+static RD_INLINE int rd_kafka_transport_events2wsa(int events,
+ rd_bool_t is_connecting) {
+ long wevents = FD_CLOSE;
+
+ if (unlikely(is_connecting))
+ return wevents | FD_CONNECT;
+
+ if (events & POLLIN)
+ wevents |= FD_READ;
+ if (events & POLLOUT)
+ wevents |= FD_WRITE;
+
+ return wevents;
+}
+
+
+/**
+ * @returns the WinSocket events (as POLL.. events) for the broker socket.
+ */
+static int rd_kafka_transport_get_wsa_events(rd_kafka_transport_t *rktrans) {
+ const int try_bits[4 * 2] = {FD_READ_BIT, POLLIN, FD_WRITE_BIT,
+ POLLOUT, FD_CONNECT_BIT, POLLOUT,
+ FD_CLOSE_BIT, POLLHUP};
+ int r, i;
+ WSANETWORKEVENTS netevents;
+ int events = 0;
+ const char *socket_errstr = NULL;
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+ /* Get Socket event */
+ r = WSAEnumNetworkEvents(rktrans->rktrans_s, rktrans->rktrans_wsaevent,
+ &netevents);
+ if (unlikely(r == SOCKET_ERROR)) {
+ rd_rkb_log(rkb, LOG_ERR, "WSAWAIT",
+ "WSAEnumNetworkEvents() failed: %s",
+ rd_socket_strerror(rd_socket_errno));
+ socket_errstr = rd_socket_strerror(rd_socket_errno);
+ return POLLHUP | POLLERR;
+ }
+
+ /* Get fired events and errors for each event type */
+ for (i = 0; i < RD_ARRAYSIZE(try_bits); i += 2) {
+ const int bit = try_bits[i];
+ const int event = try_bits[i + 1];
+
+ if (!(netevents.lNetworkEvents & (1 << bit)))
+ continue;
+
+ if (unlikely(netevents.iErrorCode[bit])) {
+ socket_errstr =
+ rd_socket_strerror(netevents.iErrorCode[bit]);
+ events |= POLLHUP;
+ } else {
+ events |= event;
+
+ if (bit == FD_WRITE_BIT) {
+ /* Writing no longer blocked */
+ rktrans->rktrans_blocked = rd_false;
+ }
+ }
+ }
+
+ return events;
+}
+
+
+/**
+ * @brief Win32: Poll transport and \p rkq cond events.
+ *
+ * @returns the transport socket POLL.. event bits.
+ */
+static int rd_kafka_transport_io_serve_win32(rd_kafka_transport_t *rktrans,
+ rd_kafka_q_t *rkq,
+ int timeout_ms) {
+ const DWORD wsaevent_cnt = 3;
+ WSAEVENT wsaevents[3] = {
+ rkq->rkq_cond.mEvents[0], /* rkq: cnd_signal */
+ rkq->rkq_cond.mEvents[1], /* rkq: cnd_broadcast */
+ rktrans->rktrans_wsaevent, /* socket */
+ };
+ DWORD r;
+ int events = 0;
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ rd_bool_t set_pollout = rd_false;
+ rd_bool_t cnd_is_waiting = rd_false;
+
+        /* WSA only sets FD_WRITE (i.e., POLLOUT) when the socket was
+ * previously blocked, unlike BSD sockets that set POLLOUT as long as
+ * the socket isn't blocked. So we need to imitate the BSD behaviour
+ * here and cut the timeout short if a write is wanted and the socket
+ * is not currently blocked. */
+ if (rktrans->rktrans_rkb->rkb_state != RD_KAFKA_BROKER_STATE_CONNECT &&
+ !rktrans->rktrans_blocked &&
+ (rktrans->rktrans_pfd[0].events & POLLOUT)) {
+ timeout_ms = 0;
+ set_pollout = rd_true;
+ } else {
+ /* Check if the queue already has ops enqueued in which case we
+ * cut the timeout short. Else add this thread as waiting on the
+                 * queue's condvar so that cnd_signal() (et al.) will perform
+ * SetEvent() and thus wake up this thread in case a new op is
+ * added to the queue. */
+ mtx_lock(&rkq->rkq_lock);
+ if (rkq->rkq_qlen > 0) {
+ timeout_ms = 0;
+ } else {
+ cnd_is_waiting = rd_true;
+ cnd_wait_enter(&rkq->rkq_cond);
+ }
+ mtx_unlock(&rkq->rkq_lock);
+ }
+
+ /* Wait for IO and queue events */
+ r = WSAWaitForMultipleEvents(wsaevent_cnt, wsaevents, FALSE, timeout_ms,
+ FALSE);
+
+ if (cnd_is_waiting) {
+ mtx_lock(&rkq->rkq_lock);
+ cnd_wait_exit(&rkq->rkq_cond);
+ mtx_unlock(&rkq->rkq_lock);
+ }
+
+ if (unlikely(r == WSA_WAIT_FAILED)) {
+ rd_rkb_log(rkb, LOG_CRIT, "WSAWAIT",
+ "WSAWaitForMultipleEvents failed: %s",
+ rd_socket_strerror(rd_socket_errno));
+ return POLLERR;
+ } else if (r != WSA_WAIT_TIMEOUT) {
+ r -= WSA_WAIT_EVENT_0;
+
+ /* Reset the cond events if any of them were triggered */
+ if (r < 2) {
+ ResetEvent(rkq->rkq_cond.mEvents[0]);
+ ResetEvent(rkq->rkq_cond.mEvents[1]);
+ }
+
+ /* Get the socket events. */
+ events = rd_kafka_transport_get_wsa_events(rktrans);
+ }
+
+ /* As explained above we need to set the POLLOUT flag
+ * in case it is wanted but not triggered by Winsocket so that
+ * io_event() knows it can attempt to send more data. */
+ if (likely(set_pollout && !(events & (POLLHUP | POLLERR | POLLOUT))))
+ events |= POLLOUT;
+
+ return events;
+}
+#endif
+
+
+/**
+ * @brief Poll and serve IOs
+ *
+ * @returns 0 if \p rkq may need additional blocking/timeout polling, else 1.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_transport_io_serve(rd_kafka_transport_t *rktrans,
+ rd_kafka_q_t *rkq,
+ int timeout_ms) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ int events;
+
+ rd_kafka_curr_transport = rktrans;
+
+ if (
+#ifndef _WIN32
+    /* BSD sockets use POLLOUT to indicate connect success.
+ * Windows has its own flag for this (FD_CONNECT). */
+ rkb->rkb_state == RD_KAFKA_BROKER_STATE_CONNECT ||
+#endif
+ (rkb->rkb_state > RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE &&
+ rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
+ rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0))
+ rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT);
+
+#ifdef _WIN32
+ /* BSD sockets use POLLIN and a following recv() returning 0 to
+         * indicate connection close.
+ * Windows has its own flag for this (FD_CLOSE). */
+ if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) > 0)
+#endif
+ rd_kafka_transport_poll_set(rkb->rkb_transport, POLLIN);
+
+ /* On Windows we can wait for both IO and condvars (rkq)
+ * simultaneously.
+ *
+ * On *nix/BSD sockets we use a local pipe (pfd[1]) to wake
+ * up the rkq. */
+#ifdef _WIN32
+ events = rd_kafka_transport_io_serve_win32(rktrans, rkq, timeout_ms);
+
+#else
+ if (rd_kafka_transport_poll(rktrans, timeout_ms) < 1)
+ return 0; /* No events, caller can block on \p rkq poll */
+
+ /* Broker socket events */
+ events = rktrans->rktrans_pfd[0].revents;
+#endif
+
+ if (events) {
+ rd_kafka_transport_poll_clear(rktrans, POLLOUT | POLLIN);
+
+ rd_kafka_transport_io_event(rktrans, events, NULL);
+ }
+
+ return 1;
+}
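+
+/* Caller sketch (an assumption about the broker thread, see
+ * rdkafka_broker.c for the actual call site):
+ *
+ *   if (!rd_kafka_transport_io_serve(rktrans, rkb->rkb_ops, timeout_ms))
+ *           rd_kafka_q_serve(rkb->rkb_ops, timeout_ms, ...);
+ *
+ * i.e., a return value of 0 means no IO event fired and the caller may
+ * still block on the op queue for the remainder of the timeout. */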
+
+
+/**
+ * @brief Create a new transport object using existing socket \p s.
+ */
+rd_kafka_transport_t *rd_kafka_transport_new(rd_kafka_broker_t *rkb,
+ rd_socket_t s,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_transport_t *rktrans;
+ int on = 1;
+ int r;
+
+#ifdef SO_NOSIGPIPE
+ /* Disable SIGPIPE signalling for this socket on OSX */
+ if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
+ rd_rkb_dbg(rkb, BROKER, "SOCKET",
+ "Failed to set SO_NOSIGPIPE: %s",
+ rd_socket_strerror(rd_socket_errno));
+#endif
+
+#ifdef SO_KEEPALIVE
+ /* Enable TCP keep-alives, if configured. */
+ if (rkb->rkb_rk->rk_conf.socket_keepalive) {
+ if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (void *)&on,
+ sizeof(on)) == RD_SOCKET_ERROR)
+ rd_rkb_dbg(rkb, BROKER, "SOCKET",
+ "Failed to set SO_KEEPALIVE: %s",
+ rd_socket_strerror(rd_socket_errno));
+ }
+#endif
+
+ /* Set the socket to non-blocking */
+ if ((r = rd_fd_set_nonblocking(s))) {
+ rd_snprintf(errstr, errstr_size,
+ "Failed to set socket non-blocking: %s",
+ rd_socket_strerror(r));
+ return NULL;
+ }
+
+
+ rktrans = rd_calloc(1, sizeof(*rktrans));
+ rktrans->rktrans_rkb = rkb;
+ rktrans->rktrans_s = s;
+
+#ifdef _WIN32
+ rktrans->rktrans_wsaevent = WSACreateEvent();
+ rd_assert(rktrans->rktrans_wsaevent != NULL);
+#endif
+
+ return rktrans;
+}
+
+
+/**
+ * Initiate asynchronous connection attempt.
+ *
+ * Locality: broker thread
+ */
+rd_kafka_transport_t *rd_kafka_transport_connect(rd_kafka_broker_t *rkb,
+ const rd_sockaddr_inx_t *sinx,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_transport_t *rktrans;
+ int s = -1;
+ int r;
+
+ rkb->rkb_addr_last = sinx;
+
+ s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family, SOCK_STREAM,
+ IPPROTO_TCP,
+ rkb->rkb_rk->rk_conf.opaque);
+ if (s == -1) {
+ rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
+ rd_socket_strerror(rd_socket_errno));
+ return NULL;
+ }
+
+ rktrans = rd_kafka_transport_new(rkb, s, errstr, errstr_size);
+ if (!rktrans) {
+ rd_kafka_transport_close0(rkb->rkb_rk, s);
+ return NULL;
+ }
+
+ rd_rkb_dbg(rkb, BROKER, "CONNECT",
+ "Connecting to %s (%s) "
+ "with socket %i",
+ rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |
+ RD_SOCKADDR2STR_F_PORT),
+ rd_kafka_secproto_names[rkb->rkb_proto], s);
+
+ /* Connect to broker */
+ if (rkb->rkb_rk->rk_conf.connect_cb) {
+ rd_kafka_broker_lock(rkb); /* for rkb_nodename */
+ r = rkb->rkb_rk->rk_conf.connect_cb(
+ s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
+ rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);
+ rd_kafka_broker_unlock(rkb);
+ } else {
+ if (connect(s, (struct sockaddr *)sinx,
+ RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&
+ (rd_socket_errno != EINPROGRESS
+#ifdef _WIN32
+ && rd_socket_errno != WSAEWOULDBLOCK
+#endif
+ ))
+ r = rd_socket_errno;
+ else
+ r = 0;
+ }
+
+ if (r != 0) {
+ rd_rkb_dbg(rkb, BROKER, "CONNECT",
+ "Couldn't connect to %s: %s (%i)",
+ rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_PORT |
+ RD_SOCKADDR2STR_F_FAMILY),
+ rd_socket_strerror(r), r);
+ rd_snprintf(errstr, errstr_size,
+ "Failed to connect to broker at %s: %s",
+ rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
+ rd_socket_strerror(r));
+
+ rd_kafka_transport_close(rktrans);
+ return NULL;
+ }
+
+ /* Set up transport handle */
+ rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;
+ if (rkb->rkb_wakeup_fd[0] != -1) {
+ rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
+ rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd =
+ rkb->rkb_wakeup_fd[0];
+ }
+
+
+ /* Poll writability to trigger on connection success/failure. */
+ rd_kafka_transport_poll_set(rktrans, POLLOUT);
+
+ return rktrans;
+}
+
+
+#ifdef _WIN32
+/**
+ * @brief Set the WinSocket event poll bit to \p events.
+ */
+static void rd_kafka_transport_poll_set_wsa(rd_kafka_transport_t *rktrans,
+ int events) {
+ int r;
+ r = WSAEventSelect(
+ rktrans->rktrans_s, rktrans->rktrans_wsaevent,
+ rd_kafka_transport_events2wsa(rktrans->rktrans_pfd[0].events,
+ rktrans->rktrans_rkb->rkb_state ==
+ RD_KAFKA_BROKER_STATE_CONNECT));
+ if (unlikely(r != 0)) {
+ rd_rkb_log(rktrans->rktrans_rkb, LOG_CRIT, "WSAEVENT",
+ "WSAEventSelect() failed: %s",
+ rd_socket_strerror(rd_socket_errno));
+ }
+}
+#endif
+
+void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) {
+ if ((rktrans->rktrans_pfd[0].events & event) == event)
+ return;
+
+ rktrans->rktrans_pfd[0].events |= event;
+
+#ifdef _WIN32
+ rd_kafka_transport_poll_set_wsa(rktrans,
+ rktrans->rktrans_pfd[0].events);
+#endif
+}
+
+void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) {
+ if (!(rktrans->rktrans_pfd[0].events & event))
+ return;
+
+ rktrans->rktrans_pfd[0].events &= ~event;
+
+#ifdef _WIN32
+ rd_kafka_transport_poll_set_wsa(rktrans,
+ rktrans->rktrans_pfd[0].events);
+#endif
+}
+
+#ifndef _WIN32
+/**
+ * @brief Poll transport fds.
+ *
+ * @returns 1 if an event was raised, else 0, or -1 on error.
+ */
+static int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
+ int r;
+
+ r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
+ if (r <= 0)
+ return r;
+
+ if (rktrans->rktrans_pfd[1].revents & POLLIN) {
+                /* Read and discard wake-up fd data; it is only used
+                 * for wake-ups. */
+ char buf[1024];
+ while (rd_socket_read((int)rktrans->rktrans_pfd[1].fd, buf,
+ sizeof(buf)) > 0)
+ ; /* Read all buffered signalling bytes */
+ }
+
+ return 1;
+}
+#endif
+
+#ifdef _WIN32
+/**
+ * @brief A socket write operation would block, flag the socket
+ * as blocked so that POLLOUT events are handled correctly.
+ *
+ * This is really only used on Windows where POLLOUT (FD_WRITE) is
+ * edge-triggered rather than level-triggered.
+ */
+void rd_kafka_transport_set_blocked(rd_kafka_transport_t *rktrans,
+ rd_bool_t blocked) {
+ rktrans->rktrans_blocked = blocked;
+}
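+
+/* Usage sketch (an assumption about callers such as the SSL send path):
+ * when a write hits WSAEWOULDBLOCK the transport is flagged as blocked,
+ *
+ *   if (r == RD_SOCKET_ERROR && rd_socket_errno == WSAEWOULDBLOCK)
+ *           rd_kafka_transport_set_blocked(rktrans, rd_true);
+ *
+ * so that rd_kafka_transport_io_serve_win32() waits for the next
+ * FD_WRITE event instead of busy-looping; the flag is cleared again
+ * when FD_WRITE fires in rd_kafka_transport_get_wsa_events(). */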
+#endif
+
+
+#if 0
+/**
+ * Global cleanup.
+ * This is dangerous and SHOULD NOT be called since it will rip
+ * the rug from under the application if it uses any of this functionality
+ * in its own code. This means we might leak some memory on exit.
+ */
+void rd_kafka_transport_term (void) {
+#ifdef _WIN32
+ (void)WSACleanup(); /* FIXME: dangerous */
+#endif
+}
+#endif
+
+void rd_kafka_transport_init(void) {
+#ifdef _WIN32
+ WSADATA d;
+ (void)WSAStartup(MAKEWORD(2, 2), &d);
+#endif
+}