diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/in_mqtt | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_mqtt')
-rw-r--r-- | fluent-bit/plugins/in_mqtt/CMakeLists.txt | 7 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt.c | 162 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt.h | 45 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_config.c | 82 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_config.h | 29 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_conn.c | 157 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_conn.h | 49 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_prot.c | 465 | ||||
-rw-r--r-- | fluent-bit/plugins/in_mqtt/mqtt_prot.h | 62 |
9 files changed, 1058 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_mqtt/CMakeLists.txt b/fluent-bit/plugins/in_mqtt/CMakeLists.txt new file mode 100644 index 000000000..53259d541 --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + mqtt.c + mqtt_conn.c + mqtt_prot.c + mqtt_config.c) + +FLB_PLUGIN(in_mqtt "${src}" "") diff --git a/fluent-bit/plugins/in_mqtt/mqtt.c b/fluent-bit/plugins/in_mqtt/mqtt.c new file mode 100644 index 000000000..d1ae74f1e --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt.c @@ -0,0 +1,162 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_network.h> +#include <fluent-bit/flb_downstream.h> +#include <fluent-bit/flb_config_map.h> + +#include "mqtt.h" +#include "mqtt_conn.h" +#include "mqtt_config.h" + +/* Initialize plugin */ +static int in_mqtt_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_in_mqtt_config *ctx; + + (void) data; + + /* Allocate space for the configuration */ + ctx = mqtt_config_init(in); + if (!ctx) { + return -1; + } + ctx->ins = in; + ctx->msgp_len = 0; + + /* Set the context */ + flb_input_set_context(in, ctx); + + /* Create downstream */ + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + in->flags, + ctx->listen, + port, + in->tls, + config, + &in->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + mqtt_config_free(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_event(in, + in_mqtt_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not set collector for MQTT input plugin"); + mqtt_config_free(ctx); + return -1; + } + + return 0; +} + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new MQTT instance which will wait for + * events/data (MQTT control packages) + */ +int in_mqtt_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct mqtt_conn *conn; + struct flb_in_mqtt_config *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_debug(ctx->ins, "[fd=%i] new TCP connection", connection->fd); + + conn = mqtt_conn_add(connection, ctx); + + if (!conn) { + flb_downstream_conn_release(connection); + + return -1; + } + + return 0; +} + +static int in_mqtt_exit(void *data, struct flb_config *config) +{ + (void) *config; + struct flb_in_mqtt_config *ctx = data; + + if (!ctx) { + return 0; + } + + mqtt_conn_destroy_all(ctx); + + mqtt_config_free(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "payload_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, payload_key), + "Key where the payload will be preserved" + }, + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_mqtt_plugin = { + .name = "mqtt", + .description = "MQTT, listen for Publish messages", + .cb_init = in_mqtt_init, + .cb_pre_run = NULL, + .cb_collect = in_mqtt_collect, + .cb_flush_buf = NULL, + .cb_exit = in_mqtt_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/fluent-bit/plugins/in_mqtt/mqtt.h b/fluent-bit/plugins/in_mqtt/mqtt.h new file mode 100644 index 000000000..01c3b7be9 --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_MQTT_H +#define FLB_IN_MQTT_H + +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#define MQTT_MSGP_BUF_SIZE 8192 + +struct flb_in_mqtt_config { + char *listen; /* Listen interface */ + char *tcp_port; /* TCP Port */ + + flb_sds_t payload_key; /* payload key */ + + int msgp_len; /* msgpack data length */ + char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */ + struct flb_input_instance *ins; /* plugin input instance */ + struct flb_downstream *downstream; /* Client manager */ + struct mk_list conns; /* Active connections */ + struct flb_log_event_encoder *log_encoder; +}; + +int in_mqtt_collect(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context); + +#endif diff --git a/fluent-bit/plugins/in_mqtt/mqtt_config.c b/fluent-bit/plugins/in_mqtt/mqtt_config.c new file mode 100644 index 000000000..800834c05 --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_config.c @@ -0,0 +1,82 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdlib.h> + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_downstream.h> +#include <fluent-bit/flb_utils.h> + +#include "mqtt.h" +#include "mqtt_config.h" + +struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *ins) +{ + char tmp[16]; + struct flb_in_mqtt_config *config; + int ret; + + config = flb_calloc(1, sizeof(struct flb_in_mqtt_config)); + if (!config) { + flb_errno(); + return NULL; + } + + ret = flb_input_config_map_set(ins, (void*) config); + if (ret == -1) { + flb_plg_error(ins, "could not initialize config map"); + flb_free(config); + return NULL; + } + + config->log_encoder = flb_log_event_encoder_create( + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (config->log_encoder == NULL) { + flb_plg_error(ins, "could not initialize event encoder"); + mqtt_config_free(config); + + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0) */ + flb_input_net_default_listener("0.0.0.0", 1883, ins); + + /* Map 'listen' and 'port' into the local context */ + config->listen = ins->host.listen; + snprintf(tmp, sizeof(tmp) - 1, "%d", ins->host.port); + config->tcp_port = flb_strdup(tmp); + + mk_list_init(&config->conns); + return config; +} + +void mqtt_config_free(struct flb_in_mqtt_config *config) +{ + if (config->downstream != NULL) { + flb_downstream_destroy(config->downstream); + } + + if (config->log_encoder != NULL) { + flb_log_event_encoder_destroy(config->log_encoder); + } + + flb_free(config->tcp_port); + flb_free(config); +} diff --git a/fluent-bit/plugins/in_mqtt/mqtt_config.h b/fluent-bit/plugins/in_mqtt/mqtt_config.h new file mode 100644 index 000000000..709c3dd95 --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MQTT_CONFIG_H +#define FLB_MQTT_CONFIG_H + +#include "mqtt.h" +#include <fluent-bit/flb_input.h> + +struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *in); +void mqtt_config_free(struct flb_in_mqtt_config *config); + +#endif diff --git a/fluent-bit/plugins/in_mqtt/mqtt_conn.c b/fluent-bit/plugins/in_mqtt/mqtt_conn.c new file mode 100644 index 000000000..32ade2f6e --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_conn.c @@ -0,0 +1,157 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_engine.h> +#include <fluent-bit/flb_network.h> +#include <fluent-bit/flb_downstream.h> + +#include "mqtt.h" +#include "mqtt_prot.h" +#include "mqtt_conn.h" + +/* Callback invoked every time an event is triggered for a connection */ +int mqtt_conn_event(void *data) +{ + int ret; + int bytes; + int available; + struct mk_event *event; + struct mqtt_conn *conn; + struct flb_in_mqtt_config *ctx; + struct flb_connection *connection; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = sizeof(conn->buf) - conn->buf_len; + + bytes = flb_io_net_read(connection, + (void *) &conn->buf[conn->buf_len], + available); + + if (bytes > 0) { + conn->buf_len += bytes; + flb_plg_trace(ctx->ins, "[fd=%i] read()=%i bytes", + connection->fd, + bytes); + + ret = mqtt_prot_parser(conn); + if (ret < 0) { + mqtt_conn_del(conn); + return -1; + } + } + else { + flb_plg_debug(ctx->ins, "[fd=%i] connection closed", + connection->fd); + + mqtt_conn_del(conn); + } + } + else if (event->mask & MK_EVENT_CLOSE) { + flb_plg_debug(ctx->ins, "[fd=%i] hangup", event->fd); + } + + return 0; +} + +/* Create a new mqtt request instance */ +struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection, + struct flb_in_mqtt_config *ctx) +{ + struct mqtt_conn *conn; + int ret; + + conn = flb_malloc(sizeof(struct mqtt_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = mqtt_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_pos = 0; + conn->buf_len = 0; + conn->buf_frame_end = 0; + conn->status = MQTT_NEW; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_free(conn); + + return NULL; + } + + mk_list_add(&conn->_head, &ctx->conns); + + return conn; +} + +int mqtt_conn_del(struct mqtt_conn *conn) +{ + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + /* Release resources */ + mk_list_del(&conn->_head); + + flb_free(conn); + + return 0; +} + +int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct mqtt_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->conns) { + conn = mk_list_entry(head, struct mqtt_conn, _head); + mqtt_conn_del(conn); + } + + return 0; +} diff --git a/fluent-bit/plugins/in_mqtt/mqtt_conn.h b/fluent-bit/plugins/in_mqtt/mqtt_conn.h new file mode 100644 index 000000000..43f98f09e --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_conn.h @@ -0,0 +1,49 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MQTT_CONN_H +#define FLB_MQTT_CONN_H + +#include <fluent-bit/flb_connection.h> + +enum { + MQTT_NEW = 1, /* it's a new connection */ + MQTT_CONNECTED = 2, /* MQTT connection per protocol spec OK */ + MQTT_NEXT = 4 /* Waiting for Control packets */ +}; + +/* This structure respresents a MQTT connection */ +struct mqtt_conn { + int status; /* Connection status */ + int packet_type; /* MQTT packet type */ + int packet_length; + int buf_frame_end; /* Frame end position */ + int buf_pos; /* Index position */ + int buf_len; /* Buffer content length */ + unsigned char buf[1024]; /* Buffer data */ + struct flb_in_mqtt_config *ctx; /* Plugin configuration context */ + struct flb_connection *connection; + struct mk_list _head; /* Link to flb_in_mqtt_config->conns */ +}; + +struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection, struct flb_in_mqtt_config *ctx); +int mqtt_conn_del(struct mqtt_conn *conn); +int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx); + +#endif diff --git a/fluent-bit/plugins/in_mqtt/mqtt_prot.c b/fluent-bit/plugins/in_mqtt/mqtt_prot.c new file mode 100644 index 000000000..e0267daf2 --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_prot.c @@ -0,0 +1,465 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <msgpack.h> + +#include "mqtt.h" +#include "mqtt_prot.h" + +#define BUFC() conn->buf[conn->buf_pos] +#define BUF_AVAIL() conn->buf_len - conn->buf_pos +#define BIT_SET(a, b) ((a) |= (1 << (b))) +#define BIT_CHECK(a, b) ((a) & (1 << (b))) + +/* +static inline void print_hex(struct mqtt_conn *conn) +{ + int x; + + printf("\n--------HEX--------> "); + printf("buf_pos=%i buf_len=%i\n", conn->buf_pos, conn->buf_len); + for (x = conn->buf_pos; x < conn->buf_len; x++) { + printf("%x ", conn->buf[x]); + } + printf("\n--------------------\n\n"); +} + +static inline void print_str(struct mqtt_conn *conn) +{ + int x; + + printf("\n--------HEX--------> "); + printf("buf_pos=%i buf_len=%i\n", conn->buf_pos, conn->buf_len); + for (x = conn->buf_pos; x < conn->buf_len; x++) { + printf("%c", conn->buf[x]); + } + printf("\n--------------------\n\n"); +} +*/ + +/* + * It drop the current packet from the buffer, it move the remaining bytes + * from right-to-left and adjust the new length. + */ +static inline int mqtt_packet_drop(struct mqtt_conn *conn) +{ + int move_bytes; + + if (conn->buf_pos == conn->buf_len) { + conn->buf_frame_end = 0; + conn->buf_len = 0; + conn->buf_pos = 0; + return 0; + } + + /* Check boundaries */ + if (conn->buf_pos + 1 > conn->buf_len) { + conn->buf_frame_end = 0; + conn->buf_len = 0; + conn->buf_pos = 0; + return 0; + } + + move_bytes = conn->buf_pos + 1; + memmove(conn->buf, + conn->buf + move_bytes, + conn->buf_len - move_bytes); + + conn->buf_frame_end = 0; + conn->buf_len -= move_bytes; + conn->buf_pos = 0; + + return 0; +} + +/* + * It writes the packet control header which includes the packet type + * and the remaining length of the packet. The incoming buffer must have + * at least 6 bytes of space. + * + * The function returns the number of bytes used. + */ +static inline int mqtt_packet_header(int type, int length, char *buf) +{ + int i = 0; + uint8_t byte; + + buf[i] = (type << 4) | 0; + i++; + + do { + byte = length % 128; + length = (length / 128); + if (length > 0) { + byte = (byte | 128); + } + buf[i] = byte; + i++; + } while (length > 0); + + return i; +} + +/* Collect a buffer of JSON data and convert it to Fluent Bit format */ +static int mqtt_data_append(char *topic, size_t topic_len, + char *msg, int msg_len, + void *in_context) +{ + int i; + int ret; + int root_type; + size_t out; + size_t off = 0; + char *pack; + msgpack_object root; + msgpack_unpacked result; + struct flb_in_mqtt_config *ctx = in_context; + + /* Convert our incoming JSON to MsgPack */ + ret = flb_pack_json(msg, msg_len, &pack, &out, &root_type, NULL); + if (ret != 0) { + flb_plg_warn(ctx->ins, "MQTT Packet incomplete or is not JSON"); + return -1; + } + + off = 0; + msgpack_unpacked_init(&result); + if (msgpack_unpack_next(&result, pack, out, &off) != MSGPACK_UNPACK_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + if (result.data.type != MSGPACK_OBJECT_MAP){ + msgpack_unpacked_destroy(&result); + return -1; + } + root = result.data; + + + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("topic"), + FLB_LOG_EVENT_STRING_VALUE(topic, topic_len)); + } + + if (ctx->payload_key) { + flb_log_event_encoder_append_body_string_length(ctx->log_encoder, flb_sds_len(ctx->payload_key)); + flb_log_event_encoder_append_body_string_body(ctx->log_encoder, ctx->payload_key, + flb_sds_len(ctx->payload_key)); + flb_log_event_encoder_body_begin_map(ctx->log_encoder); + } + + /* Re-pack original KVs */ + for (i = 0; + i < root.via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&root.via.map.ptr[i].key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&root.via.map.ptr[i].val)); + } + + if (ctx->payload_key) { + flb_log_event_encoder_body_commit_map(ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + ret = 0; + } + else { + flb_plg_error(ctx->ins, "log event encoding error : %d", ret); + + ret = -1; + } + + flb_log_event_encoder_reset(ctx->log_encoder); + + msgpack_unpacked_destroy(&result); + flb_free(pack); + + return ret; +} + + +/* + * Handle a CONNECT request control packet: + * + * basically we need to acknoledge the sender so it can start + * publishing messages to our service. + */ +static int mqtt_handle_connect(struct mqtt_conn *conn) +{ + int i; + int ret; + size_t sent; + char buf[4] = {0, 0, 0, 0}; + struct flb_in_mqtt_config *ctx = conn->ctx; + + i = mqtt_packet_header(MQTT_CONNACK, 2 , (char *) &buf); + BIT_SET(buf[i], 0); + i++; + buf[i] = MQTT_CONN_ACCEPTED; + + /* write CONNACK message */ + ret = flb_io_net_write(conn->connection, + (void *) buf, + 4, + &sent); + + flb_plg_trace(ctx->ins, "[fd=%i] CMD CONNECT (connack=%i bytes)", + conn->connection->fd, ret); + + return ret; +} + +/* + * Handle a PUBLISH control packet + */ +static int mqtt_handle_publish(struct mqtt_conn *conn) +{ + int topic; + int topic_len; + uint8_t qos; + size_t sent; + uint16_t hlen; + uint16_t packet_id; + char buf[4]; + struct flb_in_mqtt_config *ctx = conn->ctx; + + /* + * DUP: we skip duplicated messages. + * QOS: We process this. + * Retain: skipped + */ + + qos = ((conn->buf[0] >> 1) & 0x03); + conn->buf_pos++; + + /* Topic */ + hlen = BUFC() << 8; + conn->buf_pos++; + hlen |= BUFC(); + + /* Validate topic length against current buffer capacity (overflow) */ + if (hlen > (conn->buf_len - conn->buf_pos)) { + flb_plg_debug(ctx->ins, "invalid topic length"); + return -1; + } + + conn->buf_pos++; + topic = conn->buf_pos; + topic_len = hlen; + conn->buf_pos += hlen; + + /* Check QOS flag and respond if required */ + if (qos > MQTT_QOS_LEV0) { + /* Packet Identifier */ + packet_id = BUFC() << 8; + conn->buf_pos++; + packet_id |= BUFC(); + conn->buf_pos++; + + if (qos == MQTT_QOS_LEV1) { + mqtt_packet_header(MQTT_PUBACK, 2 , (char *) &buf); + } + else if (qos == MQTT_QOS_LEV2) { + mqtt_packet_header(MQTT_PUBREC, 2 , (char *) &buf); + } + /* Set the identifier that we are replying to */ + buf[2] = (packet_id >> 8) & 0xff; + buf[3] = (packet_id & 0xff); + + /* This operation should be checked */ + flb_io_net_write(conn->connection, + (void *) buf, + 4, + &sent); + } + + /* Message */ + mqtt_data_append((char *) (conn->buf + topic), topic_len, + (char *) (conn->buf + conn->buf_pos), + conn->buf_frame_end - conn->buf_pos + 1, + conn->ctx); + + flb_plg_trace(ctx->ins, "[fd=%i] CMD PUBLISH", + conn->connection->fd); + return 0; +} + +/* Handle a PINGREQ control packet */ +static int mqtt_handle_ping(struct mqtt_conn *conn) +{ + int ret; + size_t sent; + char buf[2] = {0, 0}; + struct flb_in_mqtt_config *ctx = conn->ctx; + + mqtt_packet_header(MQTT_PINGRESP, 0 , (char *) &buf); + + /* write PINGRESP message */ + + ret = flb_io_net_write(conn->connection, + (void *) buf, + 2, + &sent); + + flb_plg_trace(ctx->ins, "[fd=%i] CMD PING (pong=%i bytes)", + conn->connection->fd, ret); + return ret; +} + +int mqtt_prot_parser(struct mqtt_conn *conn) +{ + int ret; + int length = 0; + int pos = conn->buf_pos; + int mult; + struct flb_in_mqtt_config *ctx = conn->ctx; + + for (; conn->buf_pos < conn->buf_len; conn->buf_pos++) { + if (conn->status & (MQTT_NEW | MQTT_NEXT)) { + /* + * Do we have at least the Control Packet fixed header + * and the remaining length byte field ? + */ + if (BUF_AVAIL() < 2) { + conn->buf_pos = pos; + flb_plg_trace(ctx->ins, "[fd=%i] Need more data", + conn->connection->fd); + return MQTT_MORE; + } + + /* As the connection is new we expect a MQTT_CONNECT request */ + conn->packet_type = BUFC() >> 4; + if (conn->status == MQTT_NEW && conn->packet_type != MQTT_CONNECT) { + flb_plg_trace(ctx->ins, "[fd=%i] error, expecting MQTT_CONNECT", + conn->connection->fd); + return MQTT_ERROR; + } + conn->packet_length = conn->buf_pos; + conn->buf_pos++; + + /* Get the remaining length */ + mult = 1; + length = 0; + + do { + if (conn->buf_pos + 1 > conn->buf_len) { + conn->buf_pos = pos; + flb_plg_trace(ctx->ins, "[fd=%i] Need more data", + conn->connection->fd); + return MQTT_MORE; + } + + length += (BUFC() & 127) * mult; + mult *= 128; + if (mult > 128*128*128) { + return MQTT_ERROR; + } + + if (length + 2 > (conn->buf_len - pos)) { + conn->buf_pos = pos; + flb_plg_trace(ctx->ins, "[fd=%i] Need more data", + conn->connection->fd); + return MQTT_MORE; + } + + if ((BUFC() & 128) == 0) { + if (conn->buf_len - 2 < length) { + conn->buf_pos = pos; + flb_plg_trace(ctx->ins, "[fd=%i] Need more data", + conn->connection->fd); + return MQTT_MORE; + } + else { + conn->buf_frame_end = conn->buf_pos + length; + break; + } + } + + if (conn->buf_pos + 1 < conn->buf_len) { + conn->buf_pos++; + } + else { + conn->buf_pos = pos; + flb_plg_trace(ctx->ins, "[fd=%i] Need more data", + conn->connection->fd); + return MQTT_MORE; + } + } while (1); + + conn->packet_length = length; + + /* At this point we have a full control packet in place */ + if (conn->packet_type == MQTT_CONNECT) { + mqtt_handle_connect(conn); + } + else if (conn->packet_type == MQTT_PUBLISH) { + ret = mqtt_handle_publish(conn); + if (ret == -1) { + return MQTT_ERROR; + } + } + else if (conn->packet_type == MQTT_PINGREQ) { + mqtt_handle_ping(conn); + } + else if (conn->packet_type == MQTT_DISCONNECT) { + flb_plg_trace(ctx->ins, "[fd=%i] CMD DISCONNECT", + conn->connection->fd); + return MQTT_HANGUP; + } + else { + } + + /* Prepare for next round */ + conn->status = MQTT_NEXT; + conn->buf_pos = conn->buf_frame_end; + + mqtt_packet_drop(conn); + + if (conn->buf_len > 0) { + conn->buf_pos = -1; + } + } + } + conn->buf_pos--; + return 0; +} diff --git a/fluent-bit/plugins/in_mqtt/mqtt_prot.h b/fluent-bit/plugins/in_mqtt/mqtt_prot.h new file mode 100644 index 000000000..74c4fe32e --- /dev/null +++ b/fluent-bit/plugins/in_mqtt/mqtt_prot.h @@ -0,0 +1,62 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MQTT_PROT_H +#define FLB_MQTT_PROT_H + +#include "mqtt_conn.h" + +/* + * Specs definition from 2.2.1 MQTT Control Packet: + * + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021 + */ +#define MQTT_CONNECT 1 +#define MQTT_CONNACK 2 +#define MQTT_PUBLISH 3 +#define MQTT_PUBACK 4 +#define MQTT_PUBREC 5 +#define MQTT_PUBREL 6 +#define MQTT_PUBCOMP 7 +#define MQTT_PINGREQ 12 +#define MQTT_PINGRESP 13 +#define MQTT_DISCONNECT 14 + +/* CONNACK status codes */ +#define MQTT_CONN_ACCEPTED 0 +#define MQTT_CONN_REFUSED_PROTOCOL 1 +#define MQTT_CONN_REFUSED_IDENTIF 2 +#define MQTT_CONN_REFUSED_SERVER 3 +#define MQTT_CONN_REFUSED_BADCRED 4 +#define MQTT_CONN_REFUSED_NOAUTH 5 + +/* QOS Flag status */ +#define MQTT_QOS_LEV0 0 /* no reply */ +#define MQTT_QOS_LEV1 1 /* PUBACK packet */ +#define MQTT_QOS_LEV2 2 /* PUBREC packet */ + +/* Specific macros for Fluent Bit handling, not related to MQTT spec */ +#define MQTT_HANGUP -2 /* MQTT client is closing */ +#define MQTT_ERROR -1 /* MQTT protocol error, hangup */ +#define MQTT_OK 0 /* Everything is OK */ +#define MQTT_MORE 1 /* need to read more data */ + +int mqtt_prot_parser(struct mqtt_conn *conn); + +#endif |