diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c | 213 |
1 files changed, 0 insertions, 213 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c deleted file mode 100644 index f58bc506..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_plugin.c +++ /dev/null @@ -1,213 +0,0 @@ -/* - * librdkafka - The Apache Kafka C/C++ library - * - * Copyright (c) 2017 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "rdkafka_int.h" -#include "rdkafka_plugin.h" -#include "rddl.h" - - -typedef struct rd_kafka_plugin_s { - char *rkplug_path; /* Library path */ - rd_kafka_t *rkplug_rk; /* Backpointer to the rk handle */ - void *rkplug_handle; /* dlopen (or similar) handle */ - void *rkplug_opaque; /* Plugin's opaque */ - -} rd_kafka_plugin_t; - - -/** - * @brief Plugin path comparator - */ -static int rd_kafka_plugin_cmp(const void *_a, const void *_b) { - const rd_kafka_plugin_t *a = _a, *b = _b; - - return strcmp(a->rkplug_path, b->rkplug_path); -} - - -/** - * @brief Add plugin (by library path) and calls its conf_init() constructor - * - * @returns an error code on error. - * @remark duplicate plugins are silently ignored. - * - * @remark Libraries are refcounted and thus not unloaded until all - * plugins referencing the library have been destroyed. - * (dlopen() and LoadLibrary() does this for us) - */ -static rd_kafka_resp_err_t rd_kafka_plugin_new(rd_kafka_conf_t *conf, - const char *path, - char *errstr, - size_t errstr_size) { - rd_kafka_plugin_t *rkplug; - const rd_kafka_plugin_t skel = {.rkplug_path = (char *)path}; - rd_kafka_plugin_f_conf_init_t *conf_init; - rd_kafka_resp_err_t err; - void *handle; - void *plug_opaque = NULL; - - /* Avoid duplicates */ - if (rd_list_find(&conf->plugins, &skel, rd_kafka_plugin_cmp)) { - rd_snprintf(errstr, errstr_size, "Ignoring duplicate plugin %s", - path); - return RD_KAFKA_RESP_ERR_NO_ERROR; - } - - rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Loading plugin \"%s\"", path); - - /* Attempt to load library */ - if (!(handle = rd_dl_open(path, errstr, errstr_size))) { - rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", - "Failed to load plugin \"%s\": %s", path, errstr); - return RD_KAFKA_RESP_ERR__FS; - } - - /* Find conf_init() function */ - if (!(conf_init = - rd_dl_sym(handle, "conf_init", errstr, errstr_size))) { - rd_dl_close(handle); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - /* Call conf_init() */ - rd_kafka_dbg0(conf, PLUGIN, "PLUGINIT", - "Calling plugin \"%s\" conf_init()", path); - - if ((err = conf_init(conf, &plug_opaque, errstr, errstr_size))) { - rd_dl_close(handle); - return err; - } - - rkplug = rd_calloc(1, sizeof(*rkplug)); - rkplug->rkplug_path = rd_strdup(path); - rkplug->rkplug_handle = handle; - rkplug->rkplug_opaque = plug_opaque; - - rd_list_add(&conf->plugins, rkplug); - - rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", "Plugin \"%s\" loaded", path); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Free the plugin, any conf_destroy() interceptors will have been - * called prior to this call. - * @remark plugin is not removed from any list (caller's responsibility) - * @remark this relies on the actual library loader to refcount libraries, - * especially in the config copy case. - * This is true for POSIX dlopen() and Win32 LoadLibrary(). - * @locality application thread - */ -static void rd_kafka_plugin_destroy(rd_kafka_plugin_t *rkplug) { - rd_dl_close(rkplug->rkplug_handle); - rd_free(rkplug->rkplug_path); - rd_free(rkplug); -} - - - -/** - * @brief Initialize all configured plugins. - * - * @remark Any previously loaded plugins will be unloaded. - * - * @returns the error code of the first failing plugin. - * @locality application thread calling rd_kafka_new(). - */ -static rd_kafka_conf_res_t rd_kafka_plugins_conf_set0(rd_kafka_conf_t *conf, - const char *paths, - char *errstr, - size_t errstr_size) { - char *s; - - rd_list_destroy(&conf->plugins); - rd_list_init(&conf->plugins, 0, (void *)&rd_kafka_plugin_destroy); - - if (!paths || !*paths) - return RD_KAFKA_CONF_OK; - - /* Split paths by ; */ - rd_strdupa(&s, paths); - - rd_kafka_dbg0(conf, PLUGIN, "PLUGLOAD", - "Loading plugins from conf object %p: \"%s\"", conf, - paths); - - while (s && *s) { - char *path = s; - char *t; - rd_kafka_resp_err_t err; - - if ((t = strchr(s, ';'))) { - *t = '\0'; - s = t + 1; - } else { - s = NULL; - } - - if ((err = rd_kafka_plugin_new(conf, path, errstr, - errstr_size))) { - /* Failed to load plugin */ - size_t elen = errstr_size > 0 ? strlen(errstr) : 0; - - /* See if there is room for appending the - * plugin path to the error message. */ - if (elen + strlen("(plugin )") + strlen(path) < - errstr_size) - rd_snprintf(errstr + elen, errstr_size - elen, - " (plugin %s)", path); - - rd_list_destroy(&conf->plugins); - return RD_KAFKA_CONF_INVALID; - } - } - - return RD_KAFKA_CONF_OK; -} - - -/** - * @brief Conf setter for "plugin.library.paths" - */ -rd_kafka_conf_res_t rd_kafka_plugins_conf_set(int scope, - void *pconf, - const char *name, - const char *value, - void *dstptr, - rd_kafka_conf_set_mode_t set_mode, - char *errstr, - size_t errstr_size) { - - assert(scope == _RK_GLOBAL); - return rd_kafka_plugins_conf_set0( - (rd_kafka_conf_t *)pconf, - set_mode == _RK_CONF_PROP_SET_DEL ? NULL : value, errstr, - errstr_size); -} |