summaryrefslogtreecommitdiffstats
path: root/rust/src/mqtt
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:39:49 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:39:49 +0000
commita0aa2307322cd47bbf416810ac0292925e03be87 (patch)
tree37076262a026c4b48c8a0e84f44ff9187556ca35 /rust/src/mqtt
parentInitial commit. (diff)
downloadsuricata-a0aa2307322cd47bbf416810ac0292925e03be87.tar.xz
suricata-a0aa2307322cd47bbf416810ac0292925e03be87.zip
Adding upstream version 1:7.0.3.upstream/1%7.0.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'rust/src/mqtt')
-rw-r--r--rust/src/mqtt/detect.rs520
-rw-r--r--rust/src/mqtt/logger.rs305
-rw-r--r--rust/src/mqtt/mod.rs25
-rw-r--r--rust/src/mqtt/mqtt.rs805
-rw-r--r--rust/src/mqtt/mqtt_message.rs205
-rw-r--r--rust/src/mqtt/mqtt_property.rs269
-rw-r--r--rust/src/mqtt/parser.rs1135
7 files changed, 3264 insertions, 0 deletions
diff --git a/rust/src/mqtt/detect.rs b/rust/src/mqtt/detect.rs
new file mode 100644
index 0000000..b47a84f
--- /dev/null
+++ b/rust/src/mqtt/detect.rs
@@ -0,0 +1,520 @@
+/* Copyright (C) 2020 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use crate::mqtt::mqtt::{MQTTState, MQTTTransaction};
+use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode};
+use std::ffi::CStr;
+use std::ptr;
+use std::str::FromStr;
+
+#[derive(FromPrimitive, Debug, Copy, Clone, PartialOrd, PartialEq, Eq)]
+#[allow(non_camel_case_types)]
+#[repr(u8)]
+pub enum MQTTFlagState {
+ MQTT_DONT_CARE = 0,
+ MQTT_MUST_BE_SET = 1,
+ MQTT_CANT_BE_SET = 2,
+}
+
+#[inline]
+fn check_flag_state(flag_state: MQTTFlagState, flag_value: bool, ok: &mut bool) {
+ match flag_state {
+ MQTTFlagState::MQTT_MUST_BE_SET => {
+ if !flag_value {
+ *ok = false;
+ }
+ }
+ MQTTFlagState::MQTT_CANT_BE_SET => {
+ if flag_value {
+ *ok = false;
+ }
+ }
+ _ => {}
+ }
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_has_type(tx: &MQTTTransaction, mtype: u8) -> u8 {
+ for msg in tx.msg.iter() {
+ if mtype == msg.header.message_type as u8 {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_cstr_message_code(
+ str: *const std::os::raw::c_char,
+) -> std::os::raw::c_int {
+ let msgtype: &CStr = CStr::from_ptr(str);
+ if let Ok(s) = msgtype.to_str() {
+ if let Ok(x) = MQTTTypeCode::from_str(s) {
+ return x as i32;
+ }
+ }
+ return -1;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_has_flags(
+ tx: &MQTTTransaction, qretain: MQTTFlagState, qdup: MQTTFlagState,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ let mut ok = true;
+ check_flag_state(qretain, msg.header.retain, &mut ok);
+ check_flag_state(qdup, msg.header.dup_flag, &mut ok);
+ if ok {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_has_qos(tx: &MQTTTransaction, qos: u8) -> u8 {
+ for msg in tx.msg.iter() {
+ if qos == msg.header.qos_level {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_get_protocol_version(state: &MQTTState) -> u8 {
+ return state.protocol_version;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_has_connect_flags(
+ tx: &MQTTTransaction, username: MQTTFlagState, password: MQTTFlagState, will: MQTTFlagState,
+ will_retain: MQTTFlagState, clean_session: MQTTFlagState,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ let mut ok = true;
+ check_flag_state(username, cv.username_flag, &mut ok);
+ check_flag_state(password, cv.password_flag, &mut ok);
+ check_flag_state(will, cv.will_flag, &mut ok);
+ check_flag_state(will_retain, cv.will_retain, &mut ok);
+ check_flag_state(clean_session, cv.clean_session, &mut ok);
+ if ok {
+ return 1;
+ }
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ let p = &cv.client_id;
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ if let Some(p) = &cv.username {
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ if let Some(p) = &cv.password {
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ if let Some(p) = &cv.will_topic {
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNECT(ref cv) = msg.op {
+ if let Some(p) = &cv.will_message {
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent(
+ tx: &MQTTTransaction, session_present: *mut bool,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::CONNACK(ref ca) = msg.op {
+ *session_present = ca.session_present;
+ return 1;
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
+ let p = &pubv.topic;
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message(
+ tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
+) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
+ let p = &pubv.message;
+ if !p.is_empty() {
+ *buffer = p.as_ptr();
+ *buffer_len = p.len() as u32;
+ return 1;
+ }
+ }
+ }
+
+ *buffer = ptr::null();
+ *buffer_len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(
+ tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
+) -> u8 {
+ let mut offset = 0;
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op {
+ if (i as usize) < subv.topics.len() + offset {
+ let topic = &subv.topics[(i as usize) - offset];
+ if !topic.topic_name.is_empty() {
+ *len = topic.topic_name.len() as u32;
+ *buf = topic.topic_name.as_ptr();
+ return 1;
+ }
+ } else {
+ offset += subv.topics.len();
+ }
+ }
+ }
+
+ *buf = ptr::null();
+ *len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(
+ tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
+) -> u8 {
+ let mut offset = 0;
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op {
+ if (i as usize) < unsubv.topics.len() + offset {
+ let topic = &unsubv.topics[(i as usize) - offset];
+ if !topic.is_empty() {
+ *len = topic.len() as u32;
+ *buf = topic.as_ptr();
+ return 1;
+ }
+ } else {
+ offset += unsubv.topics.len();
+ }
+ }
+ }
+
+ *buf = ptr::null();
+ *len = 0;
+
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(tx: &MQTTTransaction, result: *mut u8) -> u8 {
+ for msg in tx.msg.iter() {
+ match msg.op {
+ MQTTOperation::PUBACK(ref v)
+ | MQTTOperation::PUBREL(ref v)
+ | MQTTOperation::PUBREC(ref v)
+ | MQTTOperation::PUBCOMP(ref v) => {
+ if let Some(rcode) = v.reason_code {
+ *result = rcode;
+ return 1;
+ }
+ }
+ MQTTOperation::AUTH(ref v) => {
+ *result = v.reason_code;
+ return 1;
+ }
+ MQTTOperation::CONNACK(ref v) => {
+ *result = v.return_code;
+ return 1;
+ }
+ MQTTOperation::DISCONNECT(ref v) => {
+ if let Some(rcode) = v.reason_code {
+ *result = rcode;
+ return 1;
+ }
+ }
+ _ => return 0,
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(tx: &MQTTTransaction, code: u8) -> u8 {
+ for msg in tx.msg.iter() {
+ if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op {
+ if let Some(ref reason_codes) = unsuback.reason_codes {
+ for rc in reason_codes.iter() {
+ if *rc == code {
+ return 1;
+ }
+ }
+ }
+ }
+ }
+ return 0;
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::mqtt::mqtt::MQTTTransaction;
+ use crate::mqtt::mqtt_message::*;
+ use crate::mqtt::parser::FixedHeader;
+ use crate::core::Direction;
+ use std;
+
+ #[test]
+ fn test_multi_unsubscribe() {
+ let mut t = MQTTTransaction::new(MQTTMessage {
+ header: FixedHeader {
+ message_type: MQTTTypeCode::UNSUBSCRIBE,
+ dup_flag: false,
+ qos_level: 0,
+ retain: false,
+ remaining_length: 0,
+ },
+ op: MQTTOperation::UNSUBSCRIBE(MQTTUnsubscribeData {
+ message_id: 1,
+ topics: vec!["foo".to_string(), "baar".to_string()],
+ properties: None,
+ }),
+ }, Direction::ToServer);
+ t.msg.push(MQTTMessage {
+ header: FixedHeader {
+ message_type: MQTTTypeCode::UNSUBSCRIBE,
+ dup_flag: false,
+ qos_level: 0,
+ retain: false,
+ remaining_length: 0,
+ },
+ op: MQTTOperation::UNSUBSCRIBE(MQTTUnsubscribeData {
+ message_id: 1,
+ topics: vec!["fieee".to_string(), "baaaaz".to_string()],
+ properties: None,
+ }),
+ });
+ let mut s: *const u8 = std::ptr::null_mut();
+ let mut slen: u32 = 0;
+ let mut r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "foo");
+ r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "baar");
+ r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "fieee");
+ r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "baaaaz");
+ r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen) };
+ assert_eq!(r, 0);
+ }
+
+ #[test]
+ fn test_multi_subscribe() {
+ let mut t = MQTTTransaction::new(MQTTMessage {
+ header: FixedHeader {
+ message_type: MQTTTypeCode::SUBSCRIBE,
+ dup_flag: false,
+ qos_level: 0,
+ retain: false,
+ remaining_length: 0,
+ },
+ op: MQTTOperation::SUBSCRIBE(MQTTSubscribeData {
+ message_id: 1,
+ topics: vec![
+ MQTTSubscribeTopicData {
+ topic_name: "foo".to_string(),
+ qos: 0,
+ },
+ MQTTSubscribeTopicData {
+ topic_name: "baar".to_string(),
+ qos: 1,
+ },
+ ],
+ properties: None,
+ }),
+ }, Direction::ToServer);
+ t.msg.push(MQTTMessage {
+ header: FixedHeader {
+ message_type: MQTTTypeCode::SUBSCRIBE,
+ dup_flag: false,
+ qos_level: 0,
+ retain: false,
+ remaining_length: 0,
+ },
+ op: MQTTOperation::SUBSCRIBE(MQTTSubscribeData {
+ message_id: 1,
+ topics: vec![
+ MQTTSubscribeTopicData {
+ topic_name: "fieee".to_string(),
+ qos: 0,
+ },
+ MQTTSubscribeTopicData {
+ topic_name: "baaaaz".to_string(),
+ qos: 1,
+ },
+ ],
+ properties: None,
+ }),
+ });
+ let mut s: *const u8 = std::ptr::null_mut();
+ let mut slen: u32 = 0;
+ let mut r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "foo");
+ r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "baar");
+ r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "fieee");
+ r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen) };
+ assert_eq!(r, 1);
+ topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
+ assert_eq!(topic, "baaaaz");
+ r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen) };
+ assert_eq!(r, 0);
+ }
+}
diff --git a/rust/src/mqtt/logger.rs b/rust/src/mqtt/logger.rs
new file mode 100644
index 0000000..af0db17
--- /dev/null
+++ b/rust/src/mqtt/logger.rs
@@ -0,0 +1,305 @@
+/* Copyright (C) 2020 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use super::mqtt::MQTTTransaction;
+use crate::jsonbuilder::{JsonBuilder, JsonError};
+use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData};
+use crate::mqtt::parser::FixedHeader;
+use std;
+
+pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0);
+
+#[inline]
+fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> {
+ js.start_object()?;
+ js.set_string("topic", &t.topic_name)?;
+ js.set_uint("qos", t.qos as u64)?;
+ js.close()?;
+ return Ok(());
+}
+
+#[inline]
+fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> {
+ js.set_uint("qos", hdr.qos_level as u64)?;
+ js.set_bool("retain", hdr.retain)?;
+ js.set_bool("dup", hdr.dup_flag)?;
+ return Ok(());
+}
+
+fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<(), JsonError> {
+ js.open_object("mqtt")?;
+ for msg in tx.msg.iter() {
+ match msg.op {
+ MQTTOperation::CONNECT(ref conn) => {
+ js.open_object("connect")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_string("protocol_string", &conn.protocol_string)?;
+ js.set_uint("protocol_version", conn.protocol_version as u64)?;
+ js.set_string("client_id", &conn.client_id)?;
+ js.open_object("flags")?;
+ js.set_bool("username", conn.username_flag)?;
+ js.set_bool("password", conn.password_flag)?;
+ js.set_bool("will_retain", conn.will_retain)?;
+ js.set_bool("will", conn.will_flag)?;
+ js.set_bool("clean_session", conn.clean_session)?;
+ js.close()?; // flags
+ if let Some(user) = &conn.username {
+ js.set_string("username", user)?;
+ }
+ if flags & MQTT_LOG_PASSWORDS != 0 {
+ if let Some(pass) = &conn.password {
+ js.set_string_from_bytes("password", pass)?;
+ }
+ }
+ if conn.will_flag {
+ js.open_object("will")?;
+ if let Some(will_topic) = &conn.will_topic {
+ js.set_string("topic", will_topic)?;
+ }
+ if let Some(will_message) = &conn.will_message {
+ js.set_string_from_bytes("message", will_message)?;
+ }
+ if let Some(will_properties) = &conn.will_properties {
+ js.open_object("properties")?;
+ for prop in will_properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // will
+ }
+ if let Some(properties) = &conn.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // connect
+ }
+ MQTTOperation::CONNACK(ref connack) => {
+ js.open_object("connack")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_bool("session_present", connack.session_present)?;
+ js.set_uint("return_code", connack.return_code as u64)?;
+ if let Some(properties) = &connack.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // connack
+ }
+ MQTTOperation::PUBLISH(ref publish) => {
+ js.open_object("publish")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_string("topic", &publish.topic)?;
+ if let Some(message_id) = publish.message_id {
+ js.set_uint("message_id", message_id as u64)?;
+ }
+ js.set_string_from_bytes("message", &publish.message)?;
+ if let Some(properties) = &publish.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // publish
+ }
+ MQTTOperation::PUBACK(ref msgidonly) => {
+ js.open_object("puback")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", msgidonly.message_id as u64)?;
+ if let Some(reason_code) = &msgidonly.reason_code {
+ js.set_uint("reason_code", *reason_code as u64)?;
+ }
+ if let Some(properties) = &msgidonly.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // puback
+ }
+ MQTTOperation::PUBREC(ref msgidonly) => {
+ js.open_object("pubrec")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", msgidonly.message_id as u64)?;
+ if let Some(reason_code) = &msgidonly.reason_code {
+ js.set_uint("reason_code", *reason_code as u64)?;
+ }
+ if let Some(properties) = &msgidonly.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // pubrec
+ }
+ MQTTOperation::PUBREL(ref msgidonly) => {
+ js.open_object("pubrel")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", msgidonly.message_id as u64)?;
+ if let Some(reason_code) = &msgidonly.reason_code {
+ js.set_uint("reason_code", *reason_code as u64)?;
+ }
+ if let Some(properties) = &msgidonly.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // pubrel
+ }
+ MQTTOperation::PUBCOMP(ref msgidonly) => {
+ js.open_object("pubcomp")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", msgidonly.message_id as u64)?;
+ if let Some(reason_code) = &msgidonly.reason_code {
+ js.set_uint("reason_code", *reason_code as u64)?;
+ }
+ if let Some(properties) = &msgidonly.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // pubcomp
+ }
+ MQTTOperation::SUBSCRIBE(ref subs) => {
+ js.open_object("subscribe")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", subs.message_id as u64)?;
+ js.open_array("topics")?;
+ for t in &subs.topics {
+ log_mqtt_topic(js, t)?;
+ }
+ js.close()?; //topics
+ if let Some(properties) = &subs.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // subscribe
+ }
+ MQTTOperation::SUBACK(ref suback) => {
+ js.open_object("suback")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", suback.message_id as u64)?;
+ js.open_array("qos_granted")?;
+ for t in &suback.qoss {
+ js.append_uint(*t as u64)?;
+ }
+ js.close()?; // qos_granted
+ js.close()?; // suback
+ }
+ MQTTOperation::UNSUBSCRIBE(ref unsub) => {
+ js.open_object("unsubscribe")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", unsub.message_id as u64)?;
+ js.open_array("topics")?;
+ for t in &unsub.topics {
+ js.append_string(t)?;
+ }
+ js.close()?; // topics
+ js.close()?; // unsubscribe
+ }
+ MQTTOperation::UNSUBACK(ref unsuback) => {
+ js.open_object("unsuback")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("message_id", unsuback.message_id as u64)?;
+ if let Some(codes) = &unsuback.reason_codes {
+ if !codes.is_empty() {
+ js.open_array("reason_codes")?;
+ for t in codes {
+ js.append_uint(*t as u64)?;
+ }
+ js.close()?; // reason_codes
+ }
+ }
+ js.close()?; // unsuback
+ }
+ MQTTOperation::PINGREQ => {
+ js.open_object("pingreq")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.close()?; // pingreq
+ }
+ MQTTOperation::PINGRESP => {
+ js.open_object("pingresp")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.close()?; // pingresp
+ }
+ MQTTOperation::AUTH(ref auth) => {
+ js.open_object("auth")?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_uint("reason_code", auth.reason_code as u64)?;
+ if let Some(properties) = &auth.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // auth
+ }
+ MQTTOperation::DISCONNECT(ref disco) => {
+ js.open_object("disconnect")?;
+ log_mqtt_header(js, &msg.header)?;
+ if let Some(reason_code) = &disco.reason_code {
+ js.set_uint("reason_code", *reason_code as u64)?;
+ }
+ if let Some(properties) = &disco.properties {
+ js.open_object("properties")?;
+ for prop in properties {
+ prop.set_json(js)?;
+ }
+ js.close()?; // properties
+ }
+ js.close()?; // disconnect
+ }
+ MQTTOperation::TRUNCATED(ref trunc) => {
+ js.open_object(&trunc.original_message_type.to_lower_str())?;
+ log_mqtt_header(js, &msg.header)?;
+ js.set_bool("truncated", true)?;
+ js.set_uint("skipped_length", trunc.skipped_length as u64)?;
+ js.close()?; // truncated
+ }
+ MQTTOperation::UNASSIGNED => {}
+ }
+ }
+ js.close()?; // mqtt
+
+ return Ok(());
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_logger_log(
+ tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder,
+) -> bool {
+ let tx = cast_pointer!(tx, MQTTTransaction);
+ log_mqtt(tx, flags, js).is_ok()
+}
diff --git a/rust/src/mqtt/mod.rs b/rust/src/mqtt/mod.rs
new file mode 100644
index 0000000..aefcc33
--- /dev/null
+++ b/rust/src/mqtt/mod.rs
@@ -0,0 +1,25 @@
+/* Copyright (C) 2020 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+//! MQTT application layer, detection, logger and parser module.
+
+pub mod detect;
+pub mod logger;
+pub mod mqtt;
+pub mod mqtt_message;
+pub mod mqtt_property;
+pub mod parser;
diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs
new file mode 100644
index 0000000..3f110df
--- /dev/null
+++ b/rust/src/mqtt/mqtt.rs
@@ -0,0 +1,805 @@
+/* Copyright (C) 2020-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use super::mqtt_message::*;
+use super::parser::*;
+use crate::applayer::*;
+use crate::applayer::{self, LoggerFlags};
+use crate::conf::conf_get;
+use crate::core::*;
+use crate::frames::*;
+use nom7::Err;
+use std;
+use std::collections::VecDeque;
+use std::ffi::CString;
+
+// Used as a special pseudo packet identifier to denote the first CONNECT
+// packet in a connection. Note that there is no risk of collision with a
+// parsed packet identifier because in the protocol these are only 16 bit
+// unsigned.
+const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;
+// Maximum message length in bytes. If the length of a message exceeds
+// this value, it will be truncated. Default: 1MB.
+static mut MAX_MSG_LEN: u32 = 1048576;
+
+static mut MQTT_MAX_TX: usize = 1024;
+
+static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
+
+#[derive(AppLayerFrameType)]
+pub enum MQTTFrameType {
+ Pdu,
+ Header,
+ Data,
+}
+
+#[derive(FromPrimitive, Debug, AppLayerEvent)]
+pub enum MQTTEvent {
+ MissingConnect,
+ MissingPublish,
+ MissingSubscribe,
+ MissingUnsubscribe,
+ DoubleConnect,
+ UnintroducedMessage,
+ InvalidQosLevel,
+ MissingMsgId,
+ UnassignedMsgType,
+ TooManyTransactions,
+ MalformedTraffic,
+}
+
+#[derive(Debug)]
+pub struct MQTTTransaction {
+ tx_id: u64,
+ pkt_id: Option<u32>,
+ pub msg: Vec<MQTTMessage>,
+ complete: bool,
+ toclient: bool,
+ toserver: bool,
+
+ logged: LoggerFlags,
+ tx_data: applayer::AppLayerTxData,
+}
+
+impl MQTTTransaction {
+ pub fn new(msg: MQTTMessage, direction: Direction) -> MQTTTransaction {
+ let mut m = MQTTTransaction::new_empty(direction);
+ m.msg.push(msg);
+ return m;
+ }
+
+ pub fn new_empty(direction: Direction) -> MQTTTransaction {
+ return MQTTTransaction {
+ tx_id: 0,
+ pkt_id: None,
+ complete: false,
+ logged: LoggerFlags::new(),
+ msg: Vec::new(),
+ toclient: direction.is_to_client(),
+ toserver: direction.is_to_server(),
+ tx_data: applayer::AppLayerTxData::for_direction(direction),
+ };
+ }
+}
+
+impl Transaction for MQTTTransaction {
+ fn id(&self) -> u64 {
+ self.tx_id
+ }
+}
+
+pub struct MQTTState {
+ state_data: AppLayerStateData,
+ tx_id: u64,
+ pub protocol_version: u8,
+ transactions: VecDeque<MQTTTransaction>,
+ connected: bool,
+ skip_request: usize,
+ skip_response: usize,
+ max_msg_len: usize,
+ tx_index_completed: usize,
+}
+
+impl State<MQTTTransaction> for MQTTState {
+ fn get_transaction_count(&self) -> usize {
+ self.transactions.len()
+ }
+
+ fn get_transaction_by_index(&self, index: usize) -> Option<&MQTTTransaction> {
+ self.transactions.get(index)
+ }
+}
+
+impl Default for MQTTState {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MQTTState {
+ pub fn new() -> Self {
+ Self {
+ state_data: AppLayerStateData::new(),
+ tx_id: 0,
+ protocol_version: 0,
+ transactions: VecDeque::new(),
+ connected: false,
+ skip_request: 0,
+ skip_response: 0,
+ max_msg_len: unsafe { MAX_MSG_LEN as usize },
+ tx_index_completed: 0,
+ }
+ }
+
+ fn free_tx(&mut self, tx_id: u64) {
+ let len = self.transactions.len();
+ let mut found = false;
+ let mut index = 0;
+ for i in 0..len {
+ let tx = &self.transactions[i];
+ if tx.tx_id == tx_id + 1 {
+ found = true;
+ index = i;
+ break;
+ }
+ }
+ if found {
+ self.tx_index_completed = 0;
+ self.transactions.remove(index);
+ }
+ }
+
+ pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
+ self.transactions.iter().find(|tx| tx.tx_id == tx_id + 1)
+ }
+
+ pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
+ for tx in &mut self.transactions.range_mut(self.tx_index_completed..) {
+ if !tx.complete {
+ if let Some(mpktid) = tx.pkt_id {
+ if mpktid == pkt_id {
+ return Some(tx);
+ }
+ }
+ }
+ }
+ return None;
+ }
+
+ fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
+ let direction = if toclient {
+ Direction::ToClient
+ } else {
+ Direction::ToServer
+ };
+ let mut tx = MQTTTransaction::new(msg, direction);
+ self.tx_id += 1;
+ tx.tx_id = self.tx_id;
+ if self.transactions.len() > unsafe { MQTT_MAX_TX } {
+ let mut index = self.tx_index_completed;
+ for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
+ index += 1;
+ if !tx_old.complete {
+ tx_old.complete = true;
+ MQTTState::set_event(tx_old, MQTTEvent::TooManyTransactions);
+ break;
+ }
+ }
+ self.tx_index_completed = index;
+ }
+ return tx;
+ }
+
+ // Handle a MQTT message depending on the direction and state.
+ // Note that we are trying to only have one mutable reference to msg
+ // and its components, however, since we are in a large match operation,
+ // we cannot pass around and/or store more references or move things
+ // without having to introduce lifetimes etc.
+ // This is the reason for the code duplication below. Maybe there is a
+ // more concise way to do it, but this works for now.
+ fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
+ match msg.op {
+ MQTTOperation::CONNECT(ref conn) => {
+ self.protocol_version = conn.protocol_version;
+ let mut tx = self.new_tx(msg, toclient);
+ tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
+ if self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
+ }
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::PUBLISH(ref publish) => {
+ let qos = msg.header.qos_level;
+ let pkt_id = publish.message_id;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
+ 0 => {
+ // with QOS level 0, we do not need to wait for a
+ // response
+ tx.complete = true;
+ }
+ 1..=2 => {
+ if let Some(pkt_id) = pkt_id {
+ tx.pkt_id = Some(pkt_id as u32);
+ } else {
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
+ }
+ }
+ _ => {
+ MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
+ }
+ }
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::SUBSCRIBE(ref subscribe) => {
+ let pkt_id = subscribe.message_id as u32;
+ let qos = msg.header.qos_level;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
+ 0 => {
+ // with QOS level 0, we do not need to wait for a
+ // response
+ tx.complete = true;
+ }
+ 1..=2 => {
+ tx.pkt_id = Some(pkt_id);
+ }
+ _ => {
+ MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
+ }
+ }
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
+ let pkt_id = unsubscribe.message_id as u32;
+ let qos = msg.header.qos_level;
+ let mut tx = self.new_tx(msg, toclient);
+ match qos {
+ 0 => {
+ // with QOS level 0, we do not need to wait for a
+ // response
+ tx.complete = true;
+ }
+ 1..=2 => {
+ tx.pkt_id = Some(pkt_id);
+ }
+ _ => {
+ MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
+ }
+ }
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::CONNACK(ref _connack) => {
+ if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
+ tx.msg.push(msg);
+ tx.complete = true;
+ tx.pkt_id = None;
+ self.connected = true;
+ } else {
+ let mut tx = self.new_tx(msg, toclient);
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ }
+ MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
+ if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
+ tx.msg.push(msg);
+ } else {
+ let mut tx = self.new_tx(msg, toclient);
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ }
+ MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
+ if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
+ tx.msg.push(msg);
+ tx.complete = true;
+ tx.pkt_id = None;
+ } else {
+ let mut tx = self.new_tx(msg, toclient);
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ }
+ MQTTOperation::SUBACK(ref suback) => {
+ if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
+ tx.msg.push(msg);
+ tx.complete = true;
+ tx.pkt_id = None;
+ } else {
+ let mut tx = self.new_tx(msg, toclient);
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ }
+ MQTTOperation::UNSUBACK(ref unsuback) => {
+ if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
+ tx.msg.push(msg);
+ tx.complete = true;
+ tx.pkt_id = None;
+ } else {
+ let mut tx = self.new_tx(msg, toclient);
+ MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ }
+ MQTTOperation::UNASSIGNED => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
+ MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::TRUNCATED(_) => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
+ }
+ MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
+ let mut tx = self.new_tx(msg, toclient);
+ tx.complete = true;
+ if !self.connected {
+ MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
+ }
+ self.transactions.push_back(tx);
+ }
+ }
+ }
+
+ fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
+ let input = stream_slice.as_slice();
+ let mut current = input;
+
+ if input.is_empty() {
+ return AppLayerResult::ok();
+ }
+
+ let mut consumed = 0;
+ SCLogDebug!(
+ "skip_request {} input len {}",
+ self.skip_request,
+ input.len()
+ );
+ if self.skip_request > 0 {
+ if input.len() <= self.skip_request {
+ SCLogDebug!("reducing skip_request by {}", input.len());
+ self.skip_request -= input.len();
+ return AppLayerResult::ok();
+ } else {
+ current = &input[self.skip_request..];
+ SCLogDebug!(
+ "skip end reached, skipping {} :{:?}",
+ self.skip_request,
+ current
+ );
+ consumed = self.skip_request;
+ self.skip_request = 0;
+ }
+ }
+
+ while !current.is_empty() {
+ SCLogDebug!("request: handling {}", current.len());
+ match parse_message(current, self.protocol_version, self.max_msg_len) {
+ Ok((rem, msg)) => {
+ let _pdu = Frame::new(
+ flow,
+ &stream_slice,
+ input,
+ current.len() as i64,
+ MQTTFrameType::Pdu as u8,
+ );
+ SCLogDebug!("request msg {:?}", msg);
+ if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
+ SCLogDebug!(
+ "found truncated with skipped {} current len {}",
+ trunc.skipped_length,
+ current.len()
+ );
+ if trunc.skipped_length >= current.len() {
+ self.skip_request = trunc.skipped_length - current.len();
+ self.handle_msg(msg, true);
+ return AppLayerResult::ok();
+ } else {
+ consumed += trunc.skipped_length;
+ current = &current[trunc.skipped_length..];
+ self.handle_msg(msg, true);
+ self.skip_request = 0;
+ continue;
+ }
+ }
+
+ self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
+ self.handle_msg(msg, false);
+ consumed += current.len() - rem.len();
+ current = rem;
+ }
+ Err(Err::Incomplete(_)) => {
+ SCLogDebug!(
+ "incomplete request: consumed {} needed {} (input len {})",
+ consumed,
+ (current.len() + 1),
+ input.len()
+ );
+ return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
+ }
+ Err(_) => {
+ self.set_event_notx(MQTTEvent::MalformedTraffic, false);
+ return AppLayerResult::err();
+ }
+ }
+ }
+
+ return AppLayerResult::ok();
+ }
+
+ fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
+ let input = stream_slice.as_slice();
+ let mut current = input;
+
+ if input.is_empty() {
+ return AppLayerResult::ok();
+ }
+
+ let mut consumed = 0;
+ SCLogDebug!(
+ "skip_response {} input len {}",
+ self.skip_response,
+ current.len()
+ );
+ if self.skip_response > 0 {
+ if input.len() <= self.skip_response {
+ self.skip_response -= current.len();
+ return AppLayerResult::ok();
+ } else {
+ current = &input[self.skip_response..];
+ SCLogDebug!(
+ "skip end reached, skipping {} :{:?}",
+ self.skip_request,
+ current
+ );
+ consumed = self.skip_response;
+ self.skip_response = 0;
+ }
+ }
+
+ while !current.is_empty() {
+ SCLogDebug!("response: handling {}", current.len());
+ match parse_message(current, self.protocol_version, self.max_msg_len) {
+ Ok((rem, msg)) => {
+ let _pdu = Frame::new(
+ flow,
+ &stream_slice,
+ input,
+ input.len() as i64,
+ MQTTFrameType::Pdu as u8,
+ );
+
+ SCLogDebug!("response msg {:?}", msg);
+ if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
+ SCLogDebug!(
+ "found truncated with skipped {} current len {}",
+ trunc.skipped_length,
+ current.len()
+ );
+ if trunc.skipped_length >= current.len() {
+ self.skip_response = trunc.skipped_length - current.len();
+ self.handle_msg(msg, true);
+ SCLogDebug!("skip_response now {}", self.skip_response);
+ return AppLayerResult::ok();
+ } else {
+ consumed += trunc.skipped_length;
+ current = &current[trunc.skipped_length..];
+ self.handle_msg(msg, true);
+ self.skip_response = 0;
+ continue;
+ }
+ }
+
+ self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg);
+ self.handle_msg(msg, true);
+ consumed += current.len() - rem.len();
+ current = rem;
+ }
+ Err(Err::Incomplete(_)) => {
+ SCLogDebug!(
+ "incomplete response: consumed {} needed {} (input len {})",
+ consumed,
+ (current.len() + 1),
+ input.len()
+ );
+ return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
+ }
+ Err(_) => {
+ self.set_event_notx(MQTTEvent::MalformedTraffic, true);
+ return AppLayerResult::err();
+ }
+ }
+ }
+
+ return AppLayerResult::ok();
+ }
+
+ fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
+ tx.tx_data.set_event(event as u8);
+ }
+
+ fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
+ let mut tx = MQTTTransaction::new_empty(if toclient {
+ Direction::ToClient
+ } else {
+ Direction::ToServer
+ });
+ self.tx_id += 1;
+ tx.tx_id = self.tx_id;
+ if toclient {
+ tx.toclient = true;
+ } else {
+ tx.toserver = true;
+ }
+ tx.complete = true;
+ tx.tx_data.set_event(event as u8);
+ self.transactions.push_back(tx);
+ }
+
+ fn mqtt_hdr_and_data_frames(
+ &mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage,
+ ) {
+ let hdr = stream_slice.as_slice();
+ //MQTT payload has a fixed header of 2 bytes
+ let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8);
+ SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr);
+ let rem_length = input.header.remaining_length as usize;
+ let data = &hdr[2..rem_length + 2];
+ let _mqtt_data = Frame::new(
+ flow,
+ stream_slice,
+ data,
+ rem_length as i64,
+ MQTTFrameType::Data as u8,
+ );
+ SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data);
+ }
+}
+
+// C exports.
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_probing_parser(
+ _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
+) -> AppProto {
+ let buf = build_slice!(input, input_len as usize);
+ match parse_fixed_header(buf) {
+ Ok((_, hdr)) => {
+ // reject unassigned message type
+ if hdr.message_type == MQTTTypeCode::UNASSIGNED {
+ return ALPROTO_FAILED;
+ }
+ // with 2 being the highest valid QoS level
+ if hdr.qos_level > 2 {
+ return ALPROTO_FAILED;
+ }
+ return ALPROTO_MQTT;
+ }
+ Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
+ Err(_) => ALPROTO_FAILED,
+ }
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_state_new(
+ _orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto,
+) -> *mut std::os::raw::c_void {
+ let state = MQTTState::new();
+ let boxed = Box::new(state);
+ return Box::into_raw(boxed) as *mut _;
+}
+
+#[no_mangle]
+pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
+ std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) });
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
+ let state = cast_pointer!(state, MQTTState);
+ state.free_tx(tx_id);
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_parse_request(
+ flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+ stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
+) -> AppLayerResult {
+ let state = cast_pointer!(state, MQTTState);
+ return state.parse_request(flow, stream_slice);
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_parse_response(
+ flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
+ stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
+) -> AppLayerResult {
+ let state = cast_pointer!(state, MQTTState);
+ return state.parse_response(flow, stream_slice);
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_state_get_tx(
+ state: *mut std::os::raw::c_void, tx_id: u64,
+) -> *mut std::os::raw::c_void {
+ let state = cast_pointer!(state, MQTTState);
+ match state.get_tx(tx_id) {
+ Some(tx) => {
+ return tx as *const _ as *mut _;
+ }
+ None => {
+ return std::ptr::null_mut();
+ }
+ }
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
+ let state = cast_pointer!(state, MQTTState);
+ return state.tx_id;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(
+ tx: *const std::os::raw::c_void,
+) -> std::os::raw::c_int {
+ let tx = cast_pointer!(tx, MQTTTransaction);
+ if tx.toclient {
+ return 1;
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
+ tx: *mut std::os::raw::c_void, direction: u8,
+) -> std::os::raw::c_int {
+ let tx = cast_pointer!(tx, MQTTTransaction);
+ match direction.into() {
+ Direction::ToServer => {
+ if tx.complete || tx.toclient {
+ return 1;
+ }
+ }
+ Direction::ToClient => {
+ if tx.complete || tx.toserver {
+ return 1;
+ }
+ }
+ }
+ return 0;
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
+ _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void,
+) -> u32 {
+ let tx = cast_pointer!(tx, MQTTTransaction);
+ return tx.logged.get();
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
+ _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32,
+) {
+ let tx = cast_pointer!(tx, MQTTTransaction);
+ tx.logged.set(logged);
+}
+
+// Parser name as a C style string.
+const PARSER_NAME: &[u8] = b"mqtt\0";
+
+export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
+export_state_data_get!(rs_mqtt_get_state_data, MQTTState);
+
+#[no_mangle]
+pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
+ let default_port = CString::new("[1883]").unwrap();
+ let max_msg_len = &mut MAX_MSG_LEN;
+ *max_msg_len = cfg_max_msg_len;
+ let parser = RustParser {
+ name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
+ default_port: default_port.as_ptr(),
+ ipproto: IPPROTO_TCP,
+ probe_ts: Some(rs_mqtt_probing_parser),
+ probe_tc: Some(rs_mqtt_probing_parser),
+ min_depth: 0,
+ max_depth: 16,
+ state_new: rs_mqtt_state_new,
+ state_free: rs_mqtt_state_free,
+ tx_free: rs_mqtt_state_tx_free,
+ parse_ts: rs_mqtt_parse_request,
+ parse_tc: rs_mqtt_parse_response,
+ get_tx_count: rs_mqtt_state_get_tx_count,
+ get_tx: rs_mqtt_state_get_tx,
+ tx_comp_st_ts: 1,
+ tx_comp_st_tc: 1,
+ tx_get_progress: rs_mqtt_tx_get_alstate_progress,
+ get_eventinfo: Some(MQTTEvent::get_event_info),
+ get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id),
+ localstorage_new: None,
+ localstorage_free: None,
+ get_tx_files: None,
+ get_tx_iterator: Some(crate::applayer::state_get_tx_iterator::<MQTTState, MQTTTransaction>),
+ get_tx_data: rs_mqtt_get_tx_data,
+ get_state_data: rs_mqtt_get_state_data,
+ apply_tx_config: None,
+ flags: 0,
+ truncate: None,
+ get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name),
+ get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id),
+ };
+
+ let ip_proto_str = CString::new("tcp").unwrap();
+
+ if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
+ let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
+ ALPROTO_MQTT = alproto;
+ if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
+ let _ = AppLayerRegisterParser(&parser, alproto);
+ }
+ if let Some(val) = conf_get("app-layer.protocols.mqtt.max-tx") {
+ if let Ok(v) = val.parse::<usize>() {
+ MQTT_MAX_TX = v;
+ } else {
+ SCLogError!("Invalid value for mqtt.max-tx");
+ }
+ }
+ } else {
+ SCLogDebug!("Protocol detector and parser disabled for MQTT.");
+ }
+}
diff --git a/rust/src/mqtt/mqtt_message.rs b/rust/src/mqtt/mqtt_message.rs
new file mode 100644
index 0000000..390fc9e
--- /dev/null
+++ b/rust/src/mqtt/mqtt_message.rs
@@ -0,0 +1,205 @@
+/* Copyright (C) 2020 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use crate::mqtt::mqtt_property::*;
+use crate::mqtt::parser::*;
+use std::fmt;
+
+#[derive(Debug)]
+pub struct MQTTMessage {
+ pub header: FixedHeader,
+ pub op: MQTTOperation,
+}
+
+#[derive(Debug)]
+pub enum MQTTOperation {
+ UNASSIGNED,
+ CONNECT(MQTTConnectData),
+ CONNACK(MQTTConnackData),
+ PUBLISH(MQTTPublishData),
+ PUBACK(MQTTMessageIdOnly),
+ PUBREC(MQTTMessageIdOnly),
+ PUBREL(MQTTMessageIdOnly),
+ PUBCOMP(MQTTMessageIdOnly),
+ SUBSCRIBE(MQTTSubscribeData),
+ SUBACK(MQTTSubackData),
+ UNSUBSCRIBE(MQTTUnsubscribeData),
+ UNSUBACK(MQTTUnsubackData),
+ AUTH(MQTTAuthData),
+ PINGREQ,
+ PINGRESP,
+ DISCONNECT(MQTTDisconnectData),
+ // TRUNCATED is special, representing a message that was not parsed
+ // in its entirety due to size constraints. There is no equivalent in
+ // the MQTT specification.
+ TRUNCATED(MQTTTruncatedData),
+}
+
+#[repr(u8)]
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, FromPrimitive, Debug)]
+pub enum MQTTTypeCode {
+ UNASSIGNED = 0,
+ CONNECT = 1,
+ CONNACK = 2,
+ PUBLISH = 3,
+ PUBACK = 4,
+ PUBREC = 5,
+ PUBREL = 6,
+ PUBCOMP = 7,
+ SUBSCRIBE = 8,
+ SUBACK = 9,
+ UNSUBSCRIBE = 10,
+ UNSUBACK = 11,
+ PINGREQ = 12,
+ PINGRESP = 13,
+ DISCONNECT = 14,
+ AUTH = 15,
+}
+
+impl MQTTTypeCode {
+ pub fn to_lower_str(&self) -> String {
+ self.to_string().to_lowercase()
+ }
+}
+
+impl fmt::Display for MQTTTypeCode {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{:?}", self)
+ }
+}
+
+impl std::str::FromStr for MQTTTypeCode {
+ type Err = String;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let su = s.to_uppercase();
+ let su_slice: &str = &su;
+ match su_slice {
+ "CONNECT" => Ok(MQTTTypeCode::CONNECT),
+ "CONNACK" => Ok(MQTTTypeCode::CONNACK),
+ "PUBLISH" => Ok(MQTTTypeCode::PUBLISH),
+ "PUBACK" => Ok(MQTTTypeCode::PUBACK),
+ "PUBREC" => Ok(MQTTTypeCode::PUBREC),
+ "PUBREL" => Ok(MQTTTypeCode::PUBREL),
+ "PUBCOMP" => Ok(MQTTTypeCode::PUBCOMP),
+ "SUBSCRIBE" => Ok(MQTTTypeCode::SUBSCRIBE),
+ "SUBACK" => Ok(MQTTTypeCode::SUBACK),
+ "UNSUBSCRIBE" => Ok(MQTTTypeCode::UNSUBSCRIBE),
+ "UNSUBACK" => Ok(MQTTTypeCode::UNSUBACK),
+ "PINGREQ" => Ok(MQTTTypeCode::PINGREQ),
+ "PINGRESP" => Ok(MQTTTypeCode::PINGRESP),
+ "DISCONNECT" => Ok(MQTTTypeCode::DISCONNECT),
+ "AUTH" => Ok(MQTTTypeCode::AUTH),
+ _ => Err(format!("'{}' is not a valid value for MQTTTypeCode", s)),
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct MQTTConnectData {
+ pub protocol_string: String,
+ pub protocol_version: u8,
+ pub username_flag: bool,
+ pub password_flag: bool,
+ pub will_retain: bool,
+ pub will_qos: u8,
+ pub will_flag: bool,
+ pub clean_session: bool,
+ pub keepalive: u16,
+ pub client_id: String,
+ pub will_topic: Option<String>,
+ pub will_message: Option<Vec<u8>>,
+ pub username: Option<String>,
+ pub password: Option<Vec<u8>>,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+ pub will_properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTConnackData {
+ pub return_code: u8,
+ pub session_present: bool, // MQTT 3.1.1
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTPublishData {
+ pub topic: String,
+ pub message_id: Option<u16>,
+ pub message: Vec<u8>,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTMessageIdOnly {
+ pub message_id: u16,
+ pub reason_code: Option<u8>, // MQTT 5.0
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTSubscribeTopicData {
+ pub topic_name: String,
+ pub qos: u8,
+}
+
+#[derive(Debug)]
+pub struct MQTTSubscribeData {
+ pub message_id: u16,
+ pub topics: Vec<MQTTSubscribeTopicData>,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTSubackData {
+ pub message_id: u16,
+ pub qoss: Vec<u8>,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTUnsubscribeData {
+ pub message_id: u16,
+ pub topics: Vec<String>,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTUnsubackData {
+ pub message_id: u16,
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+ pub reason_codes: Option<Vec<u8>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTAuthData {
+ pub reason_code: u8, // MQTT 5.0
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTDisconnectData {
+ pub reason_code: Option<u8>, // MQTT 5.0
+ pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0
+}
+
+#[derive(Debug)]
+pub struct MQTTTruncatedData {
+ pub original_message_type: MQTTTypeCode,
+ pub skipped_length: usize,
+}
diff --git a/rust/src/mqtt/mqtt_property.rs b/rust/src/mqtt/mqtt_property.rs
new file mode 100644
index 0000000..a716c4b
--- /dev/null
+++ b/rust/src/mqtt/mqtt_property.rs
@@ -0,0 +1,269 @@
+/* Copyright (C) 2020 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use crate::jsonbuilder::{JsonBuilder, JsonError};
+use crate::mqtt::parser::*;
+use nom7::number::streaming::*;
+use nom7::IResult;
+
+// TODO: It might be useful to also add detection on property presence and
+// content, e.g. mqtt.property: AUTHENTICATION_METHOD.
+#[derive(Debug, PartialEq, PartialOrd)]
+#[allow(non_camel_case_types)]
+pub enum MQTTProperty {
+ UNKNOWN,
+ PAYLOAD_FORMAT_INDICATOR(u8),
+ MESSAGE_EXPIRY_INTERVAL(u32),
+ CONTENT_TYPE(String),
+ RESPONSE_TOPIC(String),
+ CORRELATION_DATA(Vec<u8>),
+ SUBSCRIPTION_IDENTIFIER(u32),
+ SESSION_EXPIRY_INTERVAL(u32),
+ ASSIGNED_CLIENT_IDENTIFIER(String),
+ SERVER_KEEP_ALIVE(u16),
+ AUTHENTICATION_METHOD(String),
+ AUTHENTICATION_DATA(Vec<u8>),
+ REQUEST_PROBLEM_INFORMATION(u8),
+ WILL_DELAY_INTERVAL(u32),
+ REQUEST_RESPONSE_INFORMATION(u8),
+ RESPONSE_INFORMATION(String),
+ SERVER_REFERENCE(String),
+ REASON_STRING(String),
+ RECEIVE_MAXIMUM(u16),
+ TOPIC_ALIAS_MAXIMUM(u16),
+ TOPIC_ALIAS(u16),
+ MAXIMUM_QOS(u8),
+ RETAIN_AVAILABLE(u8),
+ USER_PROPERTY((String, String)),
+ MAXIMUM_PACKET_SIZE(u32),
+ WILDCARD_SUBSCRIPTION_AVAILABLE(u8),
+ SUBSCRIPTION_IDENTIFIER_AVAILABLE(u8),
+ SHARED_SUBSCRIPTION_AVAILABLE(u8),
+}
+
+impl crate::mqtt::mqtt_property::MQTTProperty {
+ pub fn set_json(&self, js: &mut JsonBuilder) -> Result<(), JsonError> {
+ match self {
+ crate::mqtt::mqtt_property::MQTTProperty::PAYLOAD_FORMAT_INDICATOR(v) => {
+ js.set_uint("payload_format_indicator", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::MESSAGE_EXPIRY_INTERVAL(v) => {
+ js.set_uint("message_expiry_interval", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::CONTENT_TYPE(v) => {
+ js.set_string("content_type", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::RESPONSE_TOPIC(v) => {
+ js.set_string("response_topic", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::CORRELATION_DATA(v) => {
+ js.set_string_from_bytes("correlation_data", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SUBSCRIPTION_IDENTIFIER(v) => {
+ js.set_uint("subscription_identifier", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SESSION_EXPIRY_INTERVAL(v) => {
+ js.set_uint("session_expiry_interval", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(v) => {
+ js.set_string("assigned_client_identifier", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SERVER_KEEP_ALIVE(v) => {
+ js.set_uint("server_keep_alive", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::AUTHENTICATION_METHOD(v) => {
+ js.set_string("authentication_method", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::AUTHENTICATION_DATA(v) => {
+ js.set_string_from_bytes("authentication_data", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::REQUEST_PROBLEM_INFORMATION(v) => {
+ js.set_uint("request_problem_information", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::WILL_DELAY_INTERVAL(v) => {
+ js.set_uint("will_delay_interval", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::REQUEST_RESPONSE_INFORMATION(v) => {
+ js.set_uint("request_response_information", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::RESPONSE_INFORMATION(v) => {
+ js.set_string("response_information", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SERVER_REFERENCE(v) => {
+ js.set_string("server_reference", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::REASON_STRING(v) => {
+ js.set_string("reason_string", v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::RECEIVE_MAXIMUM(v) => {
+ js.set_uint("receive_maximum", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::TOPIC_ALIAS_MAXIMUM(v) => {
+ js.set_uint("topic_alias_maximum", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::TOPIC_ALIAS(v) => {
+ js.set_uint("topic_alias", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::MAXIMUM_QOS(v) => {
+ js.set_uint("maximum_qos", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::RETAIN_AVAILABLE(v) => {
+ js.set_uint("retain_available", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::USER_PROPERTY((k, v)) => {
+ js.set_string(k, v)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::MAXIMUM_PACKET_SIZE(v) => {
+ js.set_uint("maximum_packet_size", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::WILDCARD_SUBSCRIPTION_AVAILABLE(v) => {
+ js.set_uint("wildcard_subscription_available", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SUBSCRIPTION_IDENTIFIER_AVAILABLE(v) => {
+ js.set_uint("subscription_identifier_available", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::SHARED_SUBSCRIPTION_AVAILABLE(v) => {
+ js.set_uint("shared_subscription_available", *v as u64)?;
+ }
+ crate::mqtt::mqtt_property::MQTTProperty::UNKNOWN => {
+ // pass
+ }
+ }
+ Ok(())
+ }
+}
+
+#[inline]
+pub fn parse_qualified_property(input: &[u8], identifier: u32) -> IResult<&[u8], MQTTProperty> {
+ match identifier {
+ 1 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::PAYLOAD_FORMAT_INDICATOR(val))),
+ Err(e) => return Err(e),
+ },
+ 2 => match be_u32(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::MESSAGE_EXPIRY_INTERVAL(val))),
+ Err(e) => return Err(e),
+ },
+ 3 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::CONTENT_TYPE(val))),
+ Err(e) => return Err(e),
+ },
+ 8 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::RESPONSE_TOPIC(val))),
+ Err(e) => return Err(e),
+ },
+ 9 => match parse_mqtt_binary_data(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::CORRELATION_DATA(val))),
+ Err(e) => return Err(e),
+ },
+ 11 => match parse_mqtt_variable_integer(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::SUBSCRIPTION_IDENTIFIER(val))),
+ Err(e) => return Err(e),
+ },
+ 17 => match be_u32(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::SESSION_EXPIRY_INTERVAL(val))),
+ Err(e) => return Err(e),
+ },
+ 18 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(val))),
+ Err(e) => return Err(e),
+ },
+ 19 => match be_u16(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::SERVER_KEEP_ALIVE(val))),
+ Err(e) => return Err(e),
+ },
+ 21 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::AUTHENTICATION_METHOD(val))),
+ Err(e) => return Err(e),
+ },
+ 22 => match parse_mqtt_binary_data(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::AUTHENTICATION_DATA(val))),
+ Err(e) => return Err(e),
+ },
+ 23 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::REQUEST_PROBLEM_INFORMATION(val))),
+ Err(e) => return Err(e),
+ },
+ 24 => match be_u32(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::WILL_DELAY_INTERVAL(val))),
+ Err(e) => return Err(e),
+ },
+ 25 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::REQUEST_RESPONSE_INFORMATION(val))),
+ Err(e) => return Err(e),
+ },
+ 26 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::RESPONSE_INFORMATION(val))),
+ Err(e) => return Err(e),
+ },
+ 28 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::SERVER_REFERENCE(val))),
+ Err(e) => return Err(e),
+ },
+ 31 => match parse_mqtt_string(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::REASON_STRING(val))),
+ Err(e) => return Err(e),
+ },
+ 33 => match be_u16(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::RECEIVE_MAXIMUM(val))),
+ Err(e) => return Err(e),
+ },
+ 34 => match be_u16(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::TOPIC_ALIAS_MAXIMUM(val))),
+ Err(e) => return Err(e),
+ },
+ 35 => match be_u16(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::TOPIC_ALIAS(val))),
+ Err(e) => return Err(e),
+ },
+ 36 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::MAXIMUM_QOS(val))),
+ Err(e) => return Err(e),
+ },
+ 37 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::RETAIN_AVAILABLE(val))),
+ Err(e) => return Err(e),
+ },
+ 38 => match parse_mqtt_string_pair(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::USER_PROPERTY(val))),
+ Err(e) => return Err(e),
+ },
+ 39 => match be_u32(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::MAXIMUM_PACKET_SIZE(val))),
+ Err(e) => return Err(e),
+ },
+ 40 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::WILDCARD_SUBSCRIPTION_AVAILABLE(val))),
+ Err(e) => return Err(e),
+ },
+ 41 => match be_u8(input) {
+ Ok((rem, val)) => {
+ return Ok((rem, MQTTProperty::SUBSCRIPTION_IDENTIFIER_AVAILABLE(val)))
+ }
+ Err(e) => return Err(e),
+ },
+ 42 => match be_u8(input) {
+ Ok((rem, val)) => return Ok((rem, MQTTProperty::SHARED_SUBSCRIPTION_AVAILABLE(val))),
+ Err(e) => return Err(e),
+ },
+ _ => {
+ return Ok((input, MQTTProperty::UNKNOWN));
+ }
+ }
+}
diff --git a/rust/src/mqtt/parser.rs b/rust/src/mqtt/parser.rs
new file mode 100644
index 0000000..8b1c8c5
--- /dev/null
+++ b/rust/src/mqtt/parser.rs
@@ -0,0 +1,1135 @@
+/* Copyright (C) 2020-2022 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+// written by Sascha Steinbiss <sascha@steinbiss.name>
+
+use crate::common::nom7::bits;
+use crate::mqtt::mqtt_message::*;
+use crate::mqtt::mqtt_property::*;
+use nom7::bits::streaming::take as take_bits;
+use nom7::bytes::complete::take;
+use nom7::bytes::streaming::take_while_m_n;
+use nom7::combinator::{complete, cond, verify};
+use nom7::multi::{length_data, many0, many1};
+use nom7::number::streaming::*;
+use nom7::sequence::tuple;
+use nom7::{Err, IResult, Needed};
+use num_traits::FromPrimitive;
+
+#[derive(Copy, Clone, Debug)]
+pub struct FixedHeader {
+ pub message_type: MQTTTypeCode,
+ pub dup_flag: bool,
+ pub qos_level: u8,
+ pub retain: bool,
+ pub remaining_length: u32,
+}
+
+// PARSING HELPERS
+
+#[inline]
+fn is_continuation_bit_set(b: u8) -> bool {
+ return (b & 128) != 0;
+}
+
+#[inline]
+fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
+ let mut multiplier = 1u32;
+ let mut value = 0u32;
+ for val in &continued {
+ value += (val & 127) as u32 * multiplier;
+ multiplier *= 128u32;
+ }
+ value += (last & 127) as u32 * multiplier;
+ return value;
+}
+
+// DATA TYPES
+
+#[inline]
+pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> {
+ let (i, content) = length_data(be_u16)(i)?;
+ Ok((i, String::from_utf8_lossy(content).to_string()))
+}
+
+#[inline]
+pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> {
+ let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?;
+ let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?;
+ Ok((
+ i,
+ convert_varint(continued_part.to_vec(), non_continued_part),
+ ))
+}
+
+#[inline]
+pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> {
+ let (i, data) = length_data(be_u16)(i)?;
+ Ok((i, data.to_vec()))
+}
+
+#[inline]
+pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> {
+ let (i, name) = parse_mqtt_string(i)?;
+ let (i, value) = parse_mqtt_string(i)?;
+ Ok((i, (name, value)))
+}
+
+// MESSAGE COMPONENTS
+
+#[inline]
+fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> {
+ let (i, identifier) = parse_mqtt_variable_integer(i)?;
+ let (i, value) = parse_qualified_property(i, identifier)?;
+ Ok((i, value))
+}
+
+#[inline]
+fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> {
+ // do not try to parse anything when precondition is not met
+ if !precond {
+ return Ok((input, None));
+ }
+ // parse properties length
+ match parse_mqtt_variable_integer(input) {
+ Ok((rem, proplen)) => {
+ if proplen == 0 {
+ // no properties
+ return Ok((rem, None));
+ }
+ // parse properties
+ let mut props = Vec::<MQTTProperty>::new();
+ let (rem, mut newrem) = take(proplen as usize)(rem)?;
+ while !newrem.is_empty() {
+ match parse_property(newrem) {
+ Ok((rem2, val)) => {
+ props.push(val);
+ newrem = rem2;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ return Ok((rem, Some(props)));
+ }
+ Err(e) => return Err(e),
+ }
+}
+
+#[inline]
+fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> {
+ bits(tuple((
+ take_bits(4u8),
+ take_bits(1u8),
+ take_bits(2u8),
+ take_bits(1u8),
+ )))(i)
+}
+
+#[inline]
+fn parse_message_type(code: u8) -> MQTTTypeCode {
+ match code {
+ 0..=15 => {
+ if let Some(t) = FromPrimitive::from_u8(code) {
+ return t;
+ } else {
+ return MQTTTypeCode::UNASSIGNED;
+ }
+ }
+ _ => {
+ // unreachable state in parser: we only pass values parsed from take_bits!(4u8)
+ debug_validate_fail!("can't have message codes >15 from 4 bits");
+ MQTTTypeCode::UNASSIGNED
+ }
+ }
+}
+
+#[inline]
+pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> {
+ let (i, flags) = parse_fixed_header_flags(i)?;
+ let (i, remaining_length) = parse_mqtt_variable_integer(i)?;
+ Ok((
+ i,
+ FixedHeader {
+ message_type: parse_message_type(flags.0),
+ dup_flag: flags.1 != 0,
+ qos_level: flags.2,
+ retain: flags.3 != 0,
+ remaining_length,
+ },
+ ))
+}
+
+#[inline]
+#[allow(clippy::type_complexity)]
+fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> {
+ bits(tuple((
+ take_bits(1u8),
+ take_bits(1u8),
+ take_bits(1u8),
+ take_bits(2u8),
+ take_bits(1u8),
+ take_bits(1u8),
+ take_bits(1u8),
+ )))(i)
+}
+
+#[inline]
+fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> {
+ let (i, protocol_string) = parse_mqtt_string(i)?;
+ let (i, protocol_version) = be_u8(i)?;
+ let (i, flags) = parse_connect_variable_flags(i)?;
+ let (i, keepalive) = be_u16(i)?;
+ let (i, properties) = parse_properties(i, protocol_version == 5)?;
+ let (i, client_id) = parse_mqtt_string(i)?;
+ let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?;
+ let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?;
+ let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?;
+ let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?;
+ let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?;
+ Ok((
+ i,
+ MQTTConnectData {
+ protocol_string,
+ protocol_version,
+ username_flag: flags.0 != 0,
+ password_flag: flags.1 != 0,
+ will_retain: flags.2 != 0,
+ will_qos: flags.3,
+ will_flag: flags.4 != 0,
+ clean_session: flags.5 != 0,
+ keepalive,
+ client_id,
+ will_topic,
+ will_message,
+ username,
+ password,
+ properties,
+ will_properties,
+ },
+ ))
+}
+
+#[inline]
+fn parse_connack(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTConnackData> {
+ move |i: &[u8]| {
+ let (i, topic_name_compression_response) = be_u8(i)?;
+ let (i, return_code) = be_u8(i)?;
+ let (i, properties) = parse_properties(i, protocol_version == 5)?;
+ Ok((
+ i,
+ MQTTConnackData {
+ session_present: (topic_name_compression_response & 1) != 0,
+ return_code,
+ properties,
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_publish(
+ protocol_version: u8,
+ has_id: bool,
+) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTPublishData> {
+ move |i: &[u8]| {
+ let (i, topic) = parse_mqtt_string(i)?;
+ let (i, message_id) = cond(has_id, be_u16)(i)?;
+ let (message, properties) = parse_properties(i, protocol_version == 5)?;
+ Ok((
+ i,
+ MQTTPublishData {
+ topic,
+ message_id,
+ message: message.to_vec(),
+ properties,
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_msgidonly(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
+ move |input: &[u8]| {
+ if protocol_version < 5 {
+ // before v5 we don't even have to care about reason codes
+ // and properties, lucky us
+ return parse_msgidonly_v3(input);
+ }
+ let remaining_len = input.len();
+ match be_u16(input) {
+ Ok((rem, message_id)) => {
+ if remaining_len == 2 {
+ // from the spec: " The Reason Code and Property Length can be
+ // omitted if the Reason Code is 0x00 (Success) and there are
+ // no Properties. In this case the message has a Remaining
+ // Length of 2."
+ return Ok((
+ rem,
+ MQTTMessageIdOnly {
+ message_id,
+ reason_code: Some(0),
+ properties: None,
+ },
+ ));
+ }
+ match be_u8(rem) {
+ Ok((rem, reason_code)) => {
+ // We are checking for 3 because in that case we have a
+ // header plus reason code, but no properties.
+ if remaining_len == 3 {
+ // no properties
+ return Ok((
+ rem,
+ MQTTMessageIdOnly {
+ message_id,
+ reason_code: Some(reason_code),
+ properties: None,
+ },
+ ));
+ }
+ match parse_properties(rem, true) {
+ Ok((rem, properties)) => {
+ return Ok((
+ rem,
+ MQTTMessageIdOnly {
+ message_id,
+ reason_code: Some(reason_code),
+ properties,
+ },
+ ));
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
+}
+
+#[inline]
+fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
+ let (i, message_id) = be_u16(i)?;
+ Ok((
+ i,
+ MQTTMessageIdOnly {
+ message_id,
+ reason_code: None,
+ properties: None,
+ },
+ ))
+}
+
+#[inline]
+fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> {
+ let (i, topic_name) = parse_mqtt_string(i)?;
+ let (i, qos) = be_u8(i)?;
+ Ok((i, MQTTSubscribeTopicData { topic_name, qos }))
+}
+
+#[inline]
+fn parse_subscribe(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubscribeData> {
+ move |i: &[u8]| {
+ let (i, message_id) = be_u16(i)?;
+ let (i, properties) = parse_properties(i, protocol_version == 5)?;
+ let (i, topics) = many1(complete(parse_subscribe_topic))(i)?;
+ Ok((
+ i,
+ MQTTSubscribeData {
+ message_id,
+ topics,
+ properties,
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_suback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubackData> {
+ move |i: &[u8]| {
+ let (i, message_id) = be_u16(i)?;
+ let (qoss, properties) = parse_properties(i, protocol_version == 5)?;
+ Ok((
+ i,
+ MQTTSubackData {
+ message_id,
+ qoss: qoss.to_vec(),
+ properties,
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_unsubscribe(
+ protocol_version: u8,
+) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubscribeData> {
+ move |i: &[u8]| {
+ let (i, message_id) = be_u16(i)?;
+ let (i, properties) = parse_properties(i, protocol_version == 5)?;
+ let (i, topics) = many0(complete(parse_mqtt_string))(i)?;
+ Ok((
+ i,
+ MQTTUnsubscribeData {
+ message_id,
+ topics,
+ properties,
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_unsuback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubackData> {
+ move |i: &[u8]| {
+ let (i, message_id) = be_u16(i)?;
+ let (i, properties) = parse_properties(i, protocol_version == 5)?;
+ let (i, reason_codes) = many0(complete(be_u8))(i)?;
+ Ok((
+ i,
+ MQTTUnsubackData {
+ message_id,
+ properties,
+ reason_codes: Some(reason_codes),
+ },
+ ))
+ }
+}
+
+#[inline]
+fn parse_disconnect(
+ remaining_len: usize,
+ protocol_version: u8,
+) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTDisconnectData> {
+ move |input: &[u8]| {
+ if protocol_version < 5 {
+ return Ok((
+ input,
+ MQTTDisconnectData {
+ reason_code: None,
+ properties: None,
+ },
+ ));
+ }
+ if remaining_len == 0 {
+ // The Reason Code and Property Length can be omitted if the Reason
+ // Code is 0x00 (Normal disconnection) and there are no Properties.
+ // In this case the DISCONNECT has a Remaining Length of 0.
+ return Ok((
+ input,
+ MQTTDisconnectData {
+ reason_code: Some(0),
+ properties: None,
+ },
+ ));
+ }
+ match be_u8(input) {
+ Ok((rem, reason_code)) => {
+ // We are checking for 1 because in that case we have a
+ // header plus reason code, but no properties.
+ if remaining_len == 1 {
+ // no properties
+ return Ok((
+ rem,
+ MQTTDisconnectData {
+ reason_code: Some(0),
+ properties: None,
+ },
+ ));
+ }
+ match parse_properties(rem, true) {
+ Ok((rem, properties)) => {
+ return Ok((
+ rem,
+ MQTTDisconnectData {
+ reason_code: Some(reason_code),
+ properties,
+ },
+ ));
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
+}
+
+#[inline]
+fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> {
+ let (i, reason_code) = be_u8(i)?;
+ let (i, properties) = parse_properties(i, true)?;
+ Ok((
+ i,
+ MQTTAuthData {
+ reason_code,
+ properties,
+ },
+ ))
+}
+
+#[inline]
+fn parse_remaining_message<'a>(
+ full: &'a [u8],
+ len: usize,
+ skiplen: usize,
+ header: FixedHeader,
+ message_type: MQTTTypeCode,
+ protocol_version: u8,
+) -> impl Fn(&'a [u8]) -> IResult<&'a [u8], MQTTMessage> {
+ move |input: &'a [u8]| {
+ match message_type {
+ MQTTTypeCode::CONNECT => match parse_connect(input) {
+ Ok((_rem, conn)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::CONNECT(conn),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::CONNACK => match parse_connack(protocol_version)(input) {
+ Ok((_rem, connack)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::CONNACK(connack),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::PUBLISH => {
+ match parse_publish(protocol_version, header.qos_level > 0)(input) {
+ Ok((_rem, publish)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::PUBLISH(publish),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ }
+ }
+ MQTTTypeCode::PUBACK
+ | MQTTTypeCode::PUBREC
+ | MQTTTypeCode::PUBREL
+ | MQTTTypeCode::PUBCOMP => match parse_msgidonly(protocol_version)(input) {
+ Ok((_rem, msgidonly)) => {
+ let msg = MQTTMessage {
+ header,
+ op: match message_type {
+ MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
+ MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
+ MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
+ MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
+ _ => MQTTOperation::UNASSIGNED,
+ },
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::SUBSCRIBE => match parse_subscribe(protocol_version)(input) {
+ Ok((_rem, subs)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::SUBSCRIBE(subs),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::SUBACK => match parse_suback(protocol_version)(input) {
+ Ok((_rem, suback)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::SUBACK(suback),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(protocol_version)(input) {
+ Ok((_rem, unsub)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::UNSUBSCRIBE(unsub),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::UNSUBACK => match parse_unsuback(protocol_version)(input) {
+ Ok((_rem, unsuback)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::UNSUBACK(unsuback),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => {
+ let msg = MQTTMessage {
+ header,
+ op: match message_type {
+ MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ,
+ MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP,
+ _ => MQTTOperation::UNASSIGNED,
+ },
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ MQTTTypeCode::DISCONNECT => match parse_disconnect(len, protocol_version)(input) {
+ Ok((_rem, disco)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::DISCONNECT(disco),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ MQTTTypeCode::AUTH => match parse_auth(input) {
+ Ok((_rem, auth)) => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::AUTH(auth),
+ };
+ Ok((&full[skiplen + len..], msg))
+ }
+ Err(e) => Err(e),
+ },
+ // Unassigned message type code. Unlikely to happen with
+ // regular traffic, might be an indication for broken or
+ // crafted MQTT traffic.
+ _ => {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::UNASSIGNED,
+ };
+ return Ok((&full[skiplen + len..], msg));
+ }
+ }
+ }
+}
+
+pub fn parse_message(
+ input: &[u8],
+ protocol_version: u8,
+ max_msg_size: usize,
+) -> IResult<&[u8], MQTTMessage> {
+ // Parse the fixed header first. This is identical across versions and can
+ // be between 2 and 5 bytes long.
+ match parse_fixed_header(input) {
+ Ok((fullrem, header)) => {
+ let len = header.remaining_length as usize;
+ // This is the length of the fixed header that we need to skip
+ // before returning the remainder. It is the sum of the length
+ // of the flag byte (1) and the length of the message length
+ // varint.
+ let skiplen = input.len() - fullrem.len();
+ let message_type = header.message_type;
+
+ // If the remaining length (message length) exceeds the specified
+ // limit, we return a special truncation message type, containing
+ // no parsed metadata but just the skipped length and the message
+ // type.
+ if len > max_msg_size {
+ let msg = MQTTMessage {
+ header,
+ op: MQTTOperation::TRUNCATED(MQTTTruncatedData {
+ original_message_type: message_type,
+ skipped_length: len + skiplen,
+ }),
+ };
+ // In this case we return the full input buffer, since this is
+ // what the skipped_length value also refers to: header _and_
+ // remaining length.
+ return Ok((input, msg));
+ }
+
+ // We have not exceeded the maximum length limit, but still do not
+ // have enough data in the input buffer to handle the full
+ // message. Signal this by returning an Incomplete IResult value.
+ if fullrem.len() < len {
+ return Err(Err::Incomplete(Needed::new(len - fullrem.len())));
+ }
+
+ // Parse the contents of the buffer into a single message.
+ // We reslice the remainder into the portion that we are interested
+ // in, according to the length we just parsed. This helps with the
+ // complete() parsers, where we would otherwise need to keep track
+ // of the already parsed length.
+ let rem = &fullrem[..len];
+
+ // Parse remaining message in buffer. We use complete() to ensure
+ // we do not request additional content in case of incomplete
+ // parsing, but raise an error instead as we should have all the
+ // data asked for in the header.
+ return complete(parse_remaining_message(
+ input,
+ len,
+ skiplen,
+ header,
+ message_type,
+ protocol_version,
+ ))(rem);
+ }
+ Err(err) => {
+ return Err(err);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use nom7::error::ErrorKind;
+
+ fn test_mqtt_parse_variable_fail(buf0: &[u8]) {
+ let r0 = parse_mqtt_variable_integer(buf0);
+ match r0 {
+ Ok((_, _)) => {
+ panic!("Result should not have been ok.");
+ }
+ Err(Err::Error(err)) => {
+ assert_eq!(err.code, ErrorKind::Verify);
+ }
+ _ => {
+ panic!("Result should be an error.");
+ }
+ }
+ }
+
+ fn test_mqtt_parse_variable_check(buf0: &[u8], expected: u32) {
+ let r0 = parse_mqtt_variable_integer(buf0);
+ match r0 {
+ Ok((_, val)) => {
+ assert_eq!(val, expected);
+ }
+ Err(_) => {
+ panic!("Result should have been ok.");
+ }
+ }
+ }
+
+ #[test]
+ fn test_mqtt_parse_variable_integer_largest_input() {
+ test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0xFF]);
+ }
+
+ #[test]
+ fn test_mqtt_parse_variable_integer_boundary() {
+ test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0x80]);
+ }
+
+ #[test]
+ fn test_mqtt_parse_variable_integer_largest_valid() {
+ test_mqtt_parse_variable_check(&[0xFF, 0xFF, 0xFF, 0x7F], 268435455);
+ }
+
+ #[test]
+ fn test_mqtt_parse_variable_integer_smallest_valid() {
+ test_mqtt_parse_variable_check(&[0x0], 0);
+ }
+
+ #[test]
+ fn test_parse_fixed_header() {
+ let buf = [
+ 0x30, /* Header Flags: 0x30, Message Type: Publish Message, QoS Level: At most once delivery (Fire and Forget) */
+ 0xb7, 0x97, 0x02, /* Msg Len: 35767 */
+ 0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x01, 0xa0,
+ ];
+
+ let result = parse_fixed_header(&buf);
+ match result {
+ Ok((remainder, message)) => {
+ assert_eq!(message.message_type, MQTTTypeCode::PUBLISH);
+ assert!(!message.dup_flag);
+ assert_eq!(message.qos_level, 0);
+ assert!(!message.retain);
+ assert_eq!(message.remaining_length, 35767);
+ assert_eq!(remainder.len(), 17);
+ }
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_properties() {
+ let buf = [
+ 0x03, 0x21, 0x00, 0x14, /* Properties */
+ 0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x01, 0xa0,
+ ];
+
+ let result = parse_properties(&buf, true);
+ match result {
+ Ok((remainder, message)) => {
+ let res = message.unwrap();
+ assert_eq!(res[0], MQTTProperty::RECEIVE_MAXIMUM(20));
+ assert_eq!(remainder.len(), 17);
+ }
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+ #[test]
+ fn test_parse_connect() {
+ let buf = [
+ 0x00, 0x04, /* Protocol Name Length: 4 */
+ 0x4d, 0x51, 0x54, 0x54, /* Protocol Name: MQTT */
+ 0x05, /* Version: MQTT v5.0 (5) */
+ 0xc2, /*Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
+ 0x00, 0x3c, /* Keep Alive: 60 */
+ 0x03, 0x21, 0x00, 0x14, /* Properties */
+ 0x00, 0x00, /* Client ID Length: 0 */
+ 0x00, 0x04, /* User Name Length: 4 */
+ 0x75, 0x73, 0x65, 0x72, /* User Name: user */
+ 0x00, 0x04, /* Password Length: 4 */
+ 0x71, 0x61, 0x71, 0x73, /* Password: pass */
+ ];
+
+ let result = parse_connect(&buf);
+ match result {
+ Ok((remainder, message)) => {
+ assert_eq!(message.protocol_string, "MQTT");
+ assert_eq!(message.protocol_version, 5);
+ assert!(message.username_flag);
+ assert!(message.password_flag);
+ assert!(!message.will_retain);
+ assert_eq!(message.will_qos, 0);
+ assert!(!message.will_flag);
+ assert!(message.clean_session);
+ assert_eq!(message.keepalive, 60);
+ assert_eq!(remainder.len(), 0);
+ }
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_connack() {
+ let buf = [
+ 0x00, /* Acknowledge Flags: 0x00 (0000 000. = Reserved: Not set )(.... ...0 = Session Present: Not set) */
+ 0x00, /* Reason Code: Success (0) */
+ 0x2f, /* Total Length: 47 */
+ 0x22, /* ID: Topic Alias Maximum (0x22) */
+ 0x00, 0x0a, /* Value: 10 */
+ 0x12, /* ID: Assigned Client Identifier (0x12) */
+ 0x00, 0x29, /* Length: 41 */
+ 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x31, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, 0x30, 0x2d,
+ 0x30, 0x38, 0x45, 0x33, 0x2d, 0x33, 0x42, 0x41, 0x31, 0x2d, 0x32, 0x45, 0x39, 0x37,
+ 0x2d, 0x45, 0x39, 0x41, 0x30, 0x42, 0x34, 0x30, 0x36, 0x34, 0x42, 0x46,
+ 0x35, /* 41 byte Value: auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5 */
+ ];
+ let client_identifier = "auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5";
+
+ let result = parse_connack(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ let props = message.properties.unwrap();
+ assert_eq!(props[0], MQTTProperty::TOPIC_ALIAS_MAXIMUM(10));
+ assert_eq!(
+ props[1],
+ MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(client_identifier.to_string())
+ );
+ assert_eq!(message.return_code, 0);
+ assert!(!message.session_present);
+ assert_eq!(remainder.len(), 0);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_publish() {
+ let buf = [
+ 0x00, 06, /* Topic Length: 6 */
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
+ 0x00, 0x01, /* Message Identifier: 1 */
+ 0x00, /* Properties 6 */
+ 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
+ ];
+
+ let result = parse_publish(5, true);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ let message_id = message.message_id.unwrap();
+ assert_eq!(message.topic, "topicX");
+ assert_eq!(message_id, 1);
+ assert_eq!(remainder.len(), 13);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_msgidonly_v3() {
+ let buf = [
+ 0x00, 01, /* Message Identifier: 1 */
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34,
+ 0x33, 0x45, 0x38, 0x30,
+ ];
+
+ let result = parse_msgidonly(3);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ assert_eq!(message.message_id, 1);
+ assert_eq!(remainder.len(), 18);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_msgidonly_v5() {
+ let buf = [
+ 0x00, 01, /* Message Identifier: 1 */
+ 0x00, /* Reason Code: 0 */
+ 0x00, /* Properties */
+ 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
+ ];
+
+ let result = parse_msgidonly(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ let reason_code = message.reason_code.unwrap();
+ assert_eq!(message.message_id, 1);
+ assert_eq!(reason_code, 0);
+ assert_eq!(remainder.len(), 12);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_subscribe() {
+ let buf = [
+ 0x00, 0x01, /* Message Identifier: 1 */
+ 0x00, /* Properties 6 */
+ 0x00, 0x06, /* Topic Length: 6 */
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
+ 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
+ 0x00, 0x06, /* Topic Length: 6 */
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x59, /* Topic: topicY */
+ 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
+ 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30,
+ ];
+
+ let result = parse_subscribe(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ assert_eq!(message.topics[0].topic_name, "topicX");
+ assert_eq!(message.topics[1].topic_name, "topicY");
+ assert_eq!(message.topics[0].qos, 0);
+ assert_eq!(message.message_id, 1);
+ assert_eq!(remainder.len(), 12);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+ #[test]
+ fn test_parse_suback() {
+ let buf = [
+ 0x00, 0x01, /* Message Identifier: 1 */
+ 0x00, /* Properties 6 */
+ 0x00, 0x00, /* Topic Length: 6 */
+ ];
+
+ let result = parse_suback(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ assert_eq!(message.qoss[0], 0);
+ assert_eq!(message.message_id, 1);
+ assert_eq!(remainder.len(), 3);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+ #[test]
+ fn test_parse_unsubscribe() {
+ let buf = [
+ 0x00, 0x01, /* Message Identifier: 1 */
+ 0x00, /* Properties 6 */
+ 0x00, 0x06, /* Topic Length: 6 */
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */
+ 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */
+ ];
+
+ let result = parse_unsubscribe(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ assert_eq!(message.topics[0], "topicX");
+ assert_eq!(message.message_id, 1);
+ assert_eq!(remainder.len(), 1);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_unsuback() {
+ let buf = [
+ 0x00, 0x01, /* Message Identifier: 1 */
+ 0x00, /* Properties 6 */
+ 0x00, /* Reason Code */
+ ];
+
+ let result = parse_unsuback(5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ let reason_codes = message.reason_codes.unwrap();
+ assert_eq!(reason_codes[0], 0);
+ assert_eq!(message.message_id, 1);
+ assert_eq!(remainder.len(), 0);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_disconnect() {
+ let buf = [
+ 0xe0, /* Reason: 0 */
+ 0x00, /* Message Identifier: 1 */
+ ];
+
+ let result = parse_disconnect(0, 5);
+ let input = result(&buf);
+ match input {
+ Ok((remainder, message)) => {
+ let reason_code = message.reason_code.unwrap();
+ assert_eq!(reason_code, 0);
+ assert_eq!(remainder.len(), 2);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_message() {
+ let buf = [
+ 0x10, /* Message Identifier: 1 */
+ 0x2f, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05,
+ 0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */
+ 0x00, 0x3c, 0x03, 0x21, 0x00, 0x14, /* Properties */
+ 0x00, 0x13, 0x6d, 0x79, 0x76, 0x6f, 0x69, 0x63, 0x65, 0x69, 0x73, 0x6d, 0x79, 0x70,
+ 0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x00, 0x04, 0x75, 0x73, 0x65, 0x72, 0x00,
+ 0x04, 0x70, 0x61, 0x73, 0x73,
+ ];
+
+ let result = parse_message(&buf, 5, 40);
+ match result {
+ Ok((remainder, message)) => {
+ assert_eq!(message.header.message_type, MQTTTypeCode::CONNECT);
+ assert!(!message.header.dup_flag);
+ assert_eq!(message.header.qos_level, 0);
+ assert!(!message.header.retain);
+ assert_eq!(remainder.len(), 49);
+ }
+
+ Err(Err::Incomplete(_)) => {
+ panic!("Result should not have been incomplete.");
+ }
+ Err(Err::Error(err)) | Err(Err::Failure(err)) => {
+ panic!("Result should not be an error: {:?}.", err);
+ }
+ }
+ }
+}