summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_router.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_router.c
parentInitial commit. (diff)
downloadnetdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz
netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_router.c')
-rw-r--r--fluent-bit/src/flb_router.c271
1 files changed, 271 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_router.c b/fluent-bit/src/flb_router.c
new file mode 100644
index 00000000..551e9c33
--- /dev/null
+++ b/fluent-bit/src/flb_router.c
@@ -0,0 +1,271 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_chunk.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_router.h>
+
+#ifdef FLB_HAVE_REGEX
+#include <onigmo.h>
+#endif
+
+#include <string.h>
+
+/* wildcard support */
+/* tag and match should be null terminated. */
+static inline int router_match(const char *tag, int tag_len,
+ const char *match,
+ void *match_r)
+{
+ int ret = FLB_FALSE;
+ char *pos = NULL;
+
+#ifdef FLB_HAVE_REGEX
+ struct flb_regex *match_regex = match_r;
+ int n;
+ if (match_regex) {
+ n = onig_match(match_regex->regex,
+ (const unsigned char *) tag,
+ (const unsigned char *) tag + tag_len,
+ (const unsigned char *) tag, 0,
+ ONIG_OPTION_NONE);
+ if (n > 0) {
+ return 1;
+ }
+ }
+#else
+ (void) match_r;
+#endif
+
+ while (match) {
+ if (*match == '*') {
+ while (*++match == '*'){
+ /* skip successive '*' */
+ }
+ if (*match == '\0') {
+ /* '*' is last of string */
+ ret = 1;
+ break;
+ }
+
+ while ((pos = strchr(tag, (int) *match))) {
+#ifndef FLB_HAVE_REGEX
+ if (router_match(pos, tag_len, match, NULL)) {
+#else
+ /* We don't need to pass the regex recursively,
+ * we matched in order above
+ */
+ if (router_match(pos, tag_len, match, NULL)) {
+#endif
+ ret = 1;
+ break;
+ }
+ tag = pos+1;
+ }
+ break;
+ }
+ else if (*tag != *match) {
+ /* mismatch! */
+ break;
+ }
+ else if (*tag == '\0') {
+ /* end of tag. so matched! */
+ ret = 1;
+ break;
+ }
+ tag++;
+ match++;
+ }
+
+ return ret;
+}
+
+int flb_router_match(const char *tag, int tag_len, const char *match,
+ void *match_regex)
+{
+ int ret;
+ flb_sds_t t;
+
+ if (tag[tag_len] != '\0') {
+ t = flb_sds_create_len(tag, tag_len);
+ if (!t) {
+ return FLB_FALSE;
+ }
+
+ ret = router_match(t, tag_len, match, match_regex);
+ flb_sds_destroy(t);
+ }
+ else {
+ ret = router_match(tag, tag_len, match, match_regex);
+ }
+
+ return ret;
+}
+
+/* Associate and input and output instances due to a previous match */
+int flb_router_connect(struct flb_input_instance *in,
+ struct flb_output_instance *out)
+{
+ struct flb_router_path *p;
+
+ p = flb_malloc(sizeof(struct flb_router_path));
+ if (!p) {
+ flb_errno();
+ return -1;
+ }
+
+ p->ins = out;
+ mk_list_add(&p->_head, &in->routes);
+
+ return 0;
+}
+
+int flb_router_connect_direct(struct flb_input_instance *in,
+ struct flb_output_instance *out)
+{
+ struct flb_router_path *p;
+
+ p = flb_malloc(sizeof(struct flb_router_path));
+ if (!p) {
+ flb_errno();
+ return -1;
+ }
+
+ p->ins = out;
+ mk_list_add(&p->_head, &in->routes_direct);
+
+ return 0;
+}
+
+/*
+ * This routine defines static routes for the plugins that have registered
+ * tags. It check where data should go before the service start running, each
+ * input 'instance' plugin will contain a list of destinations.
+ */
+int flb_router_io_set(struct flb_config *config)
+{
+ int in_count = 0;
+ int out_count = 0;
+ struct mk_list *i_head;
+ struct mk_list *o_head;
+ struct flb_input_instance *i_ins;
+ struct flb_output_instance *o_ins;
+
+ /* Quick setup for 1:1 */
+ in_count = mk_list_size(&config->inputs);
+ out_count = mk_list_size(&config->outputs);
+
+ /* Mostly used for command line tests */
+ if (in_count == 1 && out_count == 1) {
+ i_ins = mk_list_entry_first(&config->inputs, struct flb_input_instance, _head);
+ o_ins = mk_list_entry_first(&config->outputs, struct flb_output_instance, _head);
+
+ if (!o_ins->match
+#ifdef FLB_HAVE_REGEX
+ && !o_ins->match_regex
+#endif
+ ) {
+
+ o_ins->match = flb_sds_create_len("*", 1);
+ }
+ flb_router_connect(i_ins, o_ins);
+ return 0;
+ }
+
+ /* N:M case, iterate all input instances */
+ mk_list_foreach(i_head, &config->inputs) {
+ i_ins = mk_list_entry(i_head, struct flb_input_instance, _head);
+ if (!i_ins->p) {
+ continue;
+ }
+
+ if (!i_ins->tag) {
+ flb_warn("[router] NO tag for %s input instance",
+ i_ins->name);
+ continue;
+ }
+
+
+ flb_trace("[router] input=%s tag=%s", i_ins->name, i_ins->tag);
+
+ /* Try to find a match with output instances */
+ mk_list_foreach(o_head, &config->outputs) {
+ o_ins = mk_list_entry(o_head, struct flb_output_instance, _head);
+ if (!o_ins->match
+#ifdef FLB_HAVE_REGEX
+ && !o_ins->match_regex
+#endif
+ ) {
+ flb_warn("[router] NO match for %s output instance",
+ o_ins->name);
+ continue;
+ }
+
+ if (flb_router_match(i_ins->tag, i_ins->tag_len, o_ins->match
+#ifdef FLB_HAVE_REGEX
+ , o_ins->match_regex
+#else
+ , NULL
+#endif
+ )) {
+
+ flb_debug("[router] match rule %s:%s",
+ i_ins->name, o_ins->name);
+ flb_router_connect(i_ins, o_ins);
+ }
+ }
+ }
+
+ return 0;
+}
+
+void flb_router_exit(struct flb_config *config)
+{
+ struct mk_list *tmp;
+ struct mk_list *r_tmp;
+ struct mk_list *head;
+ struct mk_list *r_head;
+ struct flb_input_instance *in;
+ struct flb_router_path *r;
+
+ /* Iterate input plugins */
+ mk_list_foreach_safe(head, tmp, &config->inputs) {
+ in = mk_list_entry(head, struct flb_input_instance, _head);
+
+ /* Iterate instance routes */
+ mk_list_foreach_safe(r_head, r_tmp, &in->routes) {
+ r = mk_list_entry(r_head, struct flb_router_path, _head);
+ mk_list_del(&r->_head);
+ flb_free(r);
+ }
+
+ /* Iterate instance routes direct */
+ mk_list_foreach_safe(r_head, r_tmp, &in->routes_direct) {
+ r = mk_list_entry(r_head, struct flb_router_path, _head);
+ mk_list_del(&r->_head);
+ flb_free(r);
+ }
+ }
+}