summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c801
1 files changed, 801 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c b/fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c
new file mode 100644
index 000000000..2de01627d
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/sockem.c
@@ -0,0 +1,801 @@
+/*
+ * sockem - socket-level network emulation
+ *
+ * Copyright (c) 2016, Magnus Edenhill, Andreas Smas
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#define _GNU_SOURCE /* for strdupa() and RTLD_NEXT */
+#include <errno.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <poll.h>
+#include <assert.h>
+#include <netinet/in.h>
+#include <dlfcn.h>
+
+#include "sockem.h"
+
+#include <sys/queue.h>
+
+#ifdef __APPLE__
+#include <sys/time.h> /* for gettimeofday() */
+#endif
+
+#ifdef _WIN32
+#define socket_errno() WSAGetLastError()
+#else
+#define socket_errno() errno
+#define SOCKET_ERROR -1
+#endif
+
+#ifndef strdupa
+#define strdupa(s) \
+ ({ \
+ const char *_s = (s); \
+ size_t _len = strlen(_s) + 1; \
+ char *_d = (char *)alloca(_len); \
+ (char *)memcpy(_d, _s, _len); \
+ })
+#endif
+
+#include <pthread.h>
+typedef pthread_mutex_t mtx_t;
+#define mtx_init(M) pthread_mutex_init(M, NULL)
+#define mtx_destroy(M) pthread_mutex_destroy(M)
+#define mtx_lock(M) pthread_mutex_lock(M)
+#define mtx_unlock(M) pthread_mutex_unlock(M)
+
+typedef pthread_t thrd_t;
+#define thrd_create(THRD, START_ROUTINE, ARG) \
+ pthread_create(THRD, NULL, START_ROUTINE, ARG)
+#define thrd_join0(THRD) pthread_join(THRD, NULL)
+
+
+static mtx_t sockem_lock;
+static LIST_HEAD(, sockem_s) sockems;
+
+static pthread_once_t sockem_once = PTHREAD_ONCE_INIT;
+static char *sockem_conf_str = "";
+
+typedef int64_t sockem_ts_t;
+
+
+#ifdef LIBSOCKEM_PRELOAD
+static int (*sockem_orig_connect)(int, const struct sockaddr *, socklen_t);
+static int (*sockem_orig_close)(int);
+
+#define sockem_close0(S) (sockem_orig_close(S))
+#define sockem_connect0(S, A, AL) (sockem_orig_connect(S, A, AL))
+#else
+#define sockem_close0(S) close(S)
+#define sockem_connect0(S, A, AL) connect(S, A, AL)
+#endif
+
+
+struct sockem_conf {
+ /* FIXME: these needs to be implemented */
+ int tx_thruput; /* app->peer bytes/second */
+ int rx_thruput; /* peer->app bytes/second */
+ int delay; /* latency in ms */
+ int jitter; /* latency variation in ms */
+ int debug; /* enable sockem printf debugging */
+ size_t recv_bufsz; /* recv chunk/buffer size */
+ int direct; /* direct forward, no delay or rate-limiting */
+};
+
+
+typedef struct sockem_buf_s {
+ TAILQ_ENTRY(sockem_buf_s) sb_link;
+ size_t sb_size;
+ size_t sb_of;
+ char *sb_data;
+ int64_t sb_at; /* Transmit at this absolute time. */
+} sockem_buf_t;
+
+
+struct sockem_s {
+ LIST_ENTRY(sockem_s) link;
+
+ enum {
+ /* Forwarder thread run states */
+ SOCKEM_INIT,
+ SOCKEM_START,
+ SOCKEM_RUN,
+ SOCKEM_TERM
+ } run;
+
+ int as; /* application's socket. */
+ int ls; /* internal application listen socket */
+ int ps; /* internal peer socket connecting sockem to the peer.*/
+
+ void *recv_buf; /* Receive buffer */
+ size_t recv_bufsz; /* .. size */
+
+ int linked; /* On sockems list */
+
+ thrd_t thrd; /* Forwarder thread */
+
+ mtx_t lock;
+
+ struct sockem_conf conf; /* application-set config.
+ * protected by .lock */
+
+ struct sockem_conf use; /* last copy of .conf
+ * local to skm thread */
+
+ TAILQ_HEAD(, sockem_buf_s)
+ bufs; /* Buffers in queue waiting for
+ * transmission (delayed) */
+
+ size_t bufs_size; /* Total number of bytes currently enqueued
+ * for transmission */
+ size_t bufs_size_max; /* Soft max threshold for bufs_size,
+ * when this value is exceeded the app fd
+ * is removed from the poll set until
+ * bufs_size falls below the threshold again. */
+ int poll_fd_cnt;
+ int64_t ts_last_fwd; /* For rate-limiter: timestamp of last forward */
+};
+
+
+static int sockem_vset(sockem_t *skm, va_list ap);
+
+
+/**
+ * A microsecond monotonic clock
+ */
+static __attribute__((unused)) __inline int64_t sockem_clock(void) {
+#ifdef __APPLE__
+ /* No monotonic clock on Darwin */
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return ((int64_t)tv.tv_sec * 1000000LLU) + (int64_t)tv.tv_usec;
+#elif defined(_WIN32)
+ return (int64_t)GetTickCount64() * 1000LLU;
+#else
+ struct timespec ts;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ return ((int64_t)ts.tv_sec * 1000000LLU) +
+ ((int64_t)ts.tv_nsec / 1000LLU);
+#endif
+}
+
+/**
+ * @brief Initialize libsockem once.
+ */
+static void sockem_init(void) {
+ mtx_init(&sockem_lock);
+ sockem_conf_str = getenv("SOCKEM_CONF");
+ if (!sockem_conf_str)
+ sockem_conf_str = "";
+ if (strstr(sockem_conf_str, "debug"))
+ fprintf(stderr, "%% libsockem pre-loaded (%s)\n",
+ sockem_conf_str);
+#ifdef LIBSOCKEM_PRELOAD
+ sockem_orig_connect = dlsym(RTLD_NEXT, "connect");
+ sockem_orig_close = dlsym(RTLD_NEXT, "close");
+#endif
+}
+
+
+/**
+ * @returns the maximum waittime in ms for poll(), at most 1000 ms.
+ * @remark lock must be held
+ */
+static int sockem_calc_waittime(sockem_t *skm, int64_t now) {
+ const sockem_buf_t *sb;
+ int64_t r;
+
+ if (!(sb = TAILQ_FIRST(&skm->bufs)))
+ return 1000;
+ else if (now >= sb->sb_at || skm->use.direct)
+ return 0;
+ else if ((r = (sb->sb_at - now)) < 1000000) {
+ if (r < 1000)
+ return 1; /* Ceil to 1 to avoid busy-loop during
+ * last millisecond. */
+ else
+ return (int)(r / 1000);
+ } else
+ return 1000;
+}
+
+
+/**
+ * @brief Unlink and destroy a buffer
+ */
+static void sockem_buf_destroy(sockem_t *skm, sockem_buf_t *sb) {
+ skm->bufs_size -= sb->sb_size - sb->sb_of;
+ TAILQ_REMOVE(&skm->bufs, sb, sb_link);
+ free(sb);
+}
+
+/**
+ * @brief Add delayed buffer to transmit.
+ */
+static sockem_buf_t *
+sockem_buf_add(sockem_t *skm, size_t size, const void *data) {
+ sockem_buf_t *sb;
+
+ skm->bufs_size += size;
+ if (skm->bufs_size > skm->bufs_size_max) {
+ /* No more buffer space, halt recv fd until
+ * queued buffers drop below threshold. */
+ skm->poll_fd_cnt = 1;
+ }
+
+ sb = malloc(sizeof(*sb) + size);
+
+ sb->sb_of = 0;
+ sb->sb_size = size;
+ sb->sb_data = (char *)(sb + 1);
+ sb->sb_at = sockem_clock() +
+ ((skm->use.delay + (skm->use.jitter / 2) /*FIXME*/) * 1000);
+ memcpy(sb->sb_data, data, size);
+
+ TAILQ_INSERT_TAIL(&skm->bufs, sb, sb_link);
+
+ return sb;
+}
+
+
+/**
+ * @brief Forward any delayed buffers that have passed their deadline
+ * @remark lock must be held but will be released momentarily while
+ * performing send syscall.
+ */
+static int sockem_fwd_bufs(sockem_t *skm, int ofd) {
+ sockem_buf_t *sb;
+ int64_t now = sockem_clock();
+ size_t to_write;
+ int64_t elapsed;
+
+
+ if (skm->use.direct)
+ to_write = 1024 * 1024 * 100;
+ else if ((elapsed = now - skm->ts_last_fwd)) {
+ /* Calculate how many bytes to send to adhere to rate-limit */
+ to_write = (size_t)((double)skm->use.tx_thruput *
+ ((double)elapsed / 1000000.0));
+ } else
+ return 0;
+
+ while (to_write > 0 && (sb = TAILQ_FIRST(&skm->bufs)) &&
+ (skm->use.direct || sb->sb_at <= now)) {
+ ssize_t r;
+ size_t remain = sb->sb_size - sb->sb_of;
+ size_t wr = to_write < remain ? to_write : remain;
+
+ if (wr == 0)
+ break;
+
+ mtx_unlock(&skm->lock);
+
+ r = send(ofd, sb->sb_data + sb->sb_of, wr, 0);
+
+ mtx_lock(&skm->lock);
+
+ if (r == -1) {
+ if (errno == ENOBUFS || errno == EAGAIN ||
+ errno == EWOULDBLOCK)
+ return 0;
+ return -1;
+ }
+
+ skm->ts_last_fwd = now;
+
+ sb->sb_of += r;
+ to_write -= r;
+
+ if (sb->sb_of < sb->sb_size)
+ break;
+
+ sockem_buf_destroy(skm, sb);
+
+ now = sockem_clock();
+ }
+
+ /* Re-enable app fd poll if queued buffers are below threshold */
+ if (skm->bufs_size < skm->bufs_size_max)
+ skm->poll_fd_cnt = 2;
+
+ return 0;
+}
+
+
+/**
+ * @brief read from \p ifd, write to \p ofd in a blocking fashion.
+ *
+ * @returns the number of bytes forwarded, or -1 on error.
+ */
+static int sockem_recv_fwd(sockem_t *skm, int ifd, int ofd, int direct) {
+ ssize_t r, wr;
+
+ r = recv(ifd, skm->recv_buf, skm->recv_bufsz, MSG_DONTWAIT);
+ if (r == -1) {
+ int serr = socket_errno();
+ if (serr == EAGAIN || serr == EWOULDBLOCK)
+ return 0;
+ return -1;
+
+ } else if (r == 0) {
+ /* Socket closed */
+ return -1;
+ }
+
+ if (direct) {
+ /* No delay, rate limit, or buffered data: send right away */
+ wr = send(ofd, skm->recv_buf, r, 0);
+ if (wr < r)
+ return -1;
+
+ return wr;
+ } else {
+ sockem_buf_add(skm, r, skm->recv_buf);
+ return r;
+ }
+}
+
+
+/**
+ * @brief Close all sockets and unsets ->run.
+ * @remark Preserves caller's errno.
+ * @remark lock must be held.
+ */
+static void sockem_close_all(sockem_t *skm) {
+ int serr = socket_errno();
+
+ if (skm->ls != -1) {
+ sockem_close0(skm->ls);
+ skm->ls = -1;
+ }
+
+ if (skm->ps != -1) {
+ sockem_close0(skm->ps);
+ skm->ps = -1;
+ }
+
+ skm->run = SOCKEM_TERM;
+
+ errno = serr;
+}
+
+
+/**
+ * @brief Copy desired (app) config to internally use(d) configuration.
+ * @remark lock must be held
+ */
+static __inline void sockem_conf_use(sockem_t *skm) {
+ skm->use = skm->conf;
+ /* Figure out if direct forward is to be used */
+ skm->use.direct = !(skm->use.delay || skm->use.jitter ||
+ (skm->use.tx_thruput < (1 << 30)));
+}
+
+/**
+ * @brief sockem internal per-socket forwarder thread
+ */
+static void *sockem_run(void *arg) {
+ sockem_t *skm = arg;
+ int cs = -1;
+ int ls;
+ struct pollfd pfd[2];
+
+ mtx_lock(&skm->lock);
+ if (skm->run == SOCKEM_START)
+ skm->run = SOCKEM_RUN;
+ sockem_conf_use(skm);
+ ls = skm->ls;
+ mtx_unlock(&skm->lock);
+
+ skm->recv_bufsz = skm->use.recv_bufsz;
+ skm->recv_buf = malloc(skm->recv_bufsz);
+
+ /* Accept connection from sockfd in sockem_connect() */
+ cs = accept(ls, NULL, 0);
+ if (cs == -1) {
+ mtx_lock(&skm->lock);
+ if (skm->run == SOCKEM_TERM) {
+ /* App socket was closed. */
+ goto done;
+ }
+ fprintf(stderr, "%% sockem: accept(%d) failed: %s\n", ls,
+ strerror(socket_errno()));
+ mtx_unlock(&skm->lock);
+ assert(cs != -1);
+ }
+
+ /* Set up poll (blocking IO) */
+ memset(pfd, 0, sizeof(pfd));
+ pfd[1].fd = cs;
+ pfd[1].events = POLLIN;
+
+ mtx_lock(&skm->lock);
+ pfd[0].fd = skm->ps;
+ mtx_unlock(&skm->lock);
+ pfd[0].events = POLLIN;
+
+ skm->poll_fd_cnt = 2;
+
+ mtx_lock(&skm->lock);
+ while (skm->run == SOCKEM_RUN) {
+ int r;
+ int i;
+ int waittime = sockem_calc_waittime(skm, sockem_clock());
+
+ mtx_unlock(&skm->lock);
+ r = poll(pfd, skm->poll_fd_cnt, waittime);
+ if (r == -1)
+ break;
+
+ /* Send/forward delayed buffers */
+ mtx_lock(&skm->lock);
+ sockem_conf_use(skm);
+
+ if (sockem_fwd_bufs(skm, skm->ps) == -1) {
+ mtx_unlock(&skm->lock);
+ skm->run = SOCKEM_TERM;
+ break;
+ }
+ mtx_unlock(&skm->lock);
+
+ for (i = 0; r > 0 && i < 2; i++) {
+ if (pfd[i].revents & (POLLHUP | POLLERR)) {
+ skm->run = SOCKEM_TERM;
+
+ } else if (pfd[i].revents & POLLIN) {
+ if (sockem_recv_fwd(
+ skm, pfd[i].fd, pfd[i ^ 1].fd,
+ /* direct mode for app socket
+ * without delay, and always for
+ * peer socket (receive channel) */
+ i == 0 || (skm->use.direct &&
+ skm->bufs_size == 0)) ==
+ -1) {
+ skm->run = SOCKEM_TERM;
+ break;
+ }
+ }
+ }
+
+ mtx_lock(&skm->lock);
+ }
+done:
+ if (cs != -1)
+ sockem_close0(cs);
+ sockem_close_all(skm);
+
+ mtx_unlock(&skm->lock);
+ free(skm->recv_buf);
+
+
+ return NULL;
+}
+
+
+
+/**
+ * @brief Connect socket \p s to \p addr
+ */
+static int
+sockem_do_connect(int s, const struct sockaddr *addr, socklen_t addrlen) {
+ int r;
+
+ r = sockem_connect0(s, addr, addrlen);
+ if (r == SOCKET_ERROR) {
+ int serr = socket_errno();
+ if (serr != EINPROGRESS
+#ifdef _WIN32
+ && serr != WSAEWOULDBLOCK
+#endif
+ ) {
+#ifndef _WIN32
+ errno = serr;
+#endif
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+sockem_t *sockem_connect(int sockfd,
+ const struct sockaddr *addr,
+ socklen_t addrlen,
+ ...) {
+ sockem_t *skm;
+ int ls, ps;
+ struct sockaddr_in6 sin6 = {.sin6_family = addr->sa_family};
+ socklen_t addrlen2 = addrlen;
+ va_list ap;
+
+ pthread_once(&sockem_once, sockem_init);
+
+ /* Create internal app listener socket */
+ ls = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ if (ls == -1)
+ return NULL;
+
+ if (bind(ls, (struct sockaddr *)&sin6, addrlen) == -1) {
+ sockem_close0(ls);
+ return NULL;
+ }
+
+ /* Get bound address */
+ if (getsockname(ls, (struct sockaddr *)&sin6, &addrlen2) == -1) {
+ sockem_close0(ls);
+ return NULL;
+ }
+
+ if (listen(ls, 1) == -1) {
+ sockem_close0(ls);
+ return NULL;
+ }
+
+ /* Create internal peer socket */
+ ps = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ if (ps == -1) {
+ sockem_close0(ls);
+ return NULL;
+ }
+
+ /* Connect to peer */
+ if (sockem_do_connect(ps, addr, addrlen) == -1) {
+ sockem_close0(ls);
+ sockem_close0(ps);
+ return NULL;
+ }
+
+ /* Create sockem handle */
+ skm = calloc(1, sizeof(*skm));
+ skm->as = sockfd;
+ skm->ls = ls;
+ skm->ps = ps;
+ skm->bufs_size_max = 16 * 1024 * 1024; /* 16kb of queue buffer */
+ TAILQ_INIT(&skm->bufs);
+ mtx_init(&skm->lock);
+
+ /* Default config */
+ skm->conf.rx_thruput = 1 << 30;
+ skm->conf.tx_thruput = 1 << 30;
+ skm->conf.delay = 0;
+ skm->conf.jitter = 0;
+ skm->conf.recv_bufsz = 1024 * 1024;
+ skm->conf.direct = 1;
+
+ /* Apply passed configuration */
+ va_start(ap, addrlen);
+ if (sockem_vset(skm, ap) == -1) {
+ va_end(ap);
+ sockem_close(skm);
+ return NULL;
+ }
+ va_end(ap);
+
+ mtx_lock(&skm->lock);
+ skm->run = SOCKEM_START;
+
+ /* Create pipe thread */
+ if (thrd_create(&skm->thrd, sockem_run, skm) != 0) {
+ mtx_unlock(&skm->lock);
+ sockem_close(skm);
+ return NULL;
+ }
+ mtx_unlock(&skm->lock);
+
+ /* Connect application socket to listen socket */
+ if (sockem_do_connect(sockfd, (struct sockaddr *)&sin6, addrlen2) ==
+ -1) {
+ sockem_close(skm);
+ return NULL;
+ }
+
+ mtx_lock(&sockem_lock);
+ LIST_INSERT_HEAD(&sockems, skm, link);
+ mtx_lock(&skm->lock);
+ skm->linked = 1;
+ mtx_unlock(&skm->lock);
+ mtx_unlock(&sockem_lock);
+
+ return skm;
+}
+
+
+/**
+ * @brief Purge/drop all queued buffers
+ */
+static void sockem_bufs_purge(sockem_t *skm) {
+ sockem_buf_t *sb;
+
+ while ((sb = TAILQ_FIRST(&skm->bufs)))
+ sockem_buf_destroy(skm, sb);
+}
+
+
+void sockem_close(sockem_t *skm) {
+ mtx_lock(&sockem_lock);
+ mtx_lock(&skm->lock);
+ if (skm->linked)
+ LIST_REMOVE(skm, link);
+ mtx_unlock(&sockem_lock);
+
+ /* If thread is running let it close the sockets
+ * to avoid race condition. */
+ if (skm->run == SOCKEM_START || skm->run == SOCKEM_RUN)
+ skm->run = SOCKEM_TERM;
+ else
+ sockem_close_all(skm);
+
+ mtx_unlock(&skm->lock);
+
+ thrd_join0(skm->thrd);
+
+ sockem_bufs_purge(skm);
+
+ mtx_destroy(&skm->lock);
+
+
+ free(skm);
+}
+
+
+/**
+ * @brief Set single conf key.
+ * @remark lock must be held.
+ * @returns 0 on success or -1 if key is unknown
+ */
+static int sockem_set0(sockem_t *skm, const char *key, int val) {
+ if (!strcmp(key, "rx.thruput") || !strcmp(key, "rx.throughput"))
+ skm->conf.rx_thruput = val;
+ else if (!strcmp(key, "tx.thruput") || !strcmp(key, "tx.throughput"))
+ skm->conf.tx_thruput = val;
+ else if (!strcmp(key, "delay"))
+ skm->conf.delay = val;
+ else if (!strcmp(key, "jitter"))
+ skm->conf.jitter = val;
+ else if (!strcmp(key, "rx.bufsz"))
+ skm->conf.recv_bufsz = val;
+ else if (!strcmp(key, "debug"))
+ skm->conf.debug = val;
+ else if (!strcmp(key, "true"))
+ ; /* dummy key for allowing non-empty but default config */
+ else if (!strchr(key, ',')) {
+ char *s = strdupa(key);
+ while (*s) {
+ char *t = strchr(s, ',');
+ char *d = strchr(s, '=');
+ if (t)
+ *t = '\0';
+ if (!d)
+ return -1;
+ *(d++) = '\0';
+
+ if (sockem_set0(skm, s, atoi(d)) == -1)
+ return -1;
+
+ if (!t)
+ break;
+ s += 1;
+ }
+ } else
+ return -1;
+
+ return 0;
+}
+
+
+/**
+ * @brief Set sockem config parameters
+ */
+static int sockem_vset(sockem_t *skm, va_list ap) {
+ const char *key;
+ int val;
+
+ mtx_lock(&skm->lock);
+ while ((key = va_arg(ap, const char *))) {
+ val = va_arg(ap, int);
+ if (sockem_set0(skm, key, val) == -1) {
+ mtx_unlock(&skm->lock);
+ return -1;
+ }
+ }
+ mtx_unlock(&skm->lock);
+
+ return 0;
+}
+
+int sockem_set(sockem_t *skm, ...) {
+ va_list ap;
+ int r;
+
+ va_start(ap, skm);
+ r = sockem_vset(skm, ap);
+ va_end(ap);
+
+ return r;
+}
+
+
+sockem_t *sockem_find(int sockfd) {
+ sockem_t *skm;
+
+ pthread_once(&sockem_once, sockem_init);
+
+ mtx_lock(&sockem_lock);
+ LIST_FOREACH(skm, &sockems, link)
+ if (skm->as == sockfd)
+ break;
+ mtx_unlock(&sockem_lock);
+
+ return skm;
+}
+
+
+#ifdef LIBSOCKEM_PRELOAD
+/**
+ * Provide overloading socket APIs and conf bootstrapping from env vars.
+ *
+ */
+
+
+
+/**
+ * @brief connect(2) overload
+ */
+int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
+ sockem_t *skm;
+
+ pthread_once(&sockem_once, sockem_init);
+
+ skm = sockem_connect(sockfd, addr, addrlen, sockem_conf_str, 0, NULL);
+ if (!skm)
+ return -1;
+
+ return 0;
+}
+
+/**
+ * @brief close(2) overload
+ */
+int close(int fd) {
+ sockem_t *skm;
+
+ pthread_once(&sockem_once, sockem_init);
+
+ mtx_lock(&sockem_lock);
+ skm = sockem_find(fd);
+
+ if (skm)
+ sockem_close(skm);
+ mtx_unlock(&sockem_lock);
+
+ return sockem_close0(fd);
+}
+
+#endif