summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c213
1 files changed, 213 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c
new file mode 100644
index 000000000..f58bc5060
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c
@@ -0,0 +1,213 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_plugin.h"
+#include "rddl.h"
+
+
+typedef struct rd_kafka_plugin_s {
+ char *rkplug_path; /* Library path */
+ rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */
+ void *rkplug_handle; /* dlopen (or similar) handle */
+ void *rkplug_opaque; /* Plugin's opaque */
+
+} rd_kafka_plugin_t;
+
+
+/**
+ * @brief Plugin path comparator
+ */
+static int rd_kafka_plugin_cmp(const void *_a, const void *_b) {
+ const rd_kafka_plugin_t *a = _a, *b = _b;
+
+ return strcmp(a->rkplug_path, b->rkplug_path);
+}
+
+
+/**
+ * @brief Add plugin (by library path) and calls its conf_init() constructor
+ *
+ * @returns an error code on error.
+ * @remark duplicate plugins are silently ignored.
+ *
+ * @remark Libraries are refcounted and thus not unloaded until all
+ * plugins referencing the library have been destroyed.
+ * (dlopen() and LoadLibrary() does this for us)
+ */
+static rd_kafka_resp_err_t rd_kafka_plugin_new(rd_kafka_conf_t *conf,
+ const char *path,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_plugin_t *rkplug;
+ const rd_kafka_plugin_t skel = {.rkplug_path = (char *)path};
+ rd_kafka_plugin_f_conf_init_t *conf_init;
+ rd_kafka_resp_err_t err;
+ void *handle;
+ void *plug_opaque = NULL;
+
+ /* Avoid duplicates */
+ if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) {
+ rd_snprintf(errstr, errstr_size, "Ignoring duplicate plugin %s",
+ path);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Loading plugin \"%s\"", path);
+
+ /* Attempt to load library */
+ if (!(handle = rd_dl_open(path, errstr, errstr_size))) {
+ rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+ "Failed to load plugin \"%s\": %s", path, errstr);
+ return RD_KAFKA_RESP_ERR__FS;
+ }
+
+ /* Find conf_init() function */
+ if (!(conf_init =
+ rd_dl_sym(handle, "conf_init", errstr, errstr_size))) {
+ rd_dl_close(handle);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ /* Call conf_init() */
+ rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT",
+ "Calling plugin \"%s\" conf_init()", path);
+
+ if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) {
+ rd_dl_close(handle);
+ return err;
+ }
+
+ rkplug = rd_calloc(1, sizeof(*rkplug));
+ rkplug->rkplug_path = rd_strdup(path);
+ rkplug->rkplug_handle = handle;
+ rkplug->rkplug_opaque = plug_opaque;
+
+ rd_list_add(&conf->plugins, rkplug);
+
+ rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Plugin \"%s\" loaded", path);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Free the plugin, any conf_destroy() interceptors will have been
+ * called prior to this call.
+ * @remark plugin is not removed from any list (caller's responsibility)
+ * @remark this relies on the actual library loader to refcount libraries,
+ * especially in the config copy case.
+ * This is true for POSIX dlopen() and Win32 LoadLibrary().
+ * @locality application thread
+ */
+static void rd_kafka_plugin_destroy(rd_kafka_plugin_t *rkplug) {
+ rd_dl_close(rkplug->rkplug_handle);
+ rd_free(rkplug->rkplug_path);
+ rd_free(rkplug);
+}
+
+
+
+/**
+ * @brief Initialize all configured plugins.
+ *
+ * @remark Any previously loaded plugins will be unloaded.
+ *
+ * @returns the error code of the first failing plugin.
+ * @locality application thread calling rd_kafka_new().
+ */
+static rd_kafka_conf_res_t rd_kafka_plugins_conf_set0(rd_kafka_conf_t *conf,
+ const char *paths,
+ char *errstr,
+ size_t errstr_size) {
+ char *s;
+
+ rd_list_destroy(&conf->plugins);
+ rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy);
+
+ if (!paths || !*paths)
+ return RD_KAFKA_CONF_OK;
+
+ /* Split paths by ; */
+ rd_strdupa(&s, paths);
+
+ rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD",
+ "Loading plugins from conf object %p: \"%s\"", conf,
+ paths);
+
+ while (s && *s) {
+ char *path = s;
+ char *t;
+ rd_kafka_resp_err_t err;
+
+ if ((t = strchr(s, ';'))) {
+ *t = '\0';
+ s = t + 1;
+ } else {
+ s = NULL;
+ }
+
+ if ((err = rd_kafka_plugin_new(conf, path, errstr,
+ errstr_size))) {
+ /* Failed to load plugin */
+ size_t elen = errstr_size > 0 ? strlen(errstr) : 0;
+
+ /* See if there is room for appending the
+ * plugin path to the error message. */
+ if (elen + strlen("(plugin )") + strlen(path) <
+ errstr_size)
+ rd_snprintf(errstr + elen, errstr_size - elen,
+ " (plugin %s)", path);
+
+ rd_list_destroy(&conf->plugins);
+ return RD_KAFKA_CONF_INVALID;
+ }
+ }
+
+ return RD_KAFKA_CONF_OK;
+}
+
+
+/**
+ * @brief Conf setter for "plugin.library.paths"
+ */
+rd_kafka_conf_res_t rd_kafka_plugins_conf_set(int scope,
+ void *pconf,
+ const char *name,
+ const char *value,
+ void *dstptr,
+ rd_kafka_conf_set_mode_t set_mode,
+ char *errstr,
+ size_t errstr_size) {
+
+ assert(scope == _RK_GLOBAL);
+ return rd_kafka_plugins_conf_set0(
+ (rd_kafka_conf_t *)pconf,
+ set_mode == _RK_CONF_PROP_SET_DEL ? NULL : value, errstr,
+ errstr_size);
+}