summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c')
-rw-r--r--src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c965
1 files changed, 965 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c b/src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c
new file mode 100644
index 000000000..a61e4827f
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_log_to_metrics/log_to_metrics.c
@@ -0,0 +1,965 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "log_to_metrics.h"
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_utils.h>
+#include <cmetrics/cmetrics.h>
+#include <cmetrics/cmt_gauge.h>
+#include <cmetrics/cmt_counter.h>
+#include <cmetrics/cmt_histogram.h>
+#include <msgpack.h>
+#include <stdio.h>
+#include <sys/types.h>
+
+
+static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] =
+ { "namespace_name",
+ "pod_name",
+ "container_name",
+ "docker_id",
+ "pod_id"
+ };
+
+static void delete_rules(struct log_to_metrics_ctx *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct grep_rule *rule;
+
+ mk_list_foreach_safe(head, tmp, &ctx->rules) {
+ rule = mk_list_entry(head, struct grep_rule, _head);
+ flb_sds_destroy(rule->field);
+ flb_free(rule->regex_pattern);
+ flb_ra_destroy(rule->ra);
+ flb_regex_destroy(rule->regex);
+ mk_list_del(&rule->_head);
+ flb_free(rule);
+ }
+}
+
+static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx)
+{
+ int i;
+ if (!ctx) {
+ return 0;
+ }
+ if(ctx->histogram_buckets){
+ cmt_histogram_buckets_destroy(ctx->histogram_buckets);
+ }
+
+ if (ctx->cmt) {
+ cmt_destroy(ctx->cmt);
+ }
+
+ delete_rules(ctx);
+
+ if (ctx->label_accessors != NULL) {
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ flb_free(ctx->label_accessors[i]);
+ }
+ flb_free(ctx->label_accessors);
+ }
+ if (ctx->label_keys != NULL) {
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ flb_free(ctx->label_keys[i]);
+ }
+ flb_free(ctx->label_keys);
+ }
+ flb_free(ctx->buckets);
+ flb_free(ctx->bucket_counter);
+ flb_free(ctx->label_counter);
+ flb_free(ctx);
+ return 0;
+}
+
+static int set_rules(struct log_to_metrics_ctx *ctx,
+ struct flb_filter_instance *f_ins)
+{
+ flb_sds_t tmp;
+ struct mk_list *head;
+ struct mk_list *split;
+ struct flb_split_entry *sentry;
+ struct flb_kv *kv;
+ struct grep_rule *rule;
+
+ /* Iterate all filter properties */
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ /* Create a new rule */
+ rule = flb_malloc(sizeof(struct grep_rule));
+ if (!rule) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Get the type */
+ if (strcasecmp(kv->key, "regex") == 0) {
+ rule->type = GREP_REGEX;
+ }
+ else if (strcasecmp(kv->key, "exclude") == 0) {
+ rule->type = GREP_EXCLUDE;
+ }
+ else {
+ flb_free(rule);
+ continue;
+ }
+
+ /* As a value we expect a pair of field name and a regular expression */
+ split = flb_utils_split(kv->val, ' ', 1);
+ if (mk_list_size(split) != 2) {
+ flb_plg_error(ctx->ins,
+ "invalid regex, expected field and regular expression");
+ delete_rules(ctx);
+ flb_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ /* Get first value (field) */
+ sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
+ if (*sentry->value == '$') {
+ rule->field = flb_sds_create_len(sentry->value, sentry->len);
+ }
+ else {
+ rule->field = flb_sds_create_size(sentry->len + 2);
+ tmp = flb_sds_cat(rule->field, "$", 1);
+ rule->field = tmp;
+
+ tmp = flb_sds_cat(rule->field, sentry->value, sentry->len);
+ rule->field = tmp;
+ }
+
+ /* Get remaining content (regular expression) */
+ sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
+ rule->regex_pattern = flb_strndup(sentry->value, sentry->len);
+ if (rule->regex_pattern == NULL) {
+ flb_errno();
+ delete_rules(ctx);
+ flb_free(rule);
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ /* Release split */
+ flb_utils_split_free(split);
+
+ /* Create a record accessor context for this rule */
+ rule->ra = flb_ra_create(rule->field, FLB_FALSE);
+ if (!rule->ra) {
+ flb_plg_error(ctx->ins, "invalid record accessor? '%s'",
+ rule->field);
+ delete_rules(ctx);
+ flb_free(rule);
+ return -1;
+ }
+
+ /* Convert string to regex pattern */
+ rule->regex = flb_regex_create(rule->regex_pattern);
+ if (!rule->regex) {
+ flb_plg_error(ctx->ins, "could not compile regex pattern '%s'",
+ rule->regex_pattern);
+ delete_rules(ctx);
+ flb_free(rule);
+ return -1;
+ }
+
+ /* Link to parent list */
+ mk_list_add(&rule->_head, &ctx->rules);
+ }
+
+ return 0;
+}
+
+/* Given a msgpack record, do some filter action based on the defined rules */
+static inline int grep_filter_data(msgpack_object map,
+ struct log_to_metrics_ctx *ctx)
+{
+ ssize_t ret;
+ struct mk_list *head;
+ struct grep_rule *rule;
+
+ /* For each rule, validate against map fields */
+ mk_list_foreach(head, &ctx->rules) {
+ rule = mk_list_entry(head, struct grep_rule, _head);
+
+ ret = flb_ra_regex_match(rule->ra, map, rule->regex, NULL);
+ if (ret <= 0) { /* no match */
+ if (rule->type == GREP_REGEX) {
+ return GREP_RET_EXCLUDE;
+ }
+ }
+ else {
+ if (rule->type == GREP_EXCLUDE) {
+ return GREP_RET_EXCLUDE;
+ }
+ else {
+ return GREP_RET_KEEP;
+ }
+ }
+ }
+
+ return GREP_RET_KEEP;
+}
+
+static int set_labels(struct log_to_metrics_ctx *ctx,
+ char **label_accessors,
+ char **label_keys,
+ int *label_counter,
+ struct flb_filter_instance *f_ins)
+{
+
+ struct mk_list *head;
+ struct mk_list *split;
+ flb_sds_t tmp;
+ struct flb_kv *kv;
+ struct flb_split_entry *sentry;
+ int counter = 0;
+ int i;
+ if (MAX_LABEL_COUNT < NUMBER_OF_KUBERNETES_LABELS){
+ flb_errno();
+ return -1;
+ }
+ if (ctx->kubernetes_mode){
+ for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
+ snprintf(label_keys[i], MAX_LABEL_LENGTH - 1, "%s",
+ kubernetes_label_keys[i]);
+ }
+ counter = NUMBER_OF_KUBERNETES_LABELS;
+ }
+
+ /* Iterate all filter properties */
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ if (counter >= MAX_LABEL_COUNT) {
+ return MAX_LABEL_COUNT;
+ }
+
+ if (strcasecmp(kv->key, "label_field") == 0) {
+ snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val);
+ snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", kv->val);
+ counter++;
+ }
+ else if (strcasecmp(kv->key, "add_label") == 0) {
+ split = flb_utils_split(kv->val, ' ', 1);
+ if (mk_list_size(split) != 2) {
+ flb_plg_error(ctx->ins, "invalid label, expected name and key");
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
+ tmp = flb_sds_create_len(sentry->value, sentry->len);
+ snprintf(label_keys[counter], MAX_LABEL_LENGTH - 1, "%s", tmp);
+ flb_sds_destroy(tmp);
+
+ sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
+ tmp = flb_sds_create_len(sentry->value, sentry->len);
+ snprintf(label_accessors[counter], MAX_LABEL_LENGTH - 1, "%s", tmp);
+ flb_sds_destroy(tmp);
+ counter++;
+
+ flb_utils_split_free(split);
+ }
+ else {
+ continue;
+ }
+ }
+ *label_counter = counter;
+ return counter;
+}
+
+static int convert_double(char *str, double *value)
+{
+ char *endptr = str;
+ int valid = 1;
+ int i = 0;
+ /* input validation */
+ for (i = 0; str[i] != '\0'; i++) {
+ if (!(str[i]>='0') && !(str[i] <= '9') && str[i] != '.'
+ && str[i] != '-' && str[i] != '+') {
+ valid = 0;
+ break;
+ }
+ }
+ /* convert to double */
+ if (valid) {
+ *value = strtod(str, &endptr);
+ if (str == endptr) {
+ valid = 0;
+ }
+ }
+ return valid;
+}
+
+static void sort_doubles_ascending(double *arr, int size)
+{
+ int i, j;
+ double tmp;
+
+ for (i = 0; i < size - 1; i++) {
+ for (j = 0; j < size - i - 1; j++) {
+ if (arr[j] > arr[j + 1]) {
+ tmp = arr[j];
+ arr[j] = arr[j + 1];
+ arr[j + 1] = tmp;
+ }
+ }
+ }
+}
+static int set_buckets(struct log_to_metrics_ctx *ctx,
+ struct flb_filter_instance *f_ins)
+{
+
+ struct mk_list *head;
+ struct flb_kv *kv;
+ double parsed_double = 0.0;
+ int counter = 0;
+ int valid = 1;
+
+ /* Iterate filter properties to get count of buckets to allocate memory */
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ if (strcasecmp(kv->key, "bucket") != 0) {
+ continue;
+ }
+ counter++;
+ }
+ /* Allocate the memory for buckets */
+ ctx->buckets = (double *) flb_malloc(counter * sizeof(double));
+ /* Set the buckets */
+ counter = 0;
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ if (strcasecmp(kv->key, "bucket") != 0) {
+ continue;
+ }
+ valid = convert_double(kv->val, &parsed_double);
+ if(!valid){
+ flb_error("Error during conversion");
+ return -1;
+ }
+ else{
+ ctx->buckets[counter++] = parsed_double;
+ }
+ }
+ *ctx->bucket_counter = counter;
+ sort_doubles_ascending(ctx->buckets, counter);
+ return 0;
+}
+
+static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
+ char kubernetes_label_values
+ [NUMBER_OF_KUBERNETES_LABELS][MAX_LABEL_LENGTH],
+ char **label_accessors, int label_counter, msgpack_object map)
+{
+ int label_iterator_start = 0;
+ int i;
+ struct flb_record_accessor *ra = NULL;
+ struct flb_ra_value *rval = NULL;
+
+ if (label_counter == 0 && !ctx->kubernetes_mode){
+ return 0;
+ }
+ if (MAX_LABEL_COUNT < NUMBER_OF_KUBERNETES_LABELS){
+ flb_errno();
+ return -1;
+ }
+ if (ctx->kubernetes_mode){
+ for (i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
+ if (kubernetes_label_keys[i] == NULL){
+ return -1;
+ }
+ snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
+ kubernetes_label_values[i]);
+ }
+ label_iterator_start = NUMBER_OF_KUBERNETES_LABELS;
+ }
+
+ for (i = label_iterator_start; i < label_counter; i++){
+ ra = flb_ra_create(label_accessors[i], FLB_TRUE);
+ if (!ra) {
+ flb_warn("invalid record accessor key, aborting");
+ break;
+ }
+
+ rval = flb_ra_get_value_object(ra, map);
+ if (!rval) {
+ /* Set value to empty string, so the value will be dropped in Cmetrics*/
+ label_values[i][0] = '\0';
+ } else if (rval->type == FLB_RA_STRING) {
+ snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%s",
+ rval->val.string);
+ }
+ else if (rval->type == FLB_RA_FLOAT) {
+ snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%f",
+ rval->val.f64);
+ }
+ else if (rval->type == FLB_RA_INT) {
+ snprintf(label_values[i], MAX_LABEL_LENGTH - 1, "%ld",
+ (long)rval->val.i64);
+ }
+ else {
+ flb_warn("cannot convert given value to metric");
+ break;
+ }
+ if (rval){
+ flb_ra_key_value_destroy(rval);
+ rval = NULL;
+ }
+ if (ra){
+ flb_ra_destroy(ra);
+ ra = NULL;
+ }
+ }
+ return label_counter;
+}
+
+static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ struct log_to_metrics_ctx *ctx;
+ flb_sds_t tmp;
+ char metric_description[MAX_METRIC_LENGTH];
+ char metric_name[MAX_METRIC_LENGTH];
+ char value_field[MAX_METRIC_LENGTH];
+ struct flb_input_instance *input_ins;
+ int label_count;
+ int i;
+ /* Create context */
+ ctx = flb_malloc(sizeof(struct log_to_metrics_ctx));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ if (flb_filter_config_map_set(f_ins, ctx) < 0) {
+ flb_errno();
+ flb_plg_error(f_ins, "configuration error");
+ flb_free(ctx);
+ return -1;
+ }
+ mk_list_init(&ctx->rules);
+
+ ctx->ins = f_ins;
+
+ /* Load rules */
+ ret = set_rules(ctx, f_ins);
+ if (ret == -1) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Set the context */
+ flb_filter_set_context(f_ins, ctx);
+
+ /* Set buckets for histogram */
+ ctx->bucket_counter = NULL;
+ ctx->histogram_buckets = NULL;
+ ctx->buckets = NULL;
+ ctx->bucket_counter = flb_malloc(sizeof(int));
+ if(set_buckets(ctx, f_ins) != 0)
+ {
+ flb_plg_error(f_ins, "Setting buckets failed");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ ctx->label_accessors = NULL;
+ ctx->label_accessors = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ ctx->label_accessors[i] = flb_malloc(MAX_LABEL_LENGTH * sizeof(char));
+ }
+ /* Set label keys */
+ ctx->label_keys = NULL;
+ ctx->label_keys = (char **) flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ ctx->label_keys[i] = flb_malloc(MAX_LABEL_LENGTH * sizeof(char));
+ }
+ ctx->label_counter = NULL;
+ ctx->label_counter = flb_malloc(sizeof(int));
+ label_count = set_labels(ctx, ctx->label_accessors, ctx->label_keys, ctx->label_counter, f_ins);
+ if (label_count < 0){
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ /* Check metric tag */
+ if (ctx->tag == NULL || strlen(ctx->tag) == 0) {
+ flb_plg_error(f_ins, "Metric tag is not set");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ /* Check property metric mode */
+ ctx->mode = 0;
+ tmp = (char *)flb_filter_get_property("metric_mode", f_ins);
+ if (tmp != NULL) {
+ if (strcasecmp(tmp, FLB_LOG_TO_METRICS_COUNTER_STR) == 0) {
+ ctx->mode = FLB_LOG_TO_METRICS_COUNTER;
+ }
+ else if (strcasecmp(tmp, FLB_LOG_TO_METRICS_GAUGE_STR) == 0) {
+ ctx->mode = FLB_LOG_TO_METRICS_GAUGE;
+ }
+ else if (strcasecmp(tmp, FLB_LOG_TO_METRICS_HISTOGRAM_STR) == 0) {
+ ctx->mode = FLB_LOG_TO_METRICS_HISTOGRAM;
+ }
+ else {
+ flb_plg_error(f_ins,
+ "invalid 'mode' value. Only "
+ "'counter', 'gauge' or "
+ "'histogram' types are allowed");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+ }
+ else {
+ flb_plg_error(f_ins, "configuration property not set");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ /* Check property metric name */
+ if (ctx->metric_name == NULL || strlen(ctx->metric_name) == 0) {
+ flb_plg_error(f_ins, "metric_name is not set");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+ snprintf(metric_name, sizeof(metric_name) - 1, "%s", ctx->metric_name);
+
+ /* Check property metric description */
+ if (ctx->metric_description == NULL ||
+ strlen(ctx->metric_description) == 0) {
+ flb_plg_error(f_ins, "metric_description is not set");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+ snprintf(metric_description, sizeof(metric_description) - 1, "%s",
+ ctx->metric_description);
+
+ /* Value field only needed for modes gauge and histogram */
+ if (ctx->mode > 0) {
+ if (ctx->value_field == NULL || strlen(ctx->value_field) == 0) {
+ flb_plg_error(f_ins, "value_field is not set");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+ snprintf(value_field, sizeof(value_field) - 1, "%s",
+ ctx->value_field);
+ }
+
+
+ /* Check if buckets are defined for histogram, if not assume defaults */
+ if (ctx->mode == FLB_LOG_TO_METRICS_HISTOGRAM ){
+ if (ctx->bucket_counter == 0){
+ flb_plg_error(f_ins,
+ "buckets are not set for histogram."
+ "Will use defaults: 0.005, 0.01, 0.025, "
+ "0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0");
+ ctx->histogram_buckets = cmt_histogram_buckets_default_create();
+ }
+ else{
+ ctx->histogram_buckets = cmt_histogram_buckets_create_size(
+ ctx->buckets, *ctx->bucket_counter);
+ }
+ }
+
+
+ /* create the metric */
+ ctx->cmt = NULL;
+ ctx->cmt = cmt_create();
+
+ /* Depending on mode create different types of cmetrics metrics */
+ switch (ctx->mode) {
+ case FLB_LOG_TO_METRICS_COUNTER:
+ ctx->c = cmt_counter_create(ctx->cmt, "log_metric", "counter",
+ metric_name, metric_description,
+ label_count, ctx->label_keys);
+ break;
+ case FLB_LOG_TO_METRICS_GAUGE:
+ ctx->g = cmt_gauge_create(ctx->cmt, "log_metric", "gauge",
+ metric_name, metric_description,
+ label_count, ctx->label_keys);
+ break;
+ case FLB_LOG_TO_METRICS_HISTOGRAM:
+ ctx->h = cmt_histogram_create(ctx->cmt, "log_metric", "histogram",
+ metric_name, metric_description,
+ ctx->histogram_buckets,
+ label_count, ctx->label_keys);
+ break;
+ default:
+ flb_plg_error(f_ins, "unsupported mode");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ input_ins = flb_input_new(config, "emitter", NULL, FLB_FALSE);
+ if (!input_ins) {
+ flb_plg_error(f_ins, "cannot create metrics emitter instance");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ /* Set the storage type for emitter */
+ ret = flb_input_set_property(input_ins, "storage.type", "memory");
+ if (ret == -1) {
+ flb_plg_error(f_ins, "cannot set storage type for emitter instance");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ /* Initialize emitter plugin */
+ ret = flb_input_instance_init(input_ins, config);
+ if (ret == -1) {
+ flb_errno();
+ flb_plg_error(f_ins, "cannot initialize metrics emitter instance.");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ ret = flb_storage_input_create(config->cio, input_ins);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot initialize storage for metrics stream");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+ ctx->input_ins = input_ins;
+
+ return 0;
+}
+
+static int cb_log_to_metrics_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_size,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins, void *context,
+ struct flb_config *config)
+{
+ int ret;
+ msgpack_unpacked result;
+ msgpack_object map;
+ msgpack_object root;
+ size_t off = 0;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ uint64_t ts;
+ struct log_to_metrics_ctx *ctx = context;
+ struct flb_ra_value *rval = NULL;
+ struct flb_record_accessor *ra = NULL;
+ char fmt[MAX_LABEL_LENGTH];
+ char **label_values = NULL;
+ int label_count = 0;
+ int i;
+ double gauge_value = 0;
+ double histogram_value = 0;
+ char kubernetes_label_values
+ [NUMBER_OF_KUBERNETES_LABELS][MAX_LABEL_LENGTH];
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ /* Iterate each item array and apply rules and generate metric values */
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, data, bytes, &off) ==
+ MSGPACK_UNPACK_SUCCESS) {
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_ARRAY) {
+ continue;
+ }
+
+ /* get time and map */
+ map = root.via.array.ptr[1];
+
+ ret = grep_filter_data(map, context);
+ if (ret == GREP_RET_KEEP) {
+ ts = cfl_time_now();
+ if(ctx->kubernetes_mode){
+ for(i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
+ if (kubernetes_label_keys[i] == NULL){
+ flb_error("error during kubernetes label processing. "
+ "Skipping labels.");
+ ctx->label_counter = 0;
+ break;
+ }
+ snprintf(fmt, MAX_LABEL_LENGTH - 1, "$kubernetes['%s']",
+ kubernetes_label_keys[i]);
+ ra = flb_ra_create(fmt, FLB_TRUE);
+ if (!ra) {
+ flb_error("invalid record accessor key, aborting");
+ break;
+ }
+ rval = flb_ra_get_value_object(ra, map);
+ if (!rval) {
+ flb_error("given value field is empty or not "
+ "existent: %s. Skipping labels.", fmt);
+ ctx->label_counter = 0;
+ }
+ else if (rval->type != FLB_RA_STRING) {
+ flb_plg_error(f_ins,
+ "cannot access label %s", kubernetes_label_keys[i]);
+ break;
+ }
+ else {
+ snprintf(kubernetes_label_values[i],
+ MAX_LABEL_LENGTH - 1, "%s", rval->val.string);
+ }
+ if (rval){
+ flb_ra_key_value_destroy(rval);
+ rval = NULL;
+ }
+ if (ra){
+ flb_ra_destroy(ra);
+ ra = NULL;
+ }
+ }
+ }
+ if (ctx->label_counter > 0){
+ /* Fill optional labels */
+ label_values = flb_malloc(MAX_LABEL_COUNT * sizeof(char *));
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ label_values[i] = flb_malloc(MAX_LABEL_LENGTH *
+ sizeof(char));
+ }
+
+ label_count = fill_labels(ctx, label_values,
+ kubernetes_label_values, ctx->label_accessors,
+ *ctx->label_counter, map);
+ if (label_count != *ctx->label_counter){
+ label_count = 0;
+ }
+ }
+
+ /* Calculating and setting metric depending on the mode */
+ switch (ctx->mode) {
+ case FLB_LOG_TO_METRICS_COUNTER:
+ ret = cmt_counter_inc(ctx->c, ts, label_count,
+ label_values);
+ break;
+
+ case FLB_LOG_TO_METRICS_GAUGE:
+ ra = flb_ra_create(ctx->value_field, FLB_TRUE);
+ if (!ra) {
+ flb_error("invalid record accessor key, aborting");
+ break;
+ }
+
+ rval = flb_ra_get_value_object(ra, map);
+
+ if (!rval) {
+ flb_warn("given value field is empty or not existent");
+ break;
+ }
+ if (rval->type == FLB_RA_STRING) {
+ sscanf(rval->val.string, "%lf", &gauge_value);
+ }
+ else if (rval->type == FLB_RA_FLOAT) {
+ gauge_value = rval->val.f64;
+ }
+ else if (rval->type == FLB_RA_INT) {
+ gauge_value = (double)rval->val.i64;
+ }
+ else {
+ flb_plg_error(f_ins,
+ "cannot convert given value to metric");
+ break;
+ }
+
+ ret = cmt_gauge_set(ctx->g, ts, gauge_value,
+ label_count, label_values);
+ if (rval) {
+ flb_ra_key_value_destroy(rval);
+ rval = NULL;
+ }
+ if (ra) {
+ flb_ra_destroy(ra);
+ ra = NULL;
+ }
+ break;
+
+ case FLB_LOG_TO_METRICS_HISTOGRAM:
+ ra = flb_ra_create(ctx->value_field, FLB_TRUE);
+ if (!ra) {
+ flb_error("invalid record accessor key, aborting");
+ break;
+ }
+
+ rval = flb_ra_get_value_object(ra, map);
+
+ if (!rval) {
+ flb_warn("given value field is empty or not existent");
+ break;
+ }
+ if (rval->type == FLB_RA_STRING) {
+ sscanf(rval->val.string, "%lf", &histogram_value);
+ }
+ else if (rval->type == FLB_RA_FLOAT) {
+ histogram_value = rval->val.f64;
+ }
+ else if (rval->type == FLB_RA_INT) {
+ histogram_value = (double)rval->val.i64;
+ }
+ else {
+ flb_plg_error(f_ins,
+ "cannot convert given value to metric");
+ break;
+ }
+
+ ret = cmt_histogram_observe(ctx->h, ts, histogram_value,
+ label_count, label_values);
+ if (rval) {
+ flb_ra_key_value_destroy(rval);
+ rval = NULL;
+ }
+ if (ra) {
+ flb_ra_destroy(ra);
+ ra = NULL;
+ }
+ break;
+ default:
+ flb_plg_error(f_ins, "unsupported mode");
+ log_to_metrics_destroy(ctx);
+ return -1;
+ }
+
+ ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);
+
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "could not append metrics");
+ }
+
+ /* Cleanup */
+ msgpack_unpacked_destroy(&result);
+ if (label_values != NULL){
+ for (i = 0; i < MAX_LABEL_COUNT; i++) {
+ if (label_values[i] != NULL){
+ flb_free(label_values[i]);
+ }
+ }
+ flb_free(label_values);
+ }
+ }
+ else if (ret == GREP_RET_EXCLUDE) {
+ /* Do nothing */
+ }
+ }
+ /* Cleanup */
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+
+ /* Do not modify message stream */
+ return FLB_FILTER_NOTOUCH;
+}
+
+static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
+{
+ struct log_to_metrics_ctx *ctx = data;
+
+ return log_to_metrics_destroy(ctx);
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "regex", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Optional filter for records in which the content of KEY "
+ "matches the regular expression."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "exclude", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Optional filter for records in which the content of KEY "
+ "does not matches the regular expression."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "metric_mode", "counter",
+ FLB_FALSE, FLB_TRUE,
+ offsetof(struct log_to_metrics_ctx, mode),
+ "Mode selector. Values counter, gauge,"
+ " or histogram. Summary is not supported"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "value_field", NULL,
+ FLB_FALSE, FLB_TRUE,
+ offsetof(struct log_to_metrics_ctx, value_field),
+ "Numeric field to use for gauge or histogram"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "metric_name", NULL,
+ FLB_FALSE, FLB_TRUE,
+ offsetof(struct log_to_metrics_ctx, metric_name),
+ "Name of metric"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "metric_description", NULL,
+ FLB_FALSE, FLB_TRUE,
+ offsetof(struct log_to_metrics_ctx, metric_description),
+ "Help text for metric"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "kubernetes_mode", "false",
+ 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, kubernetes_mode),
+ "Enable kubernetes log metric fields"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "add_label", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Add a label to the metric by supporting record accessor pattern"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "label_field", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Specify message field that should be included in the metric"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "bucket", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Specify bucket for histogram metric"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tag", NULL,
+ FLB_FALSE, FLB_TRUE,
+ offsetof(struct log_to_metrics_ctx, tag),
+ "Metric Tag"
+ },
+ {0}
+};
+
+struct flb_filter_plugin filter_log_to_metrics_plugin = {
+ .name = "log_to_metrics",
+ .description = "generate log derived metrics",
+ .cb_init = cb_log_to_metrics_init,
+ .cb_filter = cb_log_to_metrics_filter,
+ .cb_exit = cb_log_to_metrics_exit,
+ .config_map = config_map,
+ .flags = 0};