summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/filter_throttle
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/filter_throttle
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/filter_throttle')
-rw-r--r--fluent-bit/plugins/filter_throttle/CMakeLists.txt6
-rw-r--r--fluent-bit/plugins/filter_throttle/throttle.c337
-rw-r--r--fluent-bit/plugins/filter_throttle/throttle.h56
-rw-r--r--fluent-bit/plugins/filter_throttle/window.c97
-rw-r--r--fluent-bit/plugins/filter_throttle/window.h37
5 files changed, 0 insertions, 533 deletions
diff --git a/fluent-bit/plugins/filter_throttle/CMakeLists.txt b/fluent-bit/plugins/filter_throttle/CMakeLists.txt
deleted file mode 100644
index adc7b8f4c..000000000
--- a/fluent-bit/plugins/filter_throttle/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-set(src
- window.c
- throttle.c
- )
-
-FLB_PLUGIN(filter_throttle "${src}" "")
diff --git a/fluent-bit/plugins/filter_throttle/throttle.c b/fluent-bit/plugins/filter_throttle/throttle.c
deleted file mode 100644
index 2a5ce29a4..000000000
--- a/fluent-bit/plugins/filter_throttle/throttle.c
+++ /dev/null
@@ -1,337 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_filter.h>
-#include <fluent-bit/flb_filter_plugin.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/flb_str.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_log.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-#include <fluent-bit/flb_log_event_encoder.h>
-#include <msgpack.h>
-#include "stdlib.h"
-
-#include "throttle.h"
-#include "window.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-
-pthread_mutex_t throttle_mut;
-
-
-static bool apply_suffix (double *x, char suffix_char)
-{
- int multiplier;
-
- switch (suffix_char)
- {
- case 0:
- case 's':
- multiplier = 1;
- break;
- case 'm':
- multiplier = 60;
- break;
- case 'h':
- multiplier = 60 * 60;
- break;
- case 'd':
- multiplier = 60 * 60 * 24;
- break;
- default:
- return false;
- }
-
- *x *= multiplier;
-
- return true;
-}
-
-
-void *time_ticker(void *args)
-{
- struct flb_time ftm;
- long timestamp;
- struct flb_filter_throttle_ctx *ctx = args;
-
- while (1) {
- flb_time_get(&ftm);
- timestamp = flb_time_to_double(&ftm);
- pthread_mutex_lock(&throttle_mut);
- window_add(ctx->hash, timestamp, 0);
-
- ctx->hash->current_timestamp = timestamp;
-
- if (ctx->print_status) {
- flb_plg_info(ctx->ins,
- "%ld: limit is %0.2f per %s with window size of %i, "
- "current rate is: %i per interval",
- timestamp, ctx->max_rate, ctx->slide_interval,
- ctx->window_size,
- ctx->hash->total / ctx->hash->size);
- }
- pthread_mutex_unlock(&throttle_mut);
- /* sleep is a cancelable function */
- sleep(ctx->ticker_data.seconds);
- }
-}
-
-/* Given a msgpack record, do some filter action based on the defined rules */
-static inline int throttle_data(struct flb_filter_throttle_ctx *ctx)
-{
- if ((ctx->hash->total / (double) ctx->hash->size) >= ctx->max_rate) {
- return THROTTLE_RET_DROP;
- }
-
- window_add(ctx->hash, ctx->hash->current_timestamp, 1);
-
- return THROTTLE_RET_KEEP;
-}
-
-static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins)
-{
- int ret;
-
- ret = flb_filter_config_map_set(f_ins, ctx);
- if (ret == -1) {
- flb_plg_error(f_ins, "unable to load configuration");
- return -1;
- }
- if (ctx->max_rate <= 1.0) {
- ctx->max_rate = strtod(THROTTLE_DEFAULT_RATE, NULL);
- }
- if (ctx->window_size <= 1) {
- ctx->window_size = strtoul(THROTTLE_DEFAULT_WINDOW, NULL, 10);
- }
-
- return 0;
-}
-
-static int parse_duration(struct flb_filter_throttle_ctx *ctx,
- const char *interval)
-{
- double seconds = 0.0;
- double s;
- char *p;
-
- s = strtod(interval, &p);
- if ( 0 >= s
- /* No extra chars after the number and an optional s,m,h,d char. */
- || (*p && *(p+1))
- /* Check any suffix char and update S based on the suffix. */
- || ! apply_suffix (&s, *p))
- {
- flb_plg_warn(ctx->ins,
- "invalid time interval %s falling back to default: 1 "
- "second",
- interval);
- }
-
- seconds += s;
- return seconds;
-}
-
-static int cb_throttle_init(struct flb_filter_instance *f_ins,
- struct flb_config *config,
- void *data)
-{
- int ret;
- struct flb_filter_throttle_ctx *ctx;
-
- pthread_mutex_init(&throttle_mut, NULL);
-
- /* Create context */
- ctx = flb_calloc(1, sizeof(struct flb_filter_throttle_ctx));
- if (!ctx) {
- flb_errno();
- return -1;
- }
- ctx->ins = f_ins;
-
- /* parse plugin configuration */
- ret = configure(ctx, f_ins);
- if (ret == -1) {
- flb_free(ctx);
- return -1;
- }
-
- /* Set our context */
- flb_filter_set_context(f_ins, ctx);
-
- ctx->hash = window_create(ctx->window_size);
-
- ctx->ticker_data.seconds = parse_duration(ctx, ctx->slide_interval);
- pthread_create(&ctx->ticker_data.thr, NULL, &time_ticker, ctx);
- return 0;
-}
-
-static int cb_throttle_filter(const void *data, size_t bytes,
- const char *tag, int tag_len,
- void **out_buf, size_t *out_size,
- struct flb_filter_instance *f_ins,
- struct flb_input_instance *i_ins,
- void *context,
- struct flb_config *config)
-{
- int ret;
- int old_size = 0;
- int new_size = 0;
- struct flb_log_event_encoder log_encoder;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
-
- (void) f_ins;
- (void) i_ins;
- (void) config;
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(f_ins,
- "Log event decoder initialization error : %d", ret);
-
- return FLB_FILTER_NOTOUCH;
- }
-
- ret = flb_log_event_encoder_init(&log_encoder,
- FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- flb_plg_error(f_ins,
- "Log event encoder initialization error : %d", ret);
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- return FLB_FILTER_NOTOUCH;
- }
-
- /* Iterate each item array and apply rules */
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- old_size++;
- pthread_mutex_lock(&throttle_mut);
- ret = throttle_data(context);
- pthread_mutex_unlock(&throttle_mut);
-
- if (ret == THROTTLE_RET_KEEP) {
- ret = flb_log_event_encoder_emit_raw_record(
- &log_encoder,
- &((char *) data)[log_decoder.previous_offset],
- log_decoder.record_length);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- new_size++;
- }
- }
- else if (ret == THROTTLE_RET_DROP) {
- /* Do nothing */
- }
- }
-
- /* we keep everything ? */
- if (old_size == new_size) {
- /* Destroy the buffer to avoid more overhead */
- ret = FLB_FILTER_NOTOUCH;
- }
- else {
- *out_buf = log_encoder.output_buffer;
- *out_size = log_encoder.output_length;
-
- flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
-
- ret = FLB_FILTER_MODIFIED;
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
- flb_log_event_encoder_destroy(&log_encoder);
-
- return ret;
-}
-
-static int cb_throttle_exit(void *data, struct flb_config *config)
-{
- void *thr_res;
- struct flb_filter_throttle_ctx *ctx = data;
-
- int s = pthread_cancel(ctx->ticker_data.thr);
- if (s != 0) {
- flb_plg_error(ctx->ins, "Unable to cancel ticker. Leaking context to avoid memory corruption.");
- return 1;
- }
-
- s = pthread_join(ctx->ticker_data.thr, &thr_res);
- if (s != 0) {
- flb_plg_error(ctx->ins, "Unable to join ticker. Leaking context to avoid memory corruption.");
- return 1;
- }
-
- if (thr_res != PTHREAD_CANCELED) {
- flb_plg_error(ctx->ins, "Thread joined but was not canceled which is impossible.");
- }
-
- flb_free(ctx->hash->table);
- flb_free(ctx->hash);
- flb_free(ctx);
- return 0;
-}
-
-static struct flb_config_map config_map[] = {
- // rate
- // window
- // print_status
- // interval
- {
- FLB_CONFIG_MAP_DOUBLE, "rate", THROTTLE_DEFAULT_RATE,
- 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, max_rate),
- "Set throttle rate"
- },
- {
- FLB_CONFIG_MAP_INT, "window", THROTTLE_DEFAULT_WINDOW,
- 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, window_size),
- "Set throttle window"
- },
- {
- FLB_CONFIG_MAP_BOOL, "print_status", THROTTLE_DEFAULT_STATUS,
- 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, print_status),
- "Set whether or not to print status information"
- },
- {
- FLB_CONFIG_MAP_STR, "interval", THROTTLE_DEFAULT_INTERVAL,
- 0, FLB_TRUE, offsetof(struct flb_filter_throttle_ctx, slide_interval),
- "Set the slide interval"
- },
- /* EOF */
- {0}
-};
-
-struct flb_filter_plugin filter_throttle_plugin = {
- .name = "throttle",
- .description = "Throttle messages using sliding window algorithm",
- .cb_init = cb_throttle_init,
- .cb_filter = cb_throttle_filter,
- .cb_exit = cb_throttle_exit,
- .config_map = config_map,
- .flags = 0
-};
diff --git a/fluent-bit/plugins/filter_throttle/throttle.h b/fluent-bit/plugins/filter_throttle/throttle.h
deleted file mode 100644
index 30ca318c1..000000000
--- a/fluent-bit/plugins/filter_throttle/throttle.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit Throttling
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_FILTER_THROTTLE_H
-#define FLB_FILTER_THROTTLE_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_filter.h>
-#include <fluent-bit/flb_pthread.h>
-
-/* actions */
-#define THROTTLE_RET_KEEP 0
-#define THROTTLE_RET_DROP 1
-
-/* defaults */
-#define THROTTLE_DEFAULT_RATE "1"
-#define THROTTLE_DEFAULT_WINDOW "5"
-#define THROTTLE_DEFAULT_INTERVAL "1"
-#define THROTTLE_DEFAULT_STATUS "false"
-
-struct ticker {
- pthread_t thr;
- double seconds;
-};
-
-struct flb_filter_throttle_ctx {
- double max_rate;
- unsigned int window_size;
- const char *slide_interval;
- int print_status;
-
- /* internal */
- struct throttle_window *hash;
- struct flb_filter_instance *ins;
- struct ticker ticker_data;
-};
-
-
-
-#endif
diff --git a/fluent-bit/plugins/filter_throttle/window.c b/fluent-bit/plugins/filter_throttle/window.c
deleted file mode 100644
index 75fcb492e..000000000
--- a/fluent-bit/plugins/filter_throttle/window.c
+++ /dev/null
@@ -1,97 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/flb_log.h>
-
-#include "window.h"
-#include "throttle.h"
-
-
-struct throttle_window *window_create(size_t size) {
- struct throttle_window *tw;
-
- if (size <= 0) {
- return NULL;
- }
-
- tw = flb_malloc(sizeof(struct throttle_window));
- if (!tw) {
- flb_errno();
- return NULL;
- }
-
- tw->size = size;
- tw->total = 0;
- tw->current_timestamp = 0;
- tw->max_index = -1;
- tw->table = flb_calloc(size, sizeof(struct throttle_pane));
- if (!tw->table) {
- flb_errno();
- flb_free(tw);
- return NULL;
- }
-
- return tw;
-}
-
-
-int window_get(struct throttle_window *tw, long timestamp) {
- int i;
- for (i=0; i< tw->size; i++ ) {
- if (tw->table[i].timestamp == timestamp) {
- return i;
- }
- }
- return NOT_FOUND;
-}
-
-
-int window_add(struct throttle_window *tw, long timestamp, int val) {
- int i, index, size;
- int sum = 0;
- tw->current_timestamp = timestamp;
-
- size = tw->size;
- index = window_get(tw, timestamp);
-
- if (index == NOT_FOUND) {
- if (size - 1 == tw->max_index) {
- /* window must be shifted */
- tw->max_index = -1;
- }
- tw->max_index += 1;
- tw->table[tw->max_index].timestamp= timestamp;
- tw->table[tw->max_index].counter = val;
- } else {
- tw->table[index].counter += val;
- }
-
- for (i=0; i < tw->size; i++ ) {
- sum += tw->table[i].counter;
- flb_debug("timestamp: %ld, value: %ld",
- tw->table[i].timestamp, tw->table[i].counter);
- }
- tw->total = sum;
- flb_debug("total: %i", tw->total);
- return 0;
-}
diff --git a/fluent-bit/plugins/filter_throttle/window.h b/fluent-bit/plugins/filter_throttle/window.h
deleted file mode 100644
index c7f392e07..000000000
--- a/fluent-bit/plugins/filter_throttle/window.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define NOT_FOUND -1
-
-struct throttle_pane {
- long timestamp;
- long counter;
-};
-
-struct throttle_window {
- long current_timestamp;
- unsigned size;
- unsigned total;
- pthread_mutex_t result_mutex;
- int max_index;
- struct throttle_pane *table;
-};
-
-struct throttle_window *window_create(size_t size);
-int window_add(struct throttle_window *tw, long timestamp, int val);