summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_mqtt
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/in_mqtt
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_mqtt')
-rw-r--r--fluent-bit/plugins/in_mqtt/CMakeLists.txt7
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt.c162
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt.h45
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_config.c82
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_config.h29
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_conn.c157
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_conn.h49
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_prot.c465
-rw-r--r--fluent-bit/plugins/in_mqtt/mqtt_prot.h62
9 files changed, 1058 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_mqtt/CMakeLists.txt b/fluent-bit/plugins/in_mqtt/CMakeLists.txt
new file mode 100644
index 000000000..53259d541
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+ mqtt.c
+ mqtt_conn.c
+ mqtt_prot.c
+ mqtt_config.c)
+
+FLB_PLUGIN(in_mqtt "${src}" "")
diff --git a/fluent-bit/plugins/in_mqtt/mqtt.c b/fluent-bit/plugins/in_mqtt/mqtt.c
new file mode 100644
index 000000000..d1ae74f1e
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt.c
@@ -0,0 +1,162 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_config_map.h>
+
+#include "mqtt.h"
+#include "mqtt_conn.h"
+#include "mqtt_config.h"
+
+/* Initialize plugin */
+static int in_mqtt_init(struct flb_input_instance *in,
+ struct flb_config *config, void *data)
+{
+ unsigned short int port;
+ int ret;
+ struct flb_in_mqtt_config *ctx;
+
+ (void) data;
+
+ /* Allocate space for the configuration */
+ ctx = mqtt_config_init(in);
+ if (!ctx) {
+ return -1;
+ }
+ ctx->ins = in;
+ ctx->msgp_len = 0;
+
+ /* Set the context */
+ flb_input_set_context(in, ctx);
+
+ /* Create downstream */
+ port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);
+
+ ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
+ in->flags,
+ ctx->listen,
+ port,
+ in->tls,
+ config,
+ &in->net_setup);
+
+ if (ctx->downstream == NULL) {
+ flb_plg_error(ctx->ins,
+ "could not initialize downstream on %s:%s. Aborting",
+ ctx->listen, ctx->tcp_port);
+
+ mqtt_config_free(ctx);
+
+ return -1;
+ }
+
+ flb_input_downstream_set(ctx->downstream, ctx->ins);
+
+ /* Collect upon data available on the standard input */
+ ret = flb_input_set_collector_event(in,
+ in_mqtt_collect,
+ ctx->downstream->server_fd,
+ config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not set collector for MQTT input plugin");
+ mqtt_config_free(ctx);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * For a server event, the collection event means a new client have arrived, we
+ * accept the connection and create a new MQTT instance which will wait for
+ * events/data (MQTT control packages)
+ */
+int in_mqtt_collect(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_connection *connection;
+ struct mqtt_conn *conn;
+ struct flb_in_mqtt_config *ctx;
+
+ ctx = in_context;
+
+ connection = flb_downstream_conn_get(ctx->downstream);
+
+ if (connection == NULL) {
+ flb_plg_error(ctx->ins, "could not accept new connection");
+
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins, "[fd=%i] new TCP connection", connection->fd);
+
+ conn = mqtt_conn_add(connection, ctx);
+
+ if (!conn) {
+ flb_downstream_conn_release(connection);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int in_mqtt_exit(void *data, struct flb_config *config)
+{
+ (void) *config;
+ struct flb_in_mqtt_config *ctx = data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ mqtt_conn_destroy_all(ctx);
+
+ mqtt_config_free(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "payload_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, payload_key),
+ "Key where the payload will be preserved"
+ },
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_input_plugin in_mqtt_plugin = {
+ .name = "mqtt",
+ .description = "MQTT, listen for Publish messages",
+ .cb_init = in_mqtt_init,
+ .cb_pre_run = NULL,
+ .cb_collect = in_mqtt_collect,
+ .cb_flush_buf = NULL,
+ .cb_exit = in_mqtt_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
+};
diff --git a/fluent-bit/plugins/in_mqtt/mqtt.h b/fluent-bit/plugins/in_mqtt/mqtt.h
new file mode 100644
index 000000000..01c3b7be9
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt.h
@@ -0,0 +1,45 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_MQTT_H
+#define FLB_IN_MQTT_H
+
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#define MQTT_MSGP_BUF_SIZE 8192
+
+struct flb_in_mqtt_config {
+ char *listen; /* Listen interface */
+ char *tcp_port; /* TCP Port */
+
+ flb_sds_t payload_key; /* payload key */
+
+ int msgp_len; /* msgpack data length */
+ char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */
+ struct flb_input_instance *ins; /* plugin input instance */
+ struct flb_downstream *downstream; /* Client manager */
+ struct mk_list conns; /* Active connections */
+ struct flb_log_event_encoder *log_encoder;
+};
+
+int in_mqtt_collect(struct flb_input_instance *i_ins,
+ struct flb_config *config, void *in_context);
+
+#endif
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_config.c b/fluent-bit/plugins/in_mqtt/mqtt_config.c
new file mode 100644
index 000000000..800834c05
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_config.c
@@ -0,0 +1,82 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "mqtt.h"
+#include "mqtt_config.h"
+
+struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *ins)
+{
+ char tmp[16];
+ struct flb_in_mqtt_config *config;
+ int ret;
+
+ config = flb_calloc(1, sizeof(struct flb_in_mqtt_config));
+ if (!config) {
+ flb_errno();
+ return NULL;
+ }
+
+ ret = flb_input_config_map_set(ins, (void*) config);
+ if (ret == -1) {
+ flb_plg_error(ins, "could not initialize config map");
+ flb_free(config);
+ return NULL;
+ }
+
+ config->log_encoder = flb_log_event_encoder_create(
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (config->log_encoder == NULL) {
+ flb_plg_error(ins, "could not initialize event encoder");
+ mqtt_config_free(config);
+
+ return NULL;
+ }
+
+ /* Listen interface (if not set, defaults to 0.0.0.0) */
+ flb_input_net_default_listener("0.0.0.0", 1883, ins);
+
+ /* Map 'listen' and 'port' into the local context */
+ config->listen = ins->host.listen;
+ snprintf(tmp, sizeof(tmp) - 1, "%d", ins->host.port);
+ config->tcp_port = flb_strdup(tmp);
+
+ mk_list_init(&config->conns);
+ return config;
+}
+
+void mqtt_config_free(struct flb_in_mqtt_config *config)
+{
+ if (config->downstream != NULL) {
+ flb_downstream_destroy(config->downstream);
+ }
+
+ if (config->log_encoder != NULL) {
+ flb_log_event_encoder_destroy(config->log_encoder);
+ }
+
+ flb_free(config->tcp_port);
+ flb_free(config);
+}
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_config.h b/fluent-bit/plugins/in_mqtt/mqtt_config.h
new file mode 100644
index 000000000..709c3dd95
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_config.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_MQTT_CONFIG_H
+#define FLB_MQTT_CONFIG_H
+
+#include "mqtt.h"
+#include <fluent-bit/flb_input.h>
+
+struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *in);
+void mqtt_config_free(struct flb_in_mqtt_config *config);
+
+#endif
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_conn.c b/fluent-bit/plugins/in_mqtt/mqtt_conn.c
new file mode 100644
index 000000000..32ade2f6e
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_conn.c
@@ -0,0 +1,157 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_downstream.h>
+
+#include "mqtt.h"
+#include "mqtt_prot.h"
+#include "mqtt_conn.h"
+
+/* Callback invoked every time an event is triggered for a connection */
+int mqtt_conn_event(void *data)
+{
+ int ret;
+ int bytes;
+ int available;
+ struct mk_event *event;
+ struct mqtt_conn *conn;
+ struct flb_in_mqtt_config *ctx;
+ struct flb_connection *connection;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ ctx = conn->ctx;
+
+ event = &connection->event;
+
+ if (event->mask & MK_EVENT_READ) {
+ available = sizeof(conn->buf) - conn->buf_len;
+
+ bytes = flb_io_net_read(connection,
+ (void *) &conn->buf[conn->buf_len],
+ available);
+
+ if (bytes > 0) {
+ conn->buf_len += bytes;
+ flb_plg_trace(ctx->ins, "[fd=%i] read()=%i bytes",
+ connection->fd,
+ bytes);
+
+ ret = mqtt_prot_parser(conn);
+ if (ret < 0) {
+ mqtt_conn_del(conn);
+ return -1;
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins, "[fd=%i] connection closed",
+ connection->fd);
+
+ mqtt_conn_del(conn);
+ }
+ }
+ else if (event->mask & MK_EVENT_CLOSE) {
+ flb_plg_debug(ctx->ins, "[fd=%i] hangup", event->fd);
+ }
+
+ return 0;
+}
+
+/* Create a new mqtt request instance */
+struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection,
+ struct flb_in_mqtt_config *ctx)
+{
+ struct mqtt_conn *conn;
+ int ret;
+
+ conn = flb_malloc(sizeof(struct mqtt_conn));
+ if (!conn) {
+ flb_errno();
+ return NULL;
+ }
+
+ conn->connection = connection;
+
+ /* Set data for the event-loop */
+ MK_EVENT_NEW(&connection->event);
+
+ connection->user_data = conn;
+ connection->event.type = FLB_ENGINE_EV_CUSTOM;
+ connection->event.handler = mqtt_conn_event;
+
+ /* Connection info */
+ conn->ctx = ctx;
+ conn->buf_pos = 0;
+ conn->buf_len = 0;
+ conn->buf_frame_end = 0;
+ conn->status = MQTT_NEW;
+
+ /* Register instance into the event loop */
+ ret = mk_event_add(flb_engine_evl_get(),
+ connection->fd,
+ FLB_ENGINE_EV_CUSTOM,
+ MK_EVENT_READ,
+ &connection->event);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register new connection");
+ flb_free(conn);
+
+ return NULL;
+ }
+
+ mk_list_add(&conn->_head, &ctx->conns);
+
+ return conn;
+}
+
+int mqtt_conn_del(struct mqtt_conn *conn)
+{
+ /* The downstream unregisters the file descriptor from the event-loop
+ * so there's nothing to be done by the plugin
+ */
+ flb_downstream_conn_release(conn->connection);
+
+ /* Release resources */
+ mk_list_del(&conn->_head);
+
+ flb_free(conn);
+
+ return 0;
+}
+
+int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mqtt_conn *conn;
+
+ mk_list_foreach_safe(head, tmp, &ctx->conns) {
+ conn = mk_list_entry(head, struct mqtt_conn, _head);
+ mqtt_conn_del(conn);
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_conn.h b/fluent-bit/plugins/in_mqtt/mqtt_conn.h
new file mode 100644
index 000000000..43f98f09e
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_conn.h
@@ -0,0 +1,49 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_MQTT_CONN_H
+#define FLB_MQTT_CONN_H
+
+#include <fluent-bit/flb_connection.h>
+
+enum {
+ MQTT_NEW = 1, /* it's a new connection */
+ MQTT_CONNECTED = 2, /* MQTT connection per protocol spec OK */
+ MQTT_NEXT = 4 /* Waiting for Control packets */
+};
+
+/* This structure respresents a MQTT connection */
+struct mqtt_conn {
+ int status; /* Connection status */
+ int packet_type; /* MQTT packet type */
+ int packet_length;
+ int buf_frame_end; /* Frame end position */
+ int buf_pos; /* Index position */
+ int buf_len; /* Buffer content length */
+ unsigned char buf[1024]; /* Buffer data */
+ struct flb_in_mqtt_config *ctx; /* Plugin configuration context */
+ struct flb_connection *connection;
+ struct mk_list _head; /* Link to flb_in_mqtt_config->conns */
+};
+
+struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection, struct flb_in_mqtt_config *ctx);
+int mqtt_conn_del(struct mqtt_conn *conn);
+int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx);
+
+#endif
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_prot.c b/fluent-bit/plugins/in_mqtt/mqtt_prot.c
new file mode 100644
index 000000000..e0267daf2
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_prot.c
@@ -0,0 +1,465 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <msgpack.h>
+
+#include "mqtt.h"
+#include "mqtt_prot.h"
+
+#define BUFC() conn->buf[conn->buf_pos]
+#define BUF_AVAIL() conn->buf_len - conn->buf_pos
+#define BIT_SET(a, b) ((a) |= (1 << (b)))
+#define BIT_CHECK(a, b) ((a) & (1 << (b)))
+
+/*
+static inline void print_hex(struct mqtt_conn *conn)
+{
+ int x;
+
+ printf("\n--------HEX--------> ");
+ printf("buf_pos=%i buf_len=%i\n", conn->buf_pos, conn->buf_len);
+ for (x = conn->buf_pos; x < conn->buf_len; x++) {
+ printf("%x ", conn->buf[x]);
+ }
+ printf("\n--------------------\n\n");
+}
+
+static inline void print_str(struct mqtt_conn *conn)
+{
+ int x;
+
+ printf("\n--------HEX--------> ");
+ printf("buf_pos=%i buf_len=%i\n", conn->buf_pos, conn->buf_len);
+ for (x = conn->buf_pos; x < conn->buf_len; x++) {
+ printf("%c", conn->buf[x]);
+ }
+ printf("\n--------------------\n\n");
+}
+*/
+
+/*
+ * It drop the current packet from the buffer, it move the remaining bytes
+ * from right-to-left and adjust the new length.
+ */
+static inline int mqtt_packet_drop(struct mqtt_conn *conn)
+{
+ int move_bytes;
+
+ if (conn->buf_pos == conn->buf_len) {
+ conn->buf_frame_end = 0;
+ conn->buf_len = 0;
+ conn->buf_pos = 0;
+ return 0;
+ }
+
+ /* Check boundaries */
+ if (conn->buf_pos + 1 > conn->buf_len) {
+ conn->buf_frame_end = 0;
+ conn->buf_len = 0;
+ conn->buf_pos = 0;
+ return 0;
+ }
+
+ move_bytes = conn->buf_pos + 1;
+ memmove(conn->buf,
+ conn->buf + move_bytes,
+ conn->buf_len - move_bytes);
+
+ conn->buf_frame_end = 0;
+ conn->buf_len -= move_bytes;
+ conn->buf_pos = 0;
+
+ return 0;
+}
+
+/*
+ * It writes the packet control header which includes the packet type
+ * and the remaining length of the packet. The incoming buffer must have
+ * at least 6 bytes of space.
+ *
+ * The function returns the number of bytes used.
+ */
+static inline int mqtt_packet_header(int type, int length, char *buf)
+{
+ int i = 0;
+ uint8_t byte;
+
+ buf[i] = (type << 4) | 0;
+ i++;
+
+ do {
+ byte = length % 128;
+ length = (length / 128);
+ if (length > 0) {
+ byte = (byte | 128);
+ }
+ buf[i] = byte;
+ i++;
+ } while (length > 0);
+
+ return i;
+}
+
+/* Collect a buffer of JSON data and convert it to Fluent Bit format */
+static int mqtt_data_append(char *topic, size_t topic_len,
+ char *msg, int msg_len,
+ void *in_context)
+{
+ int i;
+ int ret;
+ int root_type;
+ size_t out;
+ size_t off = 0;
+ char *pack;
+ msgpack_object root;
+ msgpack_unpacked result;
+ struct flb_in_mqtt_config *ctx = in_context;
+
+ /* Convert our incoming JSON to MsgPack */
+ ret = flb_pack_json(msg, msg_len, &pack, &out, &root_type, NULL);
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "MQTT Packet incomplete or is not JSON");
+ return -1;
+ }
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ if (msgpack_unpack_next(&result, pack, out, &off) != MSGPACK_UNPACK_SUCCESS) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ if (result.data.type != MSGPACK_OBJECT_MAP){
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ root = result.data;
+
+
+ ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ ctx->log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("topic"),
+ FLB_LOG_EVENT_STRING_VALUE(topic, topic_len));
+ }
+
+ if (ctx->payload_key) {
+ flb_log_event_encoder_append_body_string_length(ctx->log_encoder, flb_sds_len(ctx->payload_key));
+ flb_log_event_encoder_append_body_string_body(ctx->log_encoder, ctx->payload_key,
+ flb_sds_len(ctx->payload_key));
+ flb_log_event_encoder_body_begin_map(ctx->log_encoder);
+ }
+
+ /* Re-pack original KVs */
+ for (i = 0;
+ i < root.via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ ret = flb_log_event_encoder_append_body_values(
+ ctx->log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&root.via.map.ptr[i].key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&root.via.map.ptr[i].val));
+ }
+
+ if (ctx->payload_key) {
+ flb_log_event_encoder_body_commit_map(ctx->log_encoder);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ flb_input_log_append(ctx->ins, NULL, 0,
+ ctx->log_encoder->output_buffer,
+ ctx->log_encoder->output_length);
+ ret = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
+
+ ret = -1;
+ }
+
+ flb_log_event_encoder_reset(ctx->log_encoder);
+
+ msgpack_unpacked_destroy(&result);
+ flb_free(pack);
+
+ return ret;
+}
+
+
+/*
+ * Handle a CONNECT request control packet:
+ *
+ * basically we need to acknoledge the sender so it can start
+ * publishing messages to our service.
+ */
+static int mqtt_handle_connect(struct mqtt_conn *conn)
+{
+ int i;
+ int ret;
+ size_t sent;
+ char buf[4] = {0, 0, 0, 0};
+ struct flb_in_mqtt_config *ctx = conn->ctx;
+
+ i = mqtt_packet_header(MQTT_CONNACK, 2 , (char *) &buf);
+ BIT_SET(buf[i], 0);
+ i++;
+ buf[i] = MQTT_CONN_ACCEPTED;
+
+ /* write CONNACK message */
+ ret = flb_io_net_write(conn->connection,
+ (void *) buf,
+ 4,
+ &sent);
+
+ flb_plg_trace(ctx->ins, "[fd=%i] CMD CONNECT (connack=%i bytes)",
+ conn->connection->fd, ret);
+
+ return ret;
+}
+
+/*
+ * Handle a PUBLISH control packet
+ */
+static int mqtt_handle_publish(struct mqtt_conn *conn)
+{
+ int topic;
+ int topic_len;
+ uint8_t qos;
+ size_t sent;
+ uint16_t hlen;
+ uint16_t packet_id;
+ char buf[4];
+ struct flb_in_mqtt_config *ctx = conn->ctx;
+
+ /*
+ * DUP: we skip duplicated messages.
+ * QOS: We process this.
+ * Retain: skipped
+ */
+
+ qos = ((conn->buf[0] >> 1) & 0x03);
+ conn->buf_pos++;
+
+ /* Topic */
+ hlen = BUFC() << 8;
+ conn->buf_pos++;
+ hlen |= BUFC();
+
+ /* Validate topic length against current buffer capacity (overflow) */
+ if (hlen > (conn->buf_len - conn->buf_pos)) {
+ flb_plg_debug(ctx->ins, "invalid topic length");
+ return -1;
+ }
+
+ conn->buf_pos++;
+ topic = conn->buf_pos;
+ topic_len = hlen;
+ conn->buf_pos += hlen;
+
+ /* Check QOS flag and respond if required */
+ if (qos > MQTT_QOS_LEV0) {
+ /* Packet Identifier */
+ packet_id = BUFC() << 8;
+ conn->buf_pos++;
+ packet_id |= BUFC();
+ conn->buf_pos++;
+
+ if (qos == MQTT_QOS_LEV1) {
+ mqtt_packet_header(MQTT_PUBACK, 2 , (char *) &buf);
+ }
+ else if (qos == MQTT_QOS_LEV2) {
+ mqtt_packet_header(MQTT_PUBREC, 2 , (char *) &buf);
+ }
+ /* Set the identifier that we are replying to */
+ buf[2] = (packet_id >> 8) & 0xff;
+ buf[3] = (packet_id & 0xff);
+
+ /* This operation should be checked */
+ flb_io_net_write(conn->connection,
+ (void *) buf,
+ 4,
+ &sent);
+ }
+
+ /* Message */
+ mqtt_data_append((char *) (conn->buf + topic), topic_len,
+ (char *) (conn->buf + conn->buf_pos),
+ conn->buf_frame_end - conn->buf_pos + 1,
+ conn->ctx);
+
+ flb_plg_trace(ctx->ins, "[fd=%i] CMD PUBLISH",
+ conn->connection->fd);
+ return 0;
+}
+
+/* Handle a PINGREQ control packet */
+static int mqtt_handle_ping(struct mqtt_conn *conn)
+{
+ int ret;
+ size_t sent;
+ char buf[2] = {0, 0};
+ struct flb_in_mqtt_config *ctx = conn->ctx;
+
+ mqtt_packet_header(MQTT_PINGRESP, 0 , (char *) &buf);
+
+ /* write PINGRESP message */
+
+ ret = flb_io_net_write(conn->connection,
+ (void *) buf,
+ 2,
+ &sent);
+
+ flb_plg_trace(ctx->ins, "[fd=%i] CMD PING (pong=%i bytes)",
+ conn->connection->fd, ret);
+ return ret;
+}
+
+int mqtt_prot_parser(struct mqtt_conn *conn)
+{
+ int ret;
+ int length = 0;
+ int pos = conn->buf_pos;
+ int mult;
+ struct flb_in_mqtt_config *ctx = conn->ctx;
+
+ for (; conn->buf_pos < conn->buf_len; conn->buf_pos++) {
+ if (conn->status & (MQTT_NEW | MQTT_NEXT)) {
+ /*
+ * Do we have at least the Control Packet fixed header
+ * and the remaining length byte field ?
+ */
+ if (BUF_AVAIL() < 2) {
+ conn->buf_pos = pos;
+ flb_plg_trace(ctx->ins, "[fd=%i] Need more data",
+ conn->connection->fd);
+ return MQTT_MORE;
+ }
+
+ /* As the connection is new we expect a MQTT_CONNECT request */
+ conn->packet_type = BUFC() >> 4;
+ if (conn->status == MQTT_NEW && conn->packet_type != MQTT_CONNECT) {
+ flb_plg_trace(ctx->ins, "[fd=%i] error, expecting MQTT_CONNECT",
+ conn->connection->fd);
+ return MQTT_ERROR;
+ }
+ conn->packet_length = conn->buf_pos;
+ conn->buf_pos++;
+
+ /* Get the remaining length */
+ mult = 1;
+ length = 0;
+
+ do {
+ if (conn->buf_pos + 1 > conn->buf_len) {
+ conn->buf_pos = pos;
+ flb_plg_trace(ctx->ins, "[fd=%i] Need more data",
+ conn->connection->fd);
+ return MQTT_MORE;
+ }
+
+ length += (BUFC() & 127) * mult;
+ mult *= 128;
+ if (mult > 128*128*128) {
+ return MQTT_ERROR;
+ }
+
+ if (length + 2 > (conn->buf_len - pos)) {
+ conn->buf_pos = pos;
+ flb_plg_trace(ctx->ins, "[fd=%i] Need more data",
+ conn->connection->fd);
+ return MQTT_MORE;
+ }
+
+ if ((BUFC() & 128) == 0) {
+ if (conn->buf_len - 2 < length) {
+ conn->buf_pos = pos;
+ flb_plg_trace(ctx->ins, "[fd=%i] Need more data",
+ conn->connection->fd);
+ return MQTT_MORE;
+ }
+ else {
+ conn->buf_frame_end = conn->buf_pos + length;
+ break;
+ }
+ }
+
+ if (conn->buf_pos + 1 < conn->buf_len) {
+ conn->buf_pos++;
+ }
+ else {
+ conn->buf_pos = pos;
+ flb_plg_trace(ctx->ins, "[fd=%i] Need more data",
+ conn->connection->fd);
+ return MQTT_MORE;
+ }
+ } while (1);
+
+ conn->packet_length = length;
+
+ /* At this point we have a full control packet in place */
+ if (conn->packet_type == MQTT_CONNECT) {
+ mqtt_handle_connect(conn);
+ }
+ else if (conn->packet_type == MQTT_PUBLISH) {
+ ret = mqtt_handle_publish(conn);
+ if (ret == -1) {
+ return MQTT_ERROR;
+ }
+ }
+ else if (conn->packet_type == MQTT_PINGREQ) {
+ mqtt_handle_ping(conn);
+ }
+ else if (conn->packet_type == MQTT_DISCONNECT) {
+ flb_plg_trace(ctx->ins, "[fd=%i] CMD DISCONNECT",
+ conn->connection->fd);
+ return MQTT_HANGUP;
+ }
+ else {
+ }
+
+ /* Prepare for next round */
+ conn->status = MQTT_NEXT;
+ conn->buf_pos = conn->buf_frame_end;
+
+ mqtt_packet_drop(conn);
+
+ if (conn->buf_len > 0) {
+ conn->buf_pos = -1;
+ }
+ }
+ }
+ conn->buf_pos--;
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_mqtt/mqtt_prot.h b/fluent-bit/plugins/in_mqtt/mqtt_prot.h
new file mode 100644
index 000000000..74c4fe32e
--- /dev/null
+++ b/fluent-bit/plugins/in_mqtt/mqtt_prot.h
@@ -0,0 +1,62 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_MQTT_PROT_H
+#define FLB_MQTT_PROT_H
+
+#include "mqtt_conn.h"
+
+/*
+ * Specs definition from 2.2.1 MQTT Control Packet:
+ *
+ * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021
+ */
+#define MQTT_CONNECT 1
+#define MQTT_CONNACK 2
+#define MQTT_PUBLISH 3
+#define MQTT_PUBACK 4
+#define MQTT_PUBREC 5
+#define MQTT_PUBREL 6
+#define MQTT_PUBCOMP 7
+#define MQTT_PINGREQ 12
+#define MQTT_PINGRESP 13
+#define MQTT_DISCONNECT 14
+
+/* CONNACK status codes */
+#define MQTT_CONN_ACCEPTED 0
+#define MQTT_CONN_REFUSED_PROTOCOL 1
+#define MQTT_CONN_REFUSED_IDENTIF 2
+#define MQTT_CONN_REFUSED_SERVER 3
+#define MQTT_CONN_REFUSED_BADCRED 4
+#define MQTT_CONN_REFUSED_NOAUTH 5
+
+/* QOS Flag status */
+#define MQTT_QOS_LEV0 0 /* no reply */
+#define MQTT_QOS_LEV1 1 /* PUBACK packet */
+#define MQTT_QOS_LEV2 2 /* PUBREC packet */
+
+/* Specific macros for Fluent Bit handling, not related to MQTT spec */
+#define MQTT_HANGUP -2 /* MQTT client is closing */
+#define MQTT_ERROR -1 /* MQTT protocol error, hangup */
+#define MQTT_OK 0 /* Everything is OK */
+#define MQTT_MORE 1 /* need to read more data */
+
+int mqtt_prot_parser(struct mqtt_conn *conn);
+
+#endif