diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c new file mode 100644 index 00000000..f58bc506 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c @@ -0,0 +1,213 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_plugin.h" +#include "rddl.h" + + +typedef struct rd_kafka_plugin_s { + char *rkplug_path; /* Library path */ + rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */ + void *rkplug_handle; /* dlopen (or similar) handle */ + void *rkplug_opaque; /* Plugin's opaque */ + +} rd_kafka_plugin_t; + + +/** + * @brief Plugin path comparator + */ +static int rd_kafka_plugin_cmp(const void *_a, const void *_b) { + const rd_kafka_plugin_t *a = _a, *b = _b; + + return strcmp(a->rkplug_path, b->rkplug_path); +} + + +/** + * @brief Add plugin (by library path) and calls its conf_init() constructor + * + * @returns an error code on error. + * @remark duplicate plugins are silently ignored. + * + * @remark Libraries are refcounted and thus not unloaded until all + * plugins referencing the library have been destroyed. + * (dlopen() and LoadLibrary() does this for us) + */ +static rd_kafka_resp_err_t rd_kafka_plugin_new(rd_kafka_conf_t *conf, + const char *path, + char *errstr, + size_t errstr_size) { + rd_kafka_plugin_t *rkplug; + const rd_kafka_plugin_t skel = {.rkplug_path = (char *)path}; + rd_kafka_plugin_f_conf_init_t *conf_init; + rd_kafka_resp_err_t err; + void *handle; + void *plug_opaque = NULL; + + /* Avoid duplicates */ + if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) { + rd_snprintf(errstr, errstr_size, "Ignoring duplicate plugin %s", + path); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Loading plugin \"%s\"", path); + + /* Attempt to load library */ + if (!(handle = rd_dl_open(path, errstr, errstr_size))) { + rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", + "Failed to load plugin \"%s\": %s", path, errstr); + return RD_KAFKA_RESP_ERR__FS; + } + + /* Find conf_init() function */ + if (!(conf_init = + rd_dl_sym(handle, "conf_init", errstr, errstr_size))) { + rd_dl_close(handle); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + /* Call conf_init() */ + rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT", + "Calling plugin \"%s\" conf_init()", path); + + if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) { + rd_dl_close(handle); + return err; + } + + rkplug = rd_calloc(1, sizeof(*rkplug)); + rkplug->rkplug_path = rd_strdup(path); + rkplug->rkplug_handle = handle; + rkplug->rkplug_opaque = plug_opaque; + + rd_list_add(&conf->plugins, rkplug); + + rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Plugin \"%s\" loaded", path); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Free the plugin, any conf_destroy() interceptors will have been + * called prior to this call. + * @remark plugin is not removed from any list (caller's responsibility) + * @remark this relies on the actual library loader to refcount libraries, + * especially in the config copy case. + * This is true for POSIX dlopen() and Win32 LoadLibrary(). + * @locality application thread + */ +static void rd_kafka_plugin_destroy(rd_kafka_plugin_t *rkplug) { + rd_dl_close(rkplug->rkplug_handle); + rd_free(rkplug->rkplug_path); + rd_free(rkplug); +} + + + +/** + * @brief Initialize all configured plugins. + * + * @remark Any previously loaded plugins will be unloaded. + * + * @returns the error code of the first failing plugin. + * @locality application thread calling rd_kafka_new(). + */ +static rd_kafka_conf_res_t rd_kafka_plugins_conf_set0(rd_kafka_conf_t *conf, + const char *paths, + char *errstr, + size_t errstr_size) { + char *s; + + rd_list_destroy(&conf->plugins); + rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy); + + if (!paths || !*paths) + return RD_KAFKA_CONF_OK; + + /* Split paths by ; */ + rd_strdupa(&s, paths); + + rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", + "Loading plugins from conf object %p: \"%s\"", conf, + paths); + + while (s && *s) { + char *path = s; + char *t; + rd_kafka_resp_err_t err; + + if ((t = strchr(s, ';'))) { + *t = '\0'; + s = t + 1; + } else { + s = NULL; + } + + if ((err = rd_kafka_plugin_new(conf, path, errstr, + errstr_size))) { + /* Failed to load plugin */ + size_t elen = errstr_size > 0 ? strlen(errstr) : 0; + + /* See if there is room for appending the + * plugin path to the error message. */ + if (elen + strlen("(plugin )") + strlen(path) < + errstr_size) + rd_snprintf(errstr + elen, errstr_size - elen, + " (plugin %s)", path); + + rd_list_destroy(&conf->plugins); + return RD_KAFKA_CONF_INVALID; + } + } + + return RD_KAFKA_CONF_OK; +} + + +/** + * @brief Conf setter for "plugin.library.paths" + */ +rd_kafka_conf_res_t rd_kafka_plugins_conf_set(int scope, + void *pconf, + const char *name, + const char *value, + void *dstptr, + rd_kafka_conf_set_mode_t set_mode, + char *errstr, + size_t errstr_size) { + + assert(scope == _RK_GLOBAL); + return rd_kafka_plugins_conf_set0( + (rd_kafka_conf_t *)pconf, + set_mode == _RK_CONF_PROP_SET_DEL ? NULL : value, errstr, + errstr_size); +} |