diff options
Diffstat (limited to 'fluent-bit/plugins/in_syslog/syslog_prot.c')
-rw-r--r-- | fluent-bit/plugins/in_syslog/syslog_prot.c | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_syslog/syslog_prot.c b/fluent-bit/plugins/in_syslog/syslog_prot.c new file mode 100644 index 000000000..1ec2c97cd --- /dev/null +++ b/fluent-bit/plugins/in_syslog/syslog_prot.c @@ -0,0 +1,324 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_parser.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> + +#include "syslog.h" +#include "syslog_conn.h" +#include "syslog_prot.h" + +#include <string.h> + +static inline void consume_bytes(char *buf, int bytes, int length) +{ + memmove(buf, buf + bytes, length - bytes); +} + +static int append_message_to_record_data(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type) +{ + int result = FLB_MAP_NOT_MODIFIED; + char *modified_data_buffer; + int modified_data_size; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + *result_buffer = NULL; + *result_size = 0; + modified_data_buffer = NULL; + + if (message_key_name != NULL) { + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + if (message_type == MSGPACK_OBJECT_BIN) { + message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.via.bin.size = message_size; + message_entry.val.via.bin.ptr = message_buffer; + } + else if (message_type == MSGPACK_OBJECT_STR) { + message_entry.val.type = MSGPACK_OBJECT_STR; + message_entry.val.via.str.size = message_size; + message_entry.val.via.str.ptr = message_buffer; + } + else { + result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE; + } + + if (result == FLB_MAP_NOT_MODIFIED) { + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + } + } + + if (result == FLB_MAP_EXPAND_SUCCESS) { + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + + return result; +} + +static inline int pack_line(struct flb_syslog *ctx, + struct flb_time *time, + struct flb_connection *connection, + char *data, size_t data_size, + char *raw_data, size_t raw_data_size) +{ + char *modified_data_buffer; + size_t modified_data_size; + char *appended_address_buffer; + size_t appended_address_size; + int result; + char *source_address; + + source_address = NULL; + modified_data_buffer = NULL; + appended_address_buffer = NULL; + + if (ctx->raw_message_key != NULL) { + result = append_message_to_record_data(&modified_data_buffer, + &modified_data_size, + ctx->raw_message_key, + data, + data_size, + raw_data, + raw_data_size, + MSGPACK_OBJECT_BIN); + + if (result == FLB_MAP_EXPANSION_ERROR) { + flb_plg_debug(ctx->ins, "error expanding raw message : %d", result); + } + } + + if (ctx->source_address_key != NULL) { + source_address = flb_connection_get_remote_address(connection); + if (source_address != NULL) { + if (modified_data_buffer != NULL) { + result = append_message_to_record_data(&appended_address_buffer, + &appended_address_size, + ctx->source_address_key, + modified_data_buffer, + modified_data_size, + source_address, + strlen(source_address), + MSGPACK_OBJECT_STR); + } + else { + result = append_message_to_record_data(&appended_address_buffer, + &appended_address_size, + ctx->source_address_key, + data, + data_size, + source_address, + strlen(source_address), + MSGPACK_OBJECT_STR); + } + + if (result == FLB_MAP_EXPANSION_ERROR) { + flb_plg_debug(ctx->ins, "error expanding source_address : %d", result); + } + } + } + + result = flb_log_event_encoder_begin_record(ctx->log_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp(ctx->log_encoder, time); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + if (appended_address_buffer != NULL) { + result = flb_log_event_encoder_set_body_from_raw_msgpack( + ctx->log_encoder, appended_address_buffer, appended_address_size); + } + else if (modified_data_buffer != NULL) { + result = flb_log_event_encoder_set_body_from_raw_msgpack( + ctx->log_encoder, modified_data_buffer, modified_data_size); + } + else { + result = flb_log_event_encoder_set_body_from_raw_msgpack( + ctx->log_encoder, data, data_size); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(ctx->log_encoder); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + result = 0; + } + else { + flb_plg_error(ctx->ins, "log event encoding error : %d", result); + + result = -1; + } + + flb_log_event_encoder_reset(ctx->log_encoder); + + if (modified_data_buffer != NULL) { + flb_free(modified_data_buffer); + } + if (appended_address_buffer != NULL) { + flb_free(appended_address_buffer); + } + + return result; +} + +int syslog_prot_process(struct syslog_conn *conn) +{ + int len; + int ret; + char *p; + char *eof; + char *end; + void *out_buf; + size_t out_size; + struct flb_time out_time; + struct flb_syslog *ctx = conn->ctx; + + eof = conn->buf_data; + end = conn->buf_data + conn->buf_len; + + /* Always parse while some remaining bytes exists */ + while (eof < end) { + /* Lookup the ending byte */ + eof = p = conn->buf_data + conn->buf_parsed; + while (*eof != '\n' && *eof != '\0' && eof < end) { + eof++; + } + + /* Incomplete message */ + if (eof == end || (*eof != '\n' && *eof != '\0')) { + break; + } + + /* No data ? */ + len = (eof - p); + if (len == 0) { + consume_bytes(conn->buf_data, 1, conn->buf_len); + conn->buf_len--; + conn->buf_parsed = 0; + conn->buf_data[conn->buf_len] = '\0'; + end = conn->buf_data + conn->buf_len; + + if (conn->buf_len == 0) { + break; + } + + continue; + } + + /* Process the string */ + ret = flb_parser_do(ctx->parser, p, len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + pack_line(ctx, &out_time, + conn->connection, + out_buf, out_size, + p, len); + flb_free(out_buf); + } + else { + flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'", + ctx->parser->name); + flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p); + } + + conn->buf_parsed += len + 1; + end = conn->buf_data + conn->buf_len; + eof = conn->buf_data + conn->buf_parsed; + } + + if (conn->buf_parsed > 0) { + consume_bytes(conn->buf_data, conn->buf_parsed, conn->buf_len); + conn->buf_len -= conn->buf_parsed; + conn->buf_parsed = 0; + conn->buf_data[conn->buf_len] = '\0'; + } + + return 0; +} + +int syslog_prot_process_udp(struct syslog_conn *conn) +{ + int ret; + void *out_buf; + size_t out_size; + struct flb_time out_time = {0}; + char *buf; + size_t size; + struct flb_syslog *ctx; + struct flb_connection *connection; + + buf = conn->buf_data; + size = conn->buf_len; + ctx = conn->ctx; + connection = conn->connection; + + ret = flb_parser_do(ctx->parser, buf, size, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_double(&out_time) == 0) { + flb_time_get(&out_time); + } + pack_line(ctx, &out_time, + connection, + out_buf, out_size, + buf, size); + flb_free(out_buf); + } + else { + flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'", + ctx->parser->name); + flb_plg_debug(ctx->ins, "unparsed log message: %.*s", + (int) size, buf); + return -1; + } + + return 0; +} |