diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_statsd/statsd.c')
-rw-r--r-- | src/fluent-bit/plugins/in_statsd/statsd.c | 386 |
1 files changed, 0 insertions, 386 deletions
diff --git a/src/fluent-bit/plugins/in_statsd/statsd.c b/src/fluent-bit/plugins/in_statsd/statsd.c deleted file mode 100644 index 0cccb679a..000000000 --- a/src/fluent-bit/plugins/in_statsd/statsd.c +++ /dev/null @@ -1,386 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_log_event_encoder.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_socket.h> -#include <fluent-bit/flb_pack.h> - -#define MAX_PACKET_SIZE 65536 -#define DEFAULT_LISTEN "0.0.0.0" -#define DEFAULT_PORT 8125 - -#define STATSD_TYPE_COUNTER 1 -#define STATSD_TYPE_GAUGE 2 -#define STATSD_TYPE_TIMER 3 -#define STATSD_TYPE_SET 4 - -struct flb_statsd { - char *buf; /* buffer */ - char listen[256]; /* listening address (RFC-2181) */ - char port[6]; /* listening port (RFC-793) */ - flb_sockfd_t server_fd; /* server socket */ - flb_pipefd_t coll_fd; /* server handler */ - struct flb_input_instance *ins; /* input instance */ - struct flb_log_event_encoder *log_encoder; -}; - -/* - * The "statsd_message" represents a single line in UDP packet. - * It's just a bunch of pointers to ephemeral buffer. - */ -struct statsd_message { - char *bucket; - int bucket_len; - char *value; - int value_len; - int type; - double sample_rate; -}; - -static int get_statsd_type(char *str) -{ - switch (*str) { - case 'g': - return STATSD_TYPE_GAUGE; - case 's': - return STATSD_TYPE_SET; - case 'c': - return STATSD_TYPE_COUNTER; - case 'm': - if (*(str + 1) == 's') { - return STATSD_TYPE_TIMER; - } - } - return STATSD_TYPE_COUNTER; -} - -static int is_incremental(char *str) -{ - return (*str == '+' || *str == '-'); -} - -static int statsd_process_message(struct flb_statsd *ctx, - struct statsd_message *m) -{ - int ret; - - ret = flb_log_event_encoder_begin_record(ctx->log_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - switch (m->type) { - case STATSD_TYPE_COUNTER: - ret = flb_log_event_encoder_append_body_values( - ctx->log_encoder, - - FLB_LOG_EVENT_CSTRING_VALUE("type"), - FLB_LOG_EVENT_CSTRING_VALUE("counter"), - FLB_LOG_EVENT_CSTRING_VALUE("bucket"), - FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), - FLB_LOG_EVENT_CSTRING_VALUE("value"), - FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), - FLB_LOG_EVENT_CSTRING_VALUE("sample_rate"), - FLB_LOG_EVENT_DOUBLE_VALUE(m->sample_rate)); - - break; - case STATSD_TYPE_GAUGE: - ret = flb_log_event_encoder_append_body_values( - ctx->log_encoder, - - FLB_LOG_EVENT_CSTRING_VALUE("type"), - FLB_LOG_EVENT_CSTRING_VALUE("gauge"), - FLB_LOG_EVENT_CSTRING_VALUE("bucket"), - FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), - FLB_LOG_EVENT_CSTRING_VALUE("value"), - FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), - FLB_LOG_EVENT_CSTRING_VALUE("incremental"), - FLB_LOG_EVENT_INT64_VALUE(is_incremental(m->value))); - break; - case STATSD_TYPE_TIMER: - ret = flb_log_event_encoder_append_body_values( - ctx->log_encoder, - - FLB_LOG_EVENT_CSTRING_VALUE("type"), - FLB_LOG_EVENT_CSTRING_VALUE("timer"), - FLB_LOG_EVENT_CSTRING_VALUE("bucket"), - FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), - FLB_LOG_EVENT_CSTRING_VALUE("value"), - FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), - FLB_LOG_EVENT_CSTRING_VALUE("sample_rate"), - FLB_LOG_EVENT_DOUBLE_VALUE(m->sample_rate)); - - case STATSD_TYPE_SET: - ret = flb_log_event_encoder_append_body_values( - ctx->log_encoder, - - FLB_LOG_EVENT_CSTRING_VALUE("type"), - FLB_LOG_EVENT_CSTRING_VALUE("set"), - FLB_LOG_EVENT_CSTRING_VALUE("bucket"), - FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), - FLB_LOG_EVENT_CSTRING_VALUE("value"), - FLB_LOG_EVENT_STRING_VALUE(m->value, m->value_len)); - break; - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(ctx->log_encoder); - } - - return ret; -} - -static int statsd_process_line(struct flb_statsd *ctx, char *line) -{ - char *colon, *bar, *atmark; - struct statsd_message m; - - /* - * bucket:value|type|@sample_rate - * ------ - */ - colon = strchr(line, ':'); - if (colon == NULL) { - flb_plg_error(ctx->ins, "no bucket name found"); - return -1; - } - m.bucket = line; - m.bucket_len = (colon - line); - - /* - * bucket:value|type|@sample_rate - * ---- - */ - bar = strchr(colon + 1, '|'); - if (bar == NULL) { - flb_plg_error(ctx->ins, "no metric type found"); - return -1; - } - m.type = get_statsd_type(bar + 1); - - /* - * bucket:value|type|@sample_rate - * ----- - */ - m.value = colon + 1; - m.value_len = (bar - colon - 1); - - /* - * bucket:value|type|@sample_rate - * ------------ - */ - atmark = strstr(bar + 1, "|@"); - if (atmark == NULL || atof(atmark + 2) == 0) { - m.sample_rate = 1.0; - } - else { - m.sample_rate = atof(atmark + 2); - } - - return statsd_process_message(ctx, &m); -} - - -static int cb_statsd_receive(struct flb_input_instance *ins, - struct flb_config *config, void *data) -{ - int ret; - char *line; - int len; - struct flb_statsd *ctx = data; - - /* Receive a UDP datagram */ - len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0); - if (len < 0) { - flb_errno(); - return -1; - } - ctx->buf[len] = '\0'; - - ret = FLB_EVENT_ENCODER_SUCCESS; - /* Process all messages in buffer */ - line = strtok(ctx->buf, "\n"); - while (line != NULL) { - flb_plg_trace(ctx->ins, "received a line: '%s'", line); - - ret = statsd_process_line(ctx, line); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "failed to process line: '%s'", line); - - break; - } - - line = strtok(NULL, "\n"); - } - - if (ctx->log_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder->output_buffer, - ctx->log_encoder->output_length); - } - else { - flb_plg_error(ctx->ins, "log event encoding error : %d", ret); - } - - flb_log_event_encoder_reset(ctx->log_encoder); - - return 0; -} - -static int cb_statsd_init(struct flb_input_instance *ins, - struct flb_config *config, void *data) -{ - struct flb_statsd *ctx; - char *listen; - int port; - int ret; - - ctx = flb_calloc(1, sizeof(struct flb_statsd)); - if (!ctx) { - flb_errno(); - return -1; - } - ctx->ins = ins; - - ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (ctx->log_encoder == NULL) { - flb_plg_error(ins, "could not initialize event encoder"); - flb_free(ctx); - - return -1; - } - - ctx->buf = flb_malloc(MAX_PACKET_SIZE); - if (!ctx->buf) { - flb_errno(); - flb_log_event_encoder_destroy(ctx->log_encoder); - flb_free(ctx); - return -1; - } - - /* Load the config map */ - ret = flb_input_config_map_set(ins, (void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to load configuration"); - flb_log_event_encoder_destroy(ctx->log_encoder); - flb_free(ctx); - return -1; - } - - /* Listening address */ - if (ins->host.listen) { - listen = ins->host.listen; - } - else { - listen = DEFAULT_LISTEN; - } - strncpy(ctx->listen, listen, sizeof(ctx->listen) - 1); - - /* Listening port */ - if (ins->host.port) { - port = ins->host.port; - } - else { - port = DEFAULT_PORT; - } - snprintf(ctx->port, sizeof(ctx->port), "%hu", (unsigned short) port); - - /* Export plugin context */ - flb_input_set_context(ins, ctx); - - /* Accepts metrics from UDP connections. */ - ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); - if (ctx->server_fd == -1) { - flb_plg_error(ctx->ins, "can't bind to %s:%s", ctx->listen, ctx->port); - flb_log_event_encoder_destroy(ctx->log_encoder); - flb_free(ctx->buf); - flb_free(ctx); - return -1; - } - - /* Set up the UDP connection callback */ - ctx->coll_fd = flb_input_set_collector_socket(ins, cb_statsd_receive, - ctx->server_fd, config); - if (ctx->coll_fd == -1) { - flb_plg_error(ctx->ins, "cannot set up connection callback "); - flb_log_event_encoder_destroy(ctx->log_encoder); - flb_socket_close(ctx->server_fd); - flb_free(ctx->buf); - flb_free(ctx); - return -1; - } - - flb_plg_info(ctx->ins, "start UDP server on %s:%s", ctx->listen, ctx->port); - return 0; -} - -static void cb_statsd_pause(void *data, struct flb_config *config) -{ - struct flb_statsd *ctx = data; - flb_input_collector_pause(ctx->coll_fd, ctx->ins); -} - -static void cb_statsd_resume(void *data, struct flb_config *config) -{ - struct flb_statsd *ctx = data; - flb_input_collector_resume(ctx->coll_fd, ctx->ins); -} - -static int cb_statsd_exit(void *data, struct flb_config *config) -{ - struct flb_statsd *ctx = data; - - if (ctx->log_encoder != NULL) { - flb_log_event_encoder_destroy(ctx->log_encoder); - } - - flb_socket_close(ctx->server_fd); - flb_free(ctx->buf); - flb_free(ctx); - - return 0; -} - -static struct flb_config_map config_map[] = { - /* EOF */ - {0} -}; - -/* Plugin reference */ -struct flb_input_plugin in_statsd_plugin = { - .name = "statsd", - .description = "StatsD input plugin", - .cb_init = cb_statsd_init, - .cb_pre_run = NULL, - .cb_collect = NULL, - .cb_ingest = NULL, - .cb_flush_buf = NULL, - .cb_pause = cb_statsd_pause, - .cb_resume = cb_statsd_resume, - .cb_exit = cb_statsd_exit, - .config_map = config_map, - .flags = FLB_INPUT_NET_SERVER, -}; |