diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:39:49 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:39:49 +0000 |
commit | a0aa2307322cd47bbf416810ac0292925e03be87 (patch) | |
tree | 37076262a026c4b48c8a0e84f44ff9187556ca35 /rust/src/mqtt | |
parent | Initial commit. (diff) | |
download | suricata-a0aa2307322cd47bbf416810ac0292925e03be87.tar.xz suricata-a0aa2307322cd47bbf416810ac0292925e03be87.zip |
Adding upstream version 1:7.0.3.upstream/1%7.0.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'rust/src/mqtt')
-rw-r--r-- | rust/src/mqtt/detect.rs | 520 | ||||
-rw-r--r-- | rust/src/mqtt/logger.rs | 305 | ||||
-rw-r--r-- | rust/src/mqtt/mod.rs | 25 | ||||
-rw-r--r-- | rust/src/mqtt/mqtt.rs | 805 | ||||
-rw-r--r-- | rust/src/mqtt/mqtt_message.rs | 205 | ||||
-rw-r--r-- | rust/src/mqtt/mqtt_property.rs | 269 | ||||
-rw-r--r-- | rust/src/mqtt/parser.rs | 1135 |
7 files changed, 3264 insertions, 0 deletions
diff --git a/rust/src/mqtt/detect.rs b/rust/src/mqtt/detect.rs new file mode 100644 index 0000000..b47a84f --- /dev/null +++ b/rust/src/mqtt/detect.rs @@ -0,0 +1,520 @@ +/* Copyright (C) 2020 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use crate::mqtt::mqtt::{MQTTState, MQTTTransaction}; +use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode}; +use std::ffi::CStr; +use std::ptr; +use std::str::FromStr; + +#[derive(FromPrimitive, Debug, Copy, Clone, PartialOrd, PartialEq, Eq)] +#[allow(non_camel_case_types)] +#[repr(u8)] +pub enum MQTTFlagState { + MQTT_DONT_CARE = 0, + MQTT_MUST_BE_SET = 1, + MQTT_CANT_BE_SET = 2, +} + +#[inline] +fn check_flag_state(flag_state: MQTTFlagState, flag_value: bool, ok: &mut bool) { + match flag_state { + MQTTFlagState::MQTT_MUST_BE_SET => { + if !flag_value { + *ok = false; + } + } + MQTTFlagState::MQTT_CANT_BE_SET => { + if flag_value { + *ok = false; + } + } + _ => {} + } +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_has_type(tx: &MQTTTransaction, mtype: u8) -> u8 { + for msg in tx.msg.iter() { + if mtype == msg.header.message_type as u8 { + return 1; + } + } + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_cstr_message_code( + str: *const std::os::raw::c_char, +) -> std::os::raw::c_int { + let msgtype: &CStr = CStr::from_ptr(str); + if let Ok(s) = msgtype.to_str() { + if let Ok(x) = MQTTTypeCode::from_str(s) { + return x as i32; + } + } + return -1; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_has_flags( + tx: &MQTTTransaction, qretain: MQTTFlagState, qdup: MQTTFlagState, +) -> u8 { + for msg in tx.msg.iter() { + let mut ok = true; + check_flag_state(qretain, msg.header.retain, &mut ok); + check_flag_state(qdup, msg.header.dup_flag, &mut ok); + if ok { + return 1; + } + } + + return 0; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_has_qos(tx: &MQTTTransaction, qos: u8) -> u8 { + for msg in tx.msg.iter() { + if qos == msg.header.qos_level { + return 1; + } + } + return 0; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_get_protocol_version(state: &MQTTState) -> u8 { + return state.protocol_version; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_has_connect_flags( + tx: &MQTTTransaction, username: MQTTFlagState, password: MQTTFlagState, will: MQTTFlagState, + will_retain: MQTTFlagState, clean_session: MQTTFlagState, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + let mut ok = true; + check_flag_state(username, cv.username_flag, &mut ok); + check_flag_state(password, cv.password_flag, &mut ok); + check_flag_state(will, cv.will_flag, &mut ok); + check_flag_state(will_retain, cv.will_retain, &mut ok); + check_flag_state(clean_session, cv.clean_session, &mut ok); + if ok { + return 1; + } + } + } + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + let p = &cv.client_id; + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + if let Some(p) = &cv.username { + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + if let Some(p) = &cv.password { + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + if let Some(p) = &cv.will_topic { + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNECT(ref cv) = msg.op { + if let Some(p) = &cv.will_message { + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent( + tx: &MQTTTransaction, session_present: *mut bool, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::CONNACK(ref ca) = msg.op { + *session_present = ca.session_present; + return 1; + } + } + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::PUBLISH(ref pubv) = msg.op { + let p = &pubv.topic; + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message( + tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32, +) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::PUBLISH(ref pubv) = msg.op { + let p = &pubv.message; + if !p.is_empty() { + *buffer = p.as_ptr(); + *buffer_len = p.len() as u32; + return 1; + } + } + } + + *buffer = ptr::null(); + *buffer_len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic( + tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32, +) -> u8 { + let mut offset = 0; + for msg in tx.msg.iter() { + if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op { + if (i as usize) < subv.topics.len() + offset { + let topic = &subv.topics[(i as usize) - offset]; + if !topic.topic_name.is_empty() { + *len = topic.topic_name.len() as u32; + *buf = topic.topic_name.as_ptr(); + return 1; + } + } else { + offset += subv.topics.len(); + } + } + } + + *buf = ptr::null(); + *len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic( + tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32, +) -> u8 { + let mut offset = 0; + for msg in tx.msg.iter() { + if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op { + if (i as usize) < unsubv.topics.len() + offset { + let topic = &unsubv.topics[(i as usize) - offset]; + if !topic.is_empty() { + *len = topic.len() as u32; + *buf = topic.as_ptr(); + return 1; + } + } else { + offset += unsubv.topics.len(); + } + } + } + + *buf = ptr::null(); + *len = 0; + + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(tx: &MQTTTransaction, result: *mut u8) -> u8 { + for msg in tx.msg.iter() { + match msg.op { + MQTTOperation::PUBACK(ref v) + | MQTTOperation::PUBREL(ref v) + | MQTTOperation::PUBREC(ref v) + | MQTTOperation::PUBCOMP(ref v) => { + if let Some(rcode) = v.reason_code { + *result = rcode; + return 1; + } + } + MQTTOperation::AUTH(ref v) => { + *result = v.reason_code; + return 1; + } + MQTTOperation::CONNACK(ref v) => { + *result = v.return_code; + return 1; + } + MQTTOperation::DISCONNECT(ref v) => { + if let Some(rcode) = v.reason_code { + *result = rcode; + return 1; + } + } + _ => return 0, + } + } + return 0; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(tx: &MQTTTransaction, code: u8) -> u8 { + for msg in tx.msg.iter() { + if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op { + if let Some(ref reason_codes) = unsuback.reason_codes { + for rc in reason_codes.iter() { + if *rc == code { + return 1; + } + } + } + } + } + return 0; +} + +#[cfg(test)] +mod test { + use super::*; + use crate::mqtt::mqtt::MQTTTransaction; + use crate::mqtt::mqtt_message::*; + use crate::mqtt::parser::FixedHeader; + use crate::core::Direction; + use std; + + #[test] + fn test_multi_unsubscribe() { + let mut t = MQTTTransaction::new(MQTTMessage { + header: FixedHeader { + message_type: MQTTTypeCode::UNSUBSCRIBE, + dup_flag: false, + qos_level: 0, + retain: false, + remaining_length: 0, + }, + op: MQTTOperation::UNSUBSCRIBE(MQTTUnsubscribeData { + message_id: 1, + topics: vec!["foo".to_string(), "baar".to_string()], + properties: None, + }), + }, Direction::ToServer); + t.msg.push(MQTTMessage { + header: FixedHeader { + message_type: MQTTTypeCode::UNSUBSCRIBE, + dup_flag: false, + qos_level: 0, + retain: false, + remaining_length: 0, + }, + op: MQTTOperation::UNSUBSCRIBE(MQTTUnsubscribeData { + message_id: 1, + topics: vec!["fieee".to_string(), "baaaaz".to_string()], + properties: None, + }), + }); + let mut s: *const u8 = std::ptr::null_mut(); + let mut slen: u32 = 0; + let mut r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen) }; + assert_eq!(r, 1); + let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "foo"); + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "baar"); + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "fieee"); + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "baaaaz"); + r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen) }; + assert_eq!(r, 0); + } + + #[test] + fn test_multi_subscribe() { + let mut t = MQTTTransaction::new(MQTTMessage { + header: FixedHeader { + message_type: MQTTTypeCode::SUBSCRIBE, + dup_flag: false, + qos_level: 0, + retain: false, + remaining_length: 0, + }, + op: MQTTOperation::SUBSCRIBE(MQTTSubscribeData { + message_id: 1, + topics: vec![ + MQTTSubscribeTopicData { + topic_name: "foo".to_string(), + qos: 0, + }, + MQTTSubscribeTopicData { + topic_name: "baar".to_string(), + qos: 1, + }, + ], + properties: None, + }), + }, Direction::ToServer); + t.msg.push(MQTTMessage { + header: FixedHeader { + message_type: MQTTTypeCode::SUBSCRIBE, + dup_flag: false, + qos_level: 0, + retain: false, + remaining_length: 0, + }, + op: MQTTOperation::SUBSCRIBE(MQTTSubscribeData { + message_id: 1, + topics: vec![ + MQTTSubscribeTopicData { + topic_name: "fieee".to_string(), + qos: 0, + }, + MQTTSubscribeTopicData { + topic_name: "baaaaz".to_string(), + qos: 1, + }, + ], + properties: None, + }), + }); + let mut s: *const u8 = std::ptr::null_mut(); + let mut slen: u32 = 0; + let mut r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen) }; + assert_eq!(r, 1); + let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "foo"); + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "baar"); + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "fieee"); + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen) }; + assert_eq!(r, 1); + topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) }); + assert_eq!(topic, "baaaaz"); + r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen) }; + assert_eq!(r, 0); + } +} diff --git a/rust/src/mqtt/logger.rs b/rust/src/mqtt/logger.rs new file mode 100644 index 0000000..af0db17 --- /dev/null +++ b/rust/src/mqtt/logger.rs @@ -0,0 +1,305 @@ +/* Copyright (C) 2020 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use super::mqtt::MQTTTransaction; +use crate::jsonbuilder::{JsonBuilder, JsonError}; +use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData}; +use crate::mqtt::parser::FixedHeader; +use std; + +pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0); + +#[inline] +fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> { + js.start_object()?; + js.set_string("topic", &t.topic_name)?; + js.set_uint("qos", t.qos as u64)?; + js.close()?; + return Ok(()); +} + +#[inline] +fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> { + js.set_uint("qos", hdr.qos_level as u64)?; + js.set_bool("retain", hdr.retain)?; + js.set_bool("dup", hdr.dup_flag)?; + return Ok(()); +} + +fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<(), JsonError> { + js.open_object("mqtt")?; + for msg in tx.msg.iter() { + match msg.op { + MQTTOperation::CONNECT(ref conn) => { + js.open_object("connect")?; + log_mqtt_header(js, &msg.header)?; + js.set_string("protocol_string", &conn.protocol_string)?; + js.set_uint("protocol_version", conn.protocol_version as u64)?; + js.set_string("client_id", &conn.client_id)?; + js.open_object("flags")?; + js.set_bool("username", conn.username_flag)?; + js.set_bool("password", conn.password_flag)?; + js.set_bool("will_retain", conn.will_retain)?; + js.set_bool("will", conn.will_flag)?; + js.set_bool("clean_session", conn.clean_session)?; + js.close()?; // flags + if let Some(user) = &conn.username { + js.set_string("username", user)?; + } + if flags & MQTT_LOG_PASSWORDS != 0 { + if let Some(pass) = &conn.password { + js.set_string_from_bytes("password", pass)?; + } + } + if conn.will_flag { + js.open_object("will")?; + if let Some(will_topic) = &conn.will_topic { + js.set_string("topic", will_topic)?; + } + if let Some(will_message) = &conn.will_message { + js.set_string_from_bytes("message", will_message)?; + } + if let Some(will_properties) = &conn.will_properties { + js.open_object("properties")?; + for prop in will_properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // will + } + if let Some(properties) = &conn.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // connect + } + MQTTOperation::CONNACK(ref connack) => { + js.open_object("connack")?; + log_mqtt_header(js, &msg.header)?; + js.set_bool("session_present", connack.session_present)?; + js.set_uint("return_code", connack.return_code as u64)?; + if let Some(properties) = &connack.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // connack + } + MQTTOperation::PUBLISH(ref publish) => { + js.open_object("publish")?; + log_mqtt_header(js, &msg.header)?; + js.set_string("topic", &publish.topic)?; + if let Some(message_id) = publish.message_id { + js.set_uint("message_id", message_id as u64)?; + } + js.set_string_from_bytes("message", &publish.message)?; + if let Some(properties) = &publish.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // publish + } + MQTTOperation::PUBACK(ref msgidonly) => { + js.open_object("puback")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", msgidonly.message_id as u64)?; + if let Some(reason_code) = &msgidonly.reason_code { + js.set_uint("reason_code", *reason_code as u64)?; + } + if let Some(properties) = &msgidonly.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // puback + } + MQTTOperation::PUBREC(ref msgidonly) => { + js.open_object("pubrec")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", msgidonly.message_id as u64)?; + if let Some(reason_code) = &msgidonly.reason_code { + js.set_uint("reason_code", *reason_code as u64)?; + } + if let Some(properties) = &msgidonly.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // pubrec + } + MQTTOperation::PUBREL(ref msgidonly) => { + js.open_object("pubrel")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", msgidonly.message_id as u64)?; + if let Some(reason_code) = &msgidonly.reason_code { + js.set_uint("reason_code", *reason_code as u64)?; + } + if let Some(properties) = &msgidonly.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // pubrel + } + MQTTOperation::PUBCOMP(ref msgidonly) => { + js.open_object("pubcomp")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", msgidonly.message_id as u64)?; + if let Some(reason_code) = &msgidonly.reason_code { + js.set_uint("reason_code", *reason_code as u64)?; + } + if let Some(properties) = &msgidonly.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // pubcomp + } + MQTTOperation::SUBSCRIBE(ref subs) => { + js.open_object("subscribe")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", subs.message_id as u64)?; + js.open_array("topics")?; + for t in &subs.topics { + log_mqtt_topic(js, t)?; + } + js.close()?; //topics + if let Some(properties) = &subs.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // subscribe + } + MQTTOperation::SUBACK(ref suback) => { + js.open_object("suback")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", suback.message_id as u64)?; + js.open_array("qos_granted")?; + for t in &suback.qoss { + js.append_uint(*t as u64)?; + } + js.close()?; // qos_granted + js.close()?; // suback + } + MQTTOperation::UNSUBSCRIBE(ref unsub) => { + js.open_object("unsubscribe")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", unsub.message_id as u64)?; + js.open_array("topics")?; + for t in &unsub.topics { + js.append_string(t)?; + } + js.close()?; // topics + js.close()?; // unsubscribe + } + MQTTOperation::UNSUBACK(ref unsuback) => { + js.open_object("unsuback")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("message_id", unsuback.message_id as u64)?; + if let Some(codes) = &unsuback.reason_codes { + if !codes.is_empty() { + js.open_array("reason_codes")?; + for t in codes { + js.append_uint(*t as u64)?; + } + js.close()?; // reason_codes + } + } + js.close()?; // unsuback + } + MQTTOperation::PINGREQ => { + js.open_object("pingreq")?; + log_mqtt_header(js, &msg.header)?; + js.close()?; // pingreq + } + MQTTOperation::PINGRESP => { + js.open_object("pingresp")?; + log_mqtt_header(js, &msg.header)?; + js.close()?; // pingresp + } + MQTTOperation::AUTH(ref auth) => { + js.open_object("auth")?; + log_mqtt_header(js, &msg.header)?; + js.set_uint("reason_code", auth.reason_code as u64)?; + if let Some(properties) = &auth.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // auth + } + MQTTOperation::DISCONNECT(ref disco) => { + js.open_object("disconnect")?; + log_mqtt_header(js, &msg.header)?; + if let Some(reason_code) = &disco.reason_code { + js.set_uint("reason_code", *reason_code as u64)?; + } + if let Some(properties) = &disco.properties { + js.open_object("properties")?; + for prop in properties { + prop.set_json(js)?; + } + js.close()?; // properties + } + js.close()?; // disconnect + } + MQTTOperation::TRUNCATED(ref trunc) => { + js.open_object(&trunc.original_message_type.to_lower_str())?; + log_mqtt_header(js, &msg.header)?; + js.set_bool("truncated", true)?; + js.set_uint("skipped_length", trunc.skipped_length as u64)?; + js.close()?; // truncated + } + MQTTOperation::UNASSIGNED => {} + } + } + js.close()?; // mqtt + + return Ok(()); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_logger_log( + tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder, +) -> bool { + let tx = cast_pointer!(tx, MQTTTransaction); + log_mqtt(tx, flags, js).is_ok() +} diff --git a/rust/src/mqtt/mod.rs b/rust/src/mqtt/mod.rs new file mode 100644 index 0000000..aefcc33 --- /dev/null +++ b/rust/src/mqtt/mod.rs @@ -0,0 +1,25 @@ +/* Copyright (C) 2020 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +//! MQTT application layer, detection, logger and parser module. + +pub mod detect; +pub mod logger; +pub mod mqtt; +pub mod mqtt_message; +pub mod mqtt_property; +pub mod parser; diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs new file mode 100644 index 0000000..3f110df --- /dev/null +++ b/rust/src/mqtt/mqtt.rs @@ -0,0 +1,805 @@ +/* Copyright (C) 2020-2022 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use super::mqtt_message::*; +use super::parser::*; +use crate::applayer::*; +use crate::applayer::{self, LoggerFlags}; +use crate::conf::conf_get; +use crate::core::*; +use crate::frames::*; +use nom7::Err; +use std; +use std::collections::VecDeque; +use std::ffi::CString; + +// Used as a special pseudo packet identifier to denote the first CONNECT +// packet in a connection. Note that there is no risk of collision with a +// parsed packet identifier because in the protocol these are only 16 bit +// unsigned. +const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX; +// Maximum message length in bytes. If the length of a message exceeds +// this value, it will be truncated. Default: 1MB. +static mut MAX_MSG_LEN: u32 = 1048576; + +static mut MQTT_MAX_TX: usize = 1024; + +static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN; + +#[derive(AppLayerFrameType)] +pub enum MQTTFrameType { + Pdu, + Header, + Data, +} + +#[derive(FromPrimitive, Debug, AppLayerEvent)] +pub enum MQTTEvent { + MissingConnect, + MissingPublish, + MissingSubscribe, + MissingUnsubscribe, + DoubleConnect, + UnintroducedMessage, + InvalidQosLevel, + MissingMsgId, + UnassignedMsgType, + TooManyTransactions, + MalformedTraffic, +} + +#[derive(Debug)] +pub struct MQTTTransaction { + tx_id: u64, + pkt_id: Option<u32>, + pub msg: Vec<MQTTMessage>, + complete: bool, + toclient: bool, + toserver: bool, + + logged: LoggerFlags, + tx_data: applayer::AppLayerTxData, +} + +impl MQTTTransaction { + pub fn new(msg: MQTTMessage, direction: Direction) -> MQTTTransaction { + let mut m = MQTTTransaction::new_empty(direction); + m.msg.push(msg); + return m; + } + + pub fn new_empty(direction: Direction) -> MQTTTransaction { + return MQTTTransaction { + tx_id: 0, + pkt_id: None, + complete: false, + logged: LoggerFlags::new(), + msg: Vec::new(), + toclient: direction.is_to_client(), + toserver: direction.is_to_server(), + tx_data: applayer::AppLayerTxData::for_direction(direction), + }; + } +} + +impl Transaction for MQTTTransaction { + fn id(&self) -> u64 { + self.tx_id + } +} + +pub struct MQTTState { + state_data: AppLayerStateData, + tx_id: u64, + pub protocol_version: u8, + transactions: VecDeque<MQTTTransaction>, + connected: bool, + skip_request: usize, + skip_response: usize, + max_msg_len: usize, + tx_index_completed: usize, +} + +impl State<MQTTTransaction> for MQTTState { + fn get_transaction_count(&self) -> usize { + self.transactions.len() + } + + fn get_transaction_by_index(&self, index: usize) -> Option<&MQTTTransaction> { + self.transactions.get(index) + } +} + +impl Default for MQTTState { + fn default() -> Self { + Self::new() + } +} + +impl MQTTState { + pub fn new() -> Self { + Self { + state_data: AppLayerStateData::new(), + tx_id: 0, + protocol_version: 0, + transactions: VecDeque::new(), + connected: false, + skip_request: 0, + skip_response: 0, + max_msg_len: unsafe { MAX_MSG_LEN as usize }, + tx_index_completed: 0, + } + } + + fn free_tx(&mut self, tx_id: u64) { + let len = self.transactions.len(); + let mut found = false; + let mut index = 0; + for i in 0..len { + let tx = &self.transactions[i]; + if tx.tx_id == tx_id + 1 { + found = true; + index = i; + break; + } + } + if found { + self.tx_index_completed = 0; + self.transactions.remove(index); + } + } + + pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> { + self.transactions.iter().find(|tx| tx.tx_id == tx_id + 1) + } + + pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> { + for tx in &mut self.transactions.range_mut(self.tx_index_completed..) { + if !tx.complete { + if let Some(mpktid) = tx.pkt_id { + if mpktid == pkt_id { + return Some(tx); + } + } + } + } + return None; + } + + fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction { + let direction = if toclient { + Direction::ToClient + } else { + Direction::ToServer + }; + let mut tx = MQTTTransaction::new(msg, direction); + self.tx_id += 1; + tx.tx_id = self.tx_id; + if self.transactions.len() > unsafe { MQTT_MAX_TX } { + let mut index = self.tx_index_completed; + for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) { + index += 1; + if !tx_old.complete { + tx_old.complete = true; + MQTTState::set_event(tx_old, MQTTEvent::TooManyTransactions); + break; + } + } + self.tx_index_completed = index; + } + return tx; + } + + // Handle a MQTT message depending on the direction and state. + // Note that we are trying to only have one mutable reference to msg + // and its components, however, since we are in a large match operation, + // we cannot pass around and/or store more references or move things + // without having to introduce lifetimes etc. + // This is the reason for the code duplication below. Maybe there is a + // more concise way to do it, but this works for now. + fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) { + match msg.op { + MQTTOperation::CONNECT(ref conn) => { + self.protocol_version = conn.protocol_version; + let mut tx = self.new_tx(msg, toclient); + tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); + if self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect); + } + self.transactions.push_back(tx); + } + MQTTOperation::PUBLISH(ref publish) => { + let qos = msg.header.qos_level; + let pkt_id = publish.message_id; + let mut tx = self.new_tx(msg, toclient); + match qos { + 0 => { + // with QOS level 0, we do not need to wait for a + // response + tx.complete = true; + } + 1..=2 => { + if let Some(pkt_id) = pkt_id { + tx.pkt_id = Some(pkt_id as u32); + } else { + MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); + } + } + _ => { + MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); + } + } + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); + } + MQTTOperation::SUBSCRIBE(ref subscribe) => { + let pkt_id = subscribe.message_id as u32; + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { + 0 => { + // with QOS level 0, we do not need to wait for a + // response + tx.complete = true; + } + 1..=2 => { + tx.pkt_id = Some(pkt_id); + } + _ => { + MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); + } + } + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); + } + MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { + let pkt_id = unsubscribe.message_id as u32; + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { + 0 => { + // with QOS level 0, we do not need to wait for a + // response + tx.complete = true; + } + 1..=2 => { + tx.pkt_id = Some(pkt_id); + } + _ => { + MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); + } + } + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); + } + MQTTOperation::CONNACK(ref _connack) => { + if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { + tx.msg.push(msg); + tx.complete = true; + tx.pkt_id = None; + self.connected = true; + } else { + let mut tx = self.new_tx(msg, toclient); + MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); + tx.complete = true; + self.transactions.push_back(tx); + } + } + MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => { + if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { + tx.msg.push(msg); + } else { + let mut tx = self.new_tx(msg, toclient); + MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; + self.transactions.push_back(tx); + } + } + MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => { + if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { + tx.msg.push(msg); + tx.complete = true; + tx.pkt_id = None; + } else { + let mut tx = self.new_tx(msg, toclient); + MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; + self.transactions.push_back(tx); + } + } + MQTTOperation::SUBACK(ref suback) => { + if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) { + tx.msg.push(msg); + tx.complete = true; + tx.pkt_id = None; + } else { + let mut tx = self.new_tx(msg, toclient); + MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; + self.transactions.push_back(tx); + } + } + MQTTOperation::UNSUBACK(ref unsuback) => { + if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) { + tx.msg.push(msg); + tx.complete = true; + tx.pkt_id = None; + } else { + let mut tx = self.new_tx(msg, toclient); + MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; + self.transactions.push_back(tx); + } + } + MQTTOperation::UNASSIGNED => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; + MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType); + self.transactions.push_back(tx); + } + MQTTOperation::TRUNCATED(_) => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; + self.transactions.push_back(tx); + } + MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); + } + MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); + } + } + } + + fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { + let input = stream_slice.as_slice(); + let mut current = input; + + if input.is_empty() { + return AppLayerResult::ok(); + } + + let mut consumed = 0; + SCLogDebug!( + "skip_request {} input len {}", + self.skip_request, + input.len() + ); + if self.skip_request > 0 { + if input.len() <= self.skip_request { + SCLogDebug!("reducing skip_request by {}", input.len()); + self.skip_request -= input.len(); + return AppLayerResult::ok(); + } else { + current = &input[self.skip_request..]; + SCLogDebug!( + "skip end reached, skipping {} :{:?}", + self.skip_request, + current + ); + consumed = self.skip_request; + self.skip_request = 0; + } + } + + while !current.is_empty() { + SCLogDebug!("request: handling {}", current.len()); + match parse_message(current, self.protocol_version, self.max_msg_len) { + Ok((rem, msg)) => { + let _pdu = Frame::new( + flow, + &stream_slice, + input, + current.len() as i64, + MQTTFrameType::Pdu as u8, + ); + SCLogDebug!("request msg {:?}", msg); + if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { + SCLogDebug!( + "found truncated with skipped {} current len {}", + trunc.skipped_length, + current.len() + ); + if trunc.skipped_length >= current.len() { + self.skip_request = trunc.skipped_length - current.len(); + self.handle_msg(msg, true); + return AppLayerResult::ok(); + } else { + consumed += trunc.skipped_length; + current = ¤t[trunc.skipped_length..]; + self.handle_msg(msg, true); + self.skip_request = 0; + continue; + } + } + + self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); + self.handle_msg(msg, false); + consumed += current.len() - rem.len(); + current = rem; + } + Err(Err::Incomplete(_)) => { + SCLogDebug!( + "incomplete request: consumed {} needed {} (input len {})", + consumed, + (current.len() + 1), + input.len() + ); + return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); + } + Err(_) => { + self.set_event_notx(MQTTEvent::MalformedTraffic, false); + return AppLayerResult::err(); + } + } + } + + return AppLayerResult::ok(); + } + + fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { + let input = stream_slice.as_slice(); + let mut current = input; + + if input.is_empty() { + return AppLayerResult::ok(); + } + + let mut consumed = 0; + SCLogDebug!( + "skip_response {} input len {}", + self.skip_response, + current.len() + ); + if self.skip_response > 0 { + if input.len() <= self.skip_response { + self.skip_response -= current.len(); + return AppLayerResult::ok(); + } else { + current = &input[self.skip_response..]; + SCLogDebug!( + "skip end reached, skipping {} :{:?}", + self.skip_request, + current + ); + consumed = self.skip_response; + self.skip_response = 0; + } + } + + while !current.is_empty() { + SCLogDebug!("response: handling {}", current.len()); + match parse_message(current, self.protocol_version, self.max_msg_len) { + Ok((rem, msg)) => { + let _pdu = Frame::new( + flow, + &stream_slice, + input, + input.len() as i64, + MQTTFrameType::Pdu as u8, + ); + + SCLogDebug!("response msg {:?}", msg); + if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { + SCLogDebug!( + "found truncated with skipped {} current len {}", + trunc.skipped_length, + current.len() + ); + if trunc.skipped_length >= current.len() { + self.skip_response = trunc.skipped_length - current.len(); + self.handle_msg(msg, true); + SCLogDebug!("skip_response now {}", self.skip_response); + return AppLayerResult::ok(); + } else { + consumed += trunc.skipped_length; + current = ¤t[trunc.skipped_length..]; + self.handle_msg(msg, true); + self.skip_response = 0; + continue; + } + } + + self.mqtt_hdr_and_data_frames(flow, &stream_slice, &msg); + self.handle_msg(msg, true); + consumed += current.len() - rem.len(); + current = rem; + } + Err(Err::Incomplete(_)) => { + SCLogDebug!( + "incomplete response: consumed {} needed {} (input len {})", + consumed, + (current.len() + 1), + input.len() + ); + return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); + } + Err(_) => { + self.set_event_notx(MQTTEvent::MalformedTraffic, true); + return AppLayerResult::err(); + } + } + } + + return AppLayerResult::ok(); + } + + fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) { + tx.tx_data.set_event(event as u8); + } + + fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) { + let mut tx = MQTTTransaction::new_empty(if toclient { + Direction::ToClient + } else { + Direction::ToServer + }); + self.tx_id += 1; + tx.tx_id = self.tx_id; + if toclient { + tx.toclient = true; + } else { + tx.toserver = true; + } + tx.complete = true; + tx.tx_data.set_event(event as u8); + self.transactions.push_back(tx); + } + + fn mqtt_hdr_and_data_frames( + &mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage, + ) { + let hdr = stream_slice.as_slice(); + //MQTT payload has a fixed header of 2 bytes + let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8); + SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr); + let rem_length = input.header.remaining_length as usize; + let data = &hdr[2..rem_length + 2]; + let _mqtt_data = Frame::new( + flow, + stream_slice, + data, + rem_length as i64, + MQTTFrameType::Data as u8, + ); + SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data); + } +} + +// C exports. + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_probing_parser( + _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8, +) -> AppProto { + let buf = build_slice!(input, input_len as usize); + match parse_fixed_header(buf) { + Ok((_, hdr)) => { + // reject unassigned message type + if hdr.message_type == MQTTTypeCode::UNASSIGNED { + return ALPROTO_FAILED; + } + // with 2 being the highest valid QoS level + if hdr.qos_level > 2 { + return ALPROTO_FAILED; + } + return ALPROTO_MQTT; + } + Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN, + Err(_) => ALPROTO_FAILED, + } +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_state_new( + _orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto, +) -> *mut std::os::raw::c_void { + let state = MQTTState::new(); + let boxed = Box::new(state); + return Box::into_raw(boxed) as *mut _; +} + +#[no_mangle] +pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) { + std::mem::drop(unsafe { Box::from_raw(state as *mut MQTTState) }); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) { + let state = cast_pointer!(state, MQTTState); + state.free_tx(tx_id); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_parse_request( + flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + stream_slice: StreamSlice, _data: *const std::os::raw::c_void, +) -> AppLayerResult { + let state = cast_pointer!(state, MQTTState); + return state.parse_request(flow, stream_slice); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_parse_response( + flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, + stream_slice: StreamSlice, _data: *const std::os::raw::c_void, +) -> AppLayerResult { + let state = cast_pointer!(state, MQTTState); + return state.parse_response(flow, stream_slice); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_state_get_tx( + state: *mut std::os::raw::c_void, tx_id: u64, +) -> *mut std::os::raw::c_void { + let state = cast_pointer!(state, MQTTState); + match state.get_tx(tx_id) { + Some(tx) => { + return tx as *const _ as *mut _; + } + None => { + return std::ptr::null_mut(); + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 { + let state = cast_pointer!(state, MQTTState); + return state.tx_id; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_is_toclient( + tx: *const std::os::raw::c_void, +) -> std::os::raw::c_int { + let tx = cast_pointer!(tx, MQTTTransaction); + if tx.toclient { + return 1; + } + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( + tx: *mut std::os::raw::c_void, direction: u8, +) -> std::os::raw::c_int { + let tx = cast_pointer!(tx, MQTTTransaction); + match direction.into() { + Direction::ToServer => { + if tx.complete || tx.toclient { + return 1; + } + } + Direction::ToClient => { + if tx.complete || tx.toserver { + return 1; + } + } + } + return 0; +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_get_logged( + _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, +) -> u32 { + let tx = cast_pointer!(tx, MQTTTransaction); + return tx.logged.get(); +} + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_tx_set_logged( + _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32, +) { + let tx = cast_pointer!(tx, MQTTTransaction); + tx.logged.set(logged); +} + +// Parser name as a C style string. +const PARSER_NAME: &[u8] = b"mqtt\0"; + +export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction); +export_state_data_get!(rs_mqtt_get_state_data, MQTTState); + +#[no_mangle] +pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { + let default_port = CString::new("[1883]").unwrap(); + let max_msg_len = &mut MAX_MSG_LEN; + *max_msg_len = cfg_max_msg_len; + let parser = RustParser { + name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char, + default_port: default_port.as_ptr(), + ipproto: IPPROTO_TCP, + probe_ts: Some(rs_mqtt_probing_parser), + probe_tc: Some(rs_mqtt_probing_parser), + min_depth: 0, + max_depth: 16, + state_new: rs_mqtt_state_new, + state_free: rs_mqtt_state_free, + tx_free: rs_mqtt_state_tx_free, + parse_ts: rs_mqtt_parse_request, + parse_tc: rs_mqtt_parse_response, + get_tx_count: rs_mqtt_state_get_tx_count, + get_tx: rs_mqtt_state_get_tx, + tx_comp_st_ts: 1, + tx_comp_st_tc: 1, + tx_get_progress: rs_mqtt_tx_get_alstate_progress, + get_eventinfo: Some(MQTTEvent::get_event_info), + get_eventinfo_byid: Some(MQTTEvent::get_event_info_by_id), + localstorage_new: None, + localstorage_free: None, + get_tx_files: None, + get_tx_iterator: Some(crate::applayer::state_get_tx_iterator::<MQTTState, MQTTTransaction>), + get_tx_data: rs_mqtt_get_tx_data, + get_state_data: rs_mqtt_get_state_data, + apply_tx_config: None, + flags: 0, + truncate: None, + get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name), + get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id), + }; + + let ip_proto_str = CString::new("tcp").unwrap(); + + if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { + let alproto = AppLayerRegisterProtocolDetection(&parser, 1); + ALPROTO_MQTT = alproto; + if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 { + let _ = AppLayerRegisterParser(&parser, alproto); + } + if let Some(val) = conf_get("app-layer.protocols.mqtt.max-tx") { + if let Ok(v) = val.parse::<usize>() { + MQTT_MAX_TX = v; + } else { + SCLogError!("Invalid value for mqtt.max-tx"); + } + } + } else { + SCLogDebug!("Protocol detector and parser disabled for MQTT."); + } +} diff --git a/rust/src/mqtt/mqtt_message.rs b/rust/src/mqtt/mqtt_message.rs new file mode 100644 index 0000000..390fc9e --- /dev/null +++ b/rust/src/mqtt/mqtt_message.rs @@ -0,0 +1,205 @@ +/* Copyright (C) 2020 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use crate::mqtt::mqtt_property::*; +use crate::mqtt::parser::*; +use std::fmt; + +#[derive(Debug)] +pub struct MQTTMessage { + pub header: FixedHeader, + pub op: MQTTOperation, +} + +#[derive(Debug)] +pub enum MQTTOperation { + UNASSIGNED, + CONNECT(MQTTConnectData), + CONNACK(MQTTConnackData), + PUBLISH(MQTTPublishData), + PUBACK(MQTTMessageIdOnly), + PUBREC(MQTTMessageIdOnly), + PUBREL(MQTTMessageIdOnly), + PUBCOMP(MQTTMessageIdOnly), + SUBSCRIBE(MQTTSubscribeData), + SUBACK(MQTTSubackData), + UNSUBSCRIBE(MQTTUnsubscribeData), + UNSUBACK(MQTTUnsubackData), + AUTH(MQTTAuthData), + PINGREQ, + PINGRESP, + DISCONNECT(MQTTDisconnectData), + // TRUNCATED is special, representing a message that was not parsed + // in its entirety due to size constraints. There is no equivalent in + // the MQTT specification. + TRUNCATED(MQTTTruncatedData), +} + +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, FromPrimitive, Debug)] +pub enum MQTTTypeCode { + UNASSIGNED = 0, + CONNECT = 1, + CONNACK = 2, + PUBLISH = 3, + PUBACK = 4, + PUBREC = 5, + PUBREL = 6, + PUBCOMP = 7, + SUBSCRIBE = 8, + SUBACK = 9, + UNSUBSCRIBE = 10, + UNSUBACK = 11, + PINGREQ = 12, + PINGRESP = 13, + DISCONNECT = 14, + AUTH = 15, +} + +impl MQTTTypeCode { + pub fn to_lower_str(&self) -> String { + self.to_string().to_lowercase() + } +} + +impl fmt::Display for MQTTTypeCode { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::str::FromStr for MQTTTypeCode { + type Err = String; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let su = s.to_uppercase(); + let su_slice: &str = &su; + match su_slice { + "CONNECT" => Ok(MQTTTypeCode::CONNECT), + "CONNACK" => Ok(MQTTTypeCode::CONNACK), + "PUBLISH" => Ok(MQTTTypeCode::PUBLISH), + "PUBACK" => Ok(MQTTTypeCode::PUBACK), + "PUBREC" => Ok(MQTTTypeCode::PUBREC), + "PUBREL" => Ok(MQTTTypeCode::PUBREL), + "PUBCOMP" => Ok(MQTTTypeCode::PUBCOMP), + "SUBSCRIBE" => Ok(MQTTTypeCode::SUBSCRIBE), + "SUBACK" => Ok(MQTTTypeCode::SUBACK), + "UNSUBSCRIBE" => Ok(MQTTTypeCode::UNSUBSCRIBE), + "UNSUBACK" => Ok(MQTTTypeCode::UNSUBACK), + "PINGREQ" => Ok(MQTTTypeCode::PINGREQ), + "PINGRESP" => Ok(MQTTTypeCode::PINGRESP), + "DISCONNECT" => Ok(MQTTTypeCode::DISCONNECT), + "AUTH" => Ok(MQTTTypeCode::AUTH), + _ => Err(format!("'{}' is not a valid value for MQTTTypeCode", s)), + } + } +} + +#[derive(Debug)] +pub struct MQTTConnectData { + pub protocol_string: String, + pub protocol_version: u8, + pub username_flag: bool, + pub password_flag: bool, + pub will_retain: bool, + pub will_qos: u8, + pub will_flag: bool, + pub clean_session: bool, + pub keepalive: u16, + pub client_id: String, + pub will_topic: Option<String>, + pub will_message: Option<Vec<u8>>, + pub username: Option<String>, + pub password: Option<Vec<u8>>, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 + pub will_properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTConnackData { + pub return_code: u8, + pub session_present: bool, // MQTT 3.1.1 + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTPublishData { + pub topic: String, + pub message_id: Option<u16>, + pub message: Vec<u8>, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTMessageIdOnly { + pub message_id: u16, + pub reason_code: Option<u8>, // MQTT 5.0 + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTSubscribeTopicData { + pub topic_name: String, + pub qos: u8, +} + +#[derive(Debug)] +pub struct MQTTSubscribeData { + pub message_id: u16, + pub topics: Vec<MQTTSubscribeTopicData>, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTSubackData { + pub message_id: u16, + pub qoss: Vec<u8>, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTUnsubscribeData { + pub message_id: u16, + pub topics: Vec<String>, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTUnsubackData { + pub message_id: u16, + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 + pub reason_codes: Option<Vec<u8>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTAuthData { + pub reason_code: u8, // MQTT 5.0 + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTDisconnectData { + pub reason_code: Option<u8>, // MQTT 5.0 + pub properties: Option<Vec<MQTTProperty>>, // MQTT 5.0 +} + +#[derive(Debug)] +pub struct MQTTTruncatedData { + pub original_message_type: MQTTTypeCode, + pub skipped_length: usize, +} diff --git a/rust/src/mqtt/mqtt_property.rs b/rust/src/mqtt/mqtt_property.rs new file mode 100644 index 0000000..a716c4b --- /dev/null +++ b/rust/src/mqtt/mqtt_property.rs @@ -0,0 +1,269 @@ +/* Copyright (C) 2020 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use crate::jsonbuilder::{JsonBuilder, JsonError}; +use crate::mqtt::parser::*; +use nom7::number::streaming::*; +use nom7::IResult; + +// TODO: It might be useful to also add detection on property presence and +// content, e.g. mqtt.property: AUTHENTICATION_METHOD. +#[derive(Debug, PartialEq, PartialOrd)] +#[allow(non_camel_case_types)] +pub enum MQTTProperty { + UNKNOWN, + PAYLOAD_FORMAT_INDICATOR(u8), + MESSAGE_EXPIRY_INTERVAL(u32), + CONTENT_TYPE(String), + RESPONSE_TOPIC(String), + CORRELATION_DATA(Vec<u8>), + SUBSCRIPTION_IDENTIFIER(u32), + SESSION_EXPIRY_INTERVAL(u32), + ASSIGNED_CLIENT_IDENTIFIER(String), + SERVER_KEEP_ALIVE(u16), + AUTHENTICATION_METHOD(String), + AUTHENTICATION_DATA(Vec<u8>), + REQUEST_PROBLEM_INFORMATION(u8), + WILL_DELAY_INTERVAL(u32), + REQUEST_RESPONSE_INFORMATION(u8), + RESPONSE_INFORMATION(String), + SERVER_REFERENCE(String), + REASON_STRING(String), + RECEIVE_MAXIMUM(u16), + TOPIC_ALIAS_MAXIMUM(u16), + TOPIC_ALIAS(u16), + MAXIMUM_QOS(u8), + RETAIN_AVAILABLE(u8), + USER_PROPERTY((String, String)), + MAXIMUM_PACKET_SIZE(u32), + WILDCARD_SUBSCRIPTION_AVAILABLE(u8), + SUBSCRIPTION_IDENTIFIER_AVAILABLE(u8), + SHARED_SUBSCRIPTION_AVAILABLE(u8), +} + +impl crate::mqtt::mqtt_property::MQTTProperty { + pub fn set_json(&self, js: &mut JsonBuilder) -> Result<(), JsonError> { + match self { + crate::mqtt::mqtt_property::MQTTProperty::PAYLOAD_FORMAT_INDICATOR(v) => { + js.set_uint("payload_format_indicator", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::MESSAGE_EXPIRY_INTERVAL(v) => { + js.set_uint("message_expiry_interval", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::CONTENT_TYPE(v) => { + js.set_string("content_type", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::RESPONSE_TOPIC(v) => { + js.set_string("response_topic", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::CORRELATION_DATA(v) => { + js.set_string_from_bytes("correlation_data", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SUBSCRIPTION_IDENTIFIER(v) => { + js.set_uint("subscription_identifier", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SESSION_EXPIRY_INTERVAL(v) => { + js.set_uint("session_expiry_interval", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(v) => { + js.set_string("assigned_client_identifier", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SERVER_KEEP_ALIVE(v) => { + js.set_uint("server_keep_alive", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::AUTHENTICATION_METHOD(v) => { + js.set_string("authentication_method", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::AUTHENTICATION_DATA(v) => { + js.set_string_from_bytes("authentication_data", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::REQUEST_PROBLEM_INFORMATION(v) => { + js.set_uint("request_problem_information", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::WILL_DELAY_INTERVAL(v) => { + js.set_uint("will_delay_interval", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::REQUEST_RESPONSE_INFORMATION(v) => { + js.set_uint("request_response_information", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::RESPONSE_INFORMATION(v) => { + js.set_string("response_information", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SERVER_REFERENCE(v) => { + js.set_string("server_reference", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::REASON_STRING(v) => { + js.set_string("reason_string", v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::RECEIVE_MAXIMUM(v) => { + js.set_uint("receive_maximum", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::TOPIC_ALIAS_MAXIMUM(v) => { + js.set_uint("topic_alias_maximum", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::TOPIC_ALIAS(v) => { + js.set_uint("topic_alias", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::MAXIMUM_QOS(v) => { + js.set_uint("maximum_qos", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::RETAIN_AVAILABLE(v) => { + js.set_uint("retain_available", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::USER_PROPERTY((k, v)) => { + js.set_string(k, v)?; + } + crate::mqtt::mqtt_property::MQTTProperty::MAXIMUM_PACKET_SIZE(v) => { + js.set_uint("maximum_packet_size", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::WILDCARD_SUBSCRIPTION_AVAILABLE(v) => { + js.set_uint("wildcard_subscription_available", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SUBSCRIPTION_IDENTIFIER_AVAILABLE(v) => { + js.set_uint("subscription_identifier_available", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::SHARED_SUBSCRIPTION_AVAILABLE(v) => { + js.set_uint("shared_subscription_available", *v as u64)?; + } + crate::mqtt::mqtt_property::MQTTProperty::UNKNOWN => { + // pass + } + } + Ok(()) + } +} + +#[inline] +pub fn parse_qualified_property(input: &[u8], identifier: u32) -> IResult<&[u8], MQTTProperty> { + match identifier { + 1 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::PAYLOAD_FORMAT_INDICATOR(val))), + Err(e) => return Err(e), + }, + 2 => match be_u32(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::MESSAGE_EXPIRY_INTERVAL(val))), + Err(e) => return Err(e), + }, + 3 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::CONTENT_TYPE(val))), + Err(e) => return Err(e), + }, + 8 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::RESPONSE_TOPIC(val))), + Err(e) => return Err(e), + }, + 9 => match parse_mqtt_binary_data(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::CORRELATION_DATA(val))), + Err(e) => return Err(e), + }, + 11 => match parse_mqtt_variable_integer(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::SUBSCRIPTION_IDENTIFIER(val))), + Err(e) => return Err(e), + }, + 17 => match be_u32(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::SESSION_EXPIRY_INTERVAL(val))), + Err(e) => return Err(e), + }, + 18 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(val))), + Err(e) => return Err(e), + }, + 19 => match be_u16(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::SERVER_KEEP_ALIVE(val))), + Err(e) => return Err(e), + }, + 21 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::AUTHENTICATION_METHOD(val))), + Err(e) => return Err(e), + }, + 22 => match parse_mqtt_binary_data(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::AUTHENTICATION_DATA(val))), + Err(e) => return Err(e), + }, + 23 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::REQUEST_PROBLEM_INFORMATION(val))), + Err(e) => return Err(e), + }, + 24 => match be_u32(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::WILL_DELAY_INTERVAL(val))), + Err(e) => return Err(e), + }, + 25 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::REQUEST_RESPONSE_INFORMATION(val))), + Err(e) => return Err(e), + }, + 26 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::RESPONSE_INFORMATION(val))), + Err(e) => return Err(e), + }, + 28 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::SERVER_REFERENCE(val))), + Err(e) => return Err(e), + }, + 31 => match parse_mqtt_string(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::REASON_STRING(val))), + Err(e) => return Err(e), + }, + 33 => match be_u16(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::RECEIVE_MAXIMUM(val))), + Err(e) => return Err(e), + }, + 34 => match be_u16(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::TOPIC_ALIAS_MAXIMUM(val))), + Err(e) => return Err(e), + }, + 35 => match be_u16(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::TOPIC_ALIAS(val))), + Err(e) => return Err(e), + }, + 36 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::MAXIMUM_QOS(val))), + Err(e) => return Err(e), + }, + 37 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::RETAIN_AVAILABLE(val))), + Err(e) => return Err(e), + }, + 38 => match parse_mqtt_string_pair(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::USER_PROPERTY(val))), + Err(e) => return Err(e), + }, + 39 => match be_u32(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::MAXIMUM_PACKET_SIZE(val))), + Err(e) => return Err(e), + }, + 40 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::WILDCARD_SUBSCRIPTION_AVAILABLE(val))), + Err(e) => return Err(e), + }, + 41 => match be_u8(input) { + Ok((rem, val)) => { + return Ok((rem, MQTTProperty::SUBSCRIPTION_IDENTIFIER_AVAILABLE(val))) + } + Err(e) => return Err(e), + }, + 42 => match be_u8(input) { + Ok((rem, val)) => return Ok((rem, MQTTProperty::SHARED_SUBSCRIPTION_AVAILABLE(val))), + Err(e) => return Err(e), + }, + _ => { + return Ok((input, MQTTProperty::UNKNOWN)); + } + } +} diff --git a/rust/src/mqtt/parser.rs b/rust/src/mqtt/parser.rs new file mode 100644 index 0000000..8b1c8c5 --- /dev/null +++ b/rust/src/mqtt/parser.rs @@ -0,0 +1,1135 @@ +/* Copyright (C) 2020-2022 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +// written by Sascha Steinbiss <sascha@steinbiss.name> + +use crate::common::nom7::bits; +use crate::mqtt::mqtt_message::*; +use crate::mqtt::mqtt_property::*; +use nom7::bits::streaming::take as take_bits; +use nom7::bytes::complete::take; +use nom7::bytes::streaming::take_while_m_n; +use nom7::combinator::{complete, cond, verify}; +use nom7::multi::{length_data, many0, many1}; +use nom7::number::streaming::*; +use nom7::sequence::tuple; +use nom7::{Err, IResult, Needed}; +use num_traits::FromPrimitive; + +#[derive(Copy, Clone, Debug)] +pub struct FixedHeader { + pub message_type: MQTTTypeCode, + pub dup_flag: bool, + pub qos_level: u8, + pub retain: bool, + pub remaining_length: u32, +} + +// PARSING HELPERS + +#[inline] +fn is_continuation_bit_set(b: u8) -> bool { + return (b & 128) != 0; +} + +#[inline] +fn convert_varint(continued: Vec<u8>, last: u8) -> u32 { + let mut multiplier = 1u32; + let mut value = 0u32; + for val in &continued { + value += (val & 127) as u32 * multiplier; + multiplier *= 128u32; + } + value += (last & 127) as u32 * multiplier; + return value; +} + +// DATA TYPES + +#[inline] +pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> { + let (i, content) = length_data(be_u16)(i)?; + Ok((i, String::from_utf8_lossy(content).to_string())) +} + +#[inline] +pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> { + let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?; + let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?; + Ok(( + i, + convert_varint(continued_part.to_vec(), non_continued_part), + )) +} + +#[inline] +pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> { + let (i, data) = length_data(be_u16)(i)?; + Ok((i, data.to_vec())) +} + +#[inline] +pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> { + let (i, name) = parse_mqtt_string(i)?; + let (i, value) = parse_mqtt_string(i)?; + Ok((i, (name, value))) +} + +// MESSAGE COMPONENTS + +#[inline] +fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> { + let (i, identifier) = parse_mqtt_variable_integer(i)?; + let (i, value) = parse_qualified_property(i, identifier)?; + Ok((i, value)) +} + +#[inline] +fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> { + // do not try to parse anything when precondition is not met + if !precond { + return Ok((input, None)); + } + // parse properties length + match parse_mqtt_variable_integer(input) { + Ok((rem, proplen)) => { + if proplen == 0 { + // no properties + return Ok((rem, None)); + } + // parse properties + let mut props = Vec::<MQTTProperty>::new(); + let (rem, mut newrem) = take(proplen as usize)(rem)?; + while !newrem.is_empty() { + match parse_property(newrem) { + Ok((rem2, val)) => { + props.push(val); + newrem = rem2; + } + Err(e) => return Err(e), + } + } + return Ok((rem, Some(props))); + } + Err(e) => return Err(e), + } +} + +#[inline] +fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> { + bits(tuple(( + take_bits(4u8), + take_bits(1u8), + take_bits(2u8), + take_bits(1u8), + )))(i) +} + +#[inline] +fn parse_message_type(code: u8) -> MQTTTypeCode { + match code { + 0..=15 => { + if let Some(t) = FromPrimitive::from_u8(code) { + return t; + } else { + return MQTTTypeCode::UNASSIGNED; + } + } + _ => { + // unreachable state in parser: we only pass values parsed from take_bits!(4u8) + debug_validate_fail!("can't have message codes >15 from 4 bits"); + MQTTTypeCode::UNASSIGNED + } + } +} + +#[inline] +pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> { + let (i, flags) = parse_fixed_header_flags(i)?; + let (i, remaining_length) = parse_mqtt_variable_integer(i)?; + Ok(( + i, + FixedHeader { + message_type: parse_message_type(flags.0), + dup_flag: flags.1 != 0, + qos_level: flags.2, + retain: flags.3 != 0, + remaining_length, + }, + )) +} + +#[inline] +#[allow(clippy::type_complexity)] +fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> { + bits(tuple(( + take_bits(1u8), + take_bits(1u8), + take_bits(1u8), + take_bits(2u8), + take_bits(1u8), + take_bits(1u8), + take_bits(1u8), + )))(i) +} + +#[inline] +fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> { + let (i, protocol_string) = parse_mqtt_string(i)?; + let (i, protocol_version) = be_u8(i)?; + let (i, flags) = parse_connect_variable_flags(i)?; + let (i, keepalive) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, client_id) = parse_mqtt_string(i)?; + let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?; + let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?; + let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?; + let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?; + let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?; + Ok(( + i, + MQTTConnectData { + protocol_string, + protocol_version, + username_flag: flags.0 != 0, + password_flag: flags.1 != 0, + will_retain: flags.2 != 0, + will_qos: flags.3, + will_flag: flags.4 != 0, + clean_session: flags.5 != 0, + keepalive, + client_id, + will_topic, + will_message, + username, + password, + properties, + will_properties, + }, + )) +} + +#[inline] +fn parse_connack(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTConnackData> { + move |i: &[u8]| { + let (i, topic_name_compression_response) = be_u8(i)?; + let (i, return_code) = be_u8(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( + i, + MQTTConnackData { + session_present: (topic_name_compression_response & 1) != 0, + return_code, + properties, + }, + )) + } +} + +#[inline] +fn parse_publish( + protocol_version: u8, + has_id: bool, +) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTPublishData> { + move |i: &[u8]| { + let (i, topic) = parse_mqtt_string(i)?; + let (i, message_id) = cond(has_id, be_u16)(i)?; + let (message, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( + i, + MQTTPublishData { + topic, + message_id, + message: message.to_vec(), + properties, + }, + )) + } +} + +#[inline] +fn parse_msgidonly(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTMessageIdOnly> { + move |input: &[u8]| { + if protocol_version < 5 { + // before v5 we don't even have to care about reason codes + // and properties, lucky us + return parse_msgidonly_v3(input); + } + let remaining_len = input.len(); + match be_u16(input) { + Ok((rem, message_id)) => { + if remaining_len == 2 { + // from the spec: " The Reason Code and Property Length can be + // omitted if the Reason Code is 0x00 (Success) and there are + // no Properties. In this case the message has a Remaining + // Length of 2." + return Ok(( + rem, + MQTTMessageIdOnly { + message_id, + reason_code: Some(0), + properties: None, + }, + )); + } + match be_u8(rem) { + Ok((rem, reason_code)) => { + // We are checking for 3 because in that case we have a + // header plus reason code, but no properties. + if remaining_len == 3 { + // no properties + return Ok(( + rem, + MQTTMessageIdOnly { + message_id, + reason_code: Some(reason_code), + properties: None, + }, + )); + } + match parse_properties(rem, true) { + Ok((rem, properties)) => { + return Ok(( + rem, + MQTTMessageIdOnly { + message_id, + reason_code: Some(reason_code), + properties, + }, + )); + } + Err(e) => return Err(e), + } + } + Err(e) => return Err(e), + } + } + Err(e) => return Err(e), + } + } +} + +#[inline] +fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> { + let (i, message_id) = be_u16(i)?; + Ok(( + i, + MQTTMessageIdOnly { + message_id, + reason_code: None, + properties: None, + }, + )) +} + +#[inline] +fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> { + let (i, topic_name) = parse_mqtt_string(i)?; + let (i, qos) = be_u8(i)?; + Ok((i, MQTTSubscribeTopicData { topic_name, qos })) +} + +#[inline] +fn parse_subscribe(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubscribeData> { + move |i: &[u8]| { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, topics) = many1(complete(parse_subscribe_topic))(i)?; + Ok(( + i, + MQTTSubscribeData { + message_id, + topics, + properties, + }, + )) + } +} + +#[inline] +fn parse_suback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTSubackData> { + move |i: &[u8]| { + let (i, message_id) = be_u16(i)?; + let (qoss, properties) = parse_properties(i, protocol_version == 5)?; + Ok(( + i, + MQTTSubackData { + message_id, + qoss: qoss.to_vec(), + properties, + }, + )) + } +} + +#[inline] +fn parse_unsubscribe( + protocol_version: u8, +) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubscribeData> { + move |i: &[u8]| { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, topics) = many0(complete(parse_mqtt_string))(i)?; + Ok(( + i, + MQTTUnsubscribeData { + message_id, + topics, + properties, + }, + )) + } +} + +#[inline] +fn parse_unsuback(protocol_version: u8) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTUnsubackData> { + move |i: &[u8]| { + let (i, message_id) = be_u16(i)?; + let (i, properties) = parse_properties(i, protocol_version == 5)?; + let (i, reason_codes) = many0(complete(be_u8))(i)?; + Ok(( + i, + MQTTUnsubackData { + message_id, + properties, + reason_codes: Some(reason_codes), + }, + )) + } +} + +#[inline] +fn parse_disconnect( + remaining_len: usize, + protocol_version: u8, +) -> impl Fn(&[u8]) -> IResult<&[u8], MQTTDisconnectData> { + move |input: &[u8]| { + if protocol_version < 5 { + return Ok(( + input, + MQTTDisconnectData { + reason_code: None, + properties: None, + }, + )); + } + if remaining_len == 0 { + // The Reason Code and Property Length can be omitted if the Reason + // Code is 0x00 (Normal disconnection) and there are no Properties. + // In this case the DISCONNECT has a Remaining Length of 0. + return Ok(( + input, + MQTTDisconnectData { + reason_code: Some(0), + properties: None, + }, + )); + } + match be_u8(input) { + Ok((rem, reason_code)) => { + // We are checking for 1 because in that case we have a + // header plus reason code, but no properties. + if remaining_len == 1 { + // no properties + return Ok(( + rem, + MQTTDisconnectData { + reason_code: Some(0), + properties: None, + }, + )); + } + match parse_properties(rem, true) { + Ok((rem, properties)) => { + return Ok(( + rem, + MQTTDisconnectData { + reason_code: Some(reason_code), + properties, + }, + )); + } + Err(e) => return Err(e), + } + } + Err(e) => return Err(e), + } + } +} + +#[inline] +fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> { + let (i, reason_code) = be_u8(i)?; + let (i, properties) = parse_properties(i, true)?; + Ok(( + i, + MQTTAuthData { + reason_code, + properties, + }, + )) +} + +#[inline] +fn parse_remaining_message<'a>( + full: &'a [u8], + len: usize, + skiplen: usize, + header: FixedHeader, + message_type: MQTTTypeCode, + protocol_version: u8, +) -> impl Fn(&'a [u8]) -> IResult<&'a [u8], MQTTMessage> { + move |input: &'a [u8]| { + match message_type { + MQTTTypeCode::CONNECT => match parse_connect(input) { + Ok((_rem, conn)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::CONNECT(conn), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::CONNACK => match parse_connack(protocol_version)(input) { + Ok((_rem, connack)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::CONNACK(connack), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::PUBLISH => { + match parse_publish(protocol_version, header.qos_level > 0)(input) { + Ok((_rem, publish)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::PUBLISH(publish), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + } + } + MQTTTypeCode::PUBACK + | MQTTTypeCode::PUBREC + | MQTTTypeCode::PUBREL + | MQTTTypeCode::PUBCOMP => match parse_msgidonly(protocol_version)(input) { + Ok((_rem, msgidonly)) => { + let msg = MQTTMessage { + header, + op: match message_type { + MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly), + MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly), + MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly), + MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly), + _ => MQTTOperation::UNASSIGNED, + }, + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::SUBSCRIBE => match parse_subscribe(protocol_version)(input) { + Ok((_rem, subs)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::SUBSCRIBE(subs), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::SUBACK => match parse_suback(protocol_version)(input) { + Ok((_rem, suback)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::SUBACK(suback), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(protocol_version)(input) { + Ok((_rem, unsub)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::UNSUBSCRIBE(unsub), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::UNSUBACK => match parse_unsuback(protocol_version)(input) { + Ok((_rem, unsuback)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::UNSUBACK(unsuback), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => { + let msg = MQTTMessage { + header, + op: match message_type { + MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ, + MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP, + _ => MQTTOperation::UNASSIGNED, + }, + }; + Ok((&full[skiplen + len..], msg)) + } + MQTTTypeCode::DISCONNECT => match parse_disconnect(len, protocol_version)(input) { + Ok((_rem, disco)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::DISCONNECT(disco), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + MQTTTypeCode::AUTH => match parse_auth(input) { + Ok((_rem, auth)) => { + let msg = MQTTMessage { + header, + op: MQTTOperation::AUTH(auth), + }; + Ok((&full[skiplen + len..], msg)) + } + Err(e) => Err(e), + }, + // Unassigned message type code. Unlikely to happen with + // regular traffic, might be an indication for broken or + // crafted MQTT traffic. + _ => { + let msg = MQTTMessage { + header, + op: MQTTOperation::UNASSIGNED, + }; + return Ok((&full[skiplen + len..], msg)); + } + } + } +} + +pub fn parse_message( + input: &[u8], + protocol_version: u8, + max_msg_size: usize, +) -> IResult<&[u8], MQTTMessage> { + // Parse the fixed header first. This is identical across versions and can + // be between 2 and 5 bytes long. + match parse_fixed_header(input) { + Ok((fullrem, header)) => { + let len = header.remaining_length as usize; + // This is the length of the fixed header that we need to skip + // before returning the remainder. It is the sum of the length + // of the flag byte (1) and the length of the message length + // varint. + let skiplen = input.len() - fullrem.len(); + let message_type = header.message_type; + + // If the remaining length (message length) exceeds the specified + // limit, we return a special truncation message type, containing + // no parsed metadata but just the skipped length and the message + // type. + if len > max_msg_size { + let msg = MQTTMessage { + header, + op: MQTTOperation::TRUNCATED(MQTTTruncatedData { + original_message_type: message_type, + skipped_length: len + skiplen, + }), + }; + // In this case we return the full input buffer, since this is + // what the skipped_length value also refers to: header _and_ + // remaining length. + return Ok((input, msg)); + } + + // We have not exceeded the maximum length limit, but still do not + // have enough data in the input buffer to handle the full + // message. Signal this by returning an Incomplete IResult value. + if fullrem.len() < len { + return Err(Err::Incomplete(Needed::new(len - fullrem.len()))); + } + + // Parse the contents of the buffer into a single message. + // We reslice the remainder into the portion that we are interested + // in, according to the length we just parsed. This helps with the + // complete() parsers, where we would otherwise need to keep track + // of the already parsed length. + let rem = &fullrem[..len]; + + // Parse remaining message in buffer. We use complete() to ensure + // we do not request additional content in case of incomplete + // parsing, but raise an error instead as we should have all the + // data asked for in the header. + return complete(parse_remaining_message( + input, + len, + skiplen, + header, + message_type, + protocol_version, + ))(rem); + } + Err(err) => { + return Err(err); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use nom7::error::ErrorKind; + + fn test_mqtt_parse_variable_fail(buf0: &[u8]) { + let r0 = parse_mqtt_variable_integer(buf0); + match r0 { + Ok((_, _)) => { + panic!("Result should not have been ok."); + } + Err(Err::Error(err)) => { + assert_eq!(err.code, ErrorKind::Verify); + } + _ => { + panic!("Result should be an error."); + } + } + } + + fn test_mqtt_parse_variable_check(buf0: &[u8], expected: u32) { + let r0 = parse_mqtt_variable_integer(buf0); + match r0 { + Ok((_, val)) => { + assert_eq!(val, expected); + } + Err(_) => { + panic!("Result should have been ok."); + } + } + } + + #[test] + fn test_mqtt_parse_variable_integer_largest_input() { + test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0xFF]); + } + + #[test] + fn test_mqtt_parse_variable_integer_boundary() { + test_mqtt_parse_variable_fail(&[0xFF, 0xFF, 0xFF, 0x80]); + } + + #[test] + fn test_mqtt_parse_variable_integer_largest_valid() { + test_mqtt_parse_variable_check(&[0xFF, 0xFF, 0xFF, 0x7F], 268435455); + } + + #[test] + fn test_mqtt_parse_variable_integer_smallest_valid() { + test_mqtt_parse_variable_check(&[0x0], 0); + } + + #[test] + fn test_parse_fixed_header() { + let buf = [ + 0x30, /* Header Flags: 0x30, Message Type: Publish Message, QoS Level: At most once delivery (Fire and Forget) */ + 0xb7, 0x97, 0x02, /* Msg Len: 35767 */ + 0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0xa0, + ]; + + let result = parse_fixed_header(&buf); + match result { + Ok((remainder, message)) => { + assert_eq!(message.message_type, MQTTTypeCode::PUBLISH); + assert!(!message.dup_flag); + assert_eq!(message.qos_level, 0); + assert!(!message.retain); + assert_eq!(message.remaining_length, 35767); + assert_eq!(remainder.len(), 17); + } + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_properties() { + let buf = [ + 0x03, 0x21, 0x00, 0x14, /* Properties */ + 0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x10, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0xa0, + ]; + + let result = parse_properties(&buf, true); + match result { + Ok((remainder, message)) => { + let res = message.unwrap(); + assert_eq!(res[0], MQTTProperty::RECEIVE_MAXIMUM(20)); + assert_eq!(remainder.len(), 17); + } + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + #[test] + fn test_parse_connect() { + let buf = [ + 0x00, 0x04, /* Protocol Name Length: 4 */ + 0x4d, 0x51, 0x54, 0x54, /* Protocol Name: MQTT */ + 0x05, /* Version: MQTT v5.0 (5) */ + 0xc2, /*Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */ + 0x00, 0x3c, /* Keep Alive: 60 */ + 0x03, 0x21, 0x00, 0x14, /* Properties */ + 0x00, 0x00, /* Client ID Length: 0 */ + 0x00, 0x04, /* User Name Length: 4 */ + 0x75, 0x73, 0x65, 0x72, /* User Name: user */ + 0x00, 0x04, /* Password Length: 4 */ + 0x71, 0x61, 0x71, 0x73, /* Password: pass */ + ]; + + let result = parse_connect(&buf); + match result { + Ok((remainder, message)) => { + assert_eq!(message.protocol_string, "MQTT"); + assert_eq!(message.protocol_version, 5); + assert!(message.username_flag); + assert!(message.password_flag); + assert!(!message.will_retain); + assert_eq!(message.will_qos, 0); + assert!(!message.will_flag); + assert!(message.clean_session); + assert_eq!(message.keepalive, 60); + assert_eq!(remainder.len(), 0); + } + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_connack() { + let buf = [ + 0x00, /* Acknowledge Flags: 0x00 (0000 000. = Reserved: Not set )(.... ...0 = Session Present: Not set) */ + 0x00, /* Reason Code: Success (0) */ + 0x2f, /* Total Length: 47 */ + 0x22, /* ID: Topic Alias Maximum (0x22) */ + 0x00, 0x0a, /* Value: 10 */ + 0x12, /* ID: Assigned Client Identifier (0x12) */ + 0x00, 0x29, /* Length: 41 */ + 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x31, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, 0x30, 0x2d, + 0x30, 0x38, 0x45, 0x33, 0x2d, 0x33, 0x42, 0x41, 0x31, 0x2d, 0x32, 0x45, 0x39, 0x37, + 0x2d, 0x45, 0x39, 0x41, 0x30, 0x42, 0x34, 0x30, 0x36, 0x34, 0x42, 0x46, + 0x35, /* 41 byte Value: auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5 */ + ]; + let client_identifier = "auto-1B43E800-08E3-3BA1-2E97-E9A0B4064BF5"; + + let result = parse_connack(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + let props = message.properties.unwrap(); + assert_eq!(props[0], MQTTProperty::TOPIC_ALIAS_MAXIMUM(10)); + assert_eq!( + props[1], + MQTTProperty::ASSIGNED_CLIENT_IDENTIFIER(client_identifier.to_string()) + ); + assert_eq!(message.return_code, 0); + assert!(!message.session_present); + assert_eq!(remainder.len(), 0); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_publish() { + let buf = [ + 0x00, 06, /* Topic Length: 6 */ + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */ + 0x00, 0x01, /* Message Identifier: 1 */ + 0x00, /* Properties 6 */ + 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, + ]; + + let result = parse_publish(5, true); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + let message_id = message.message_id.unwrap(); + assert_eq!(message.topic, "topicX"); + assert_eq!(message_id, 1); + assert_eq!(remainder.len(), 13); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_msgidonly_v3() { + let buf = [ + 0x00, 01, /* Message Identifier: 1 */ + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, + 0x33, 0x45, 0x38, 0x30, + ]; + + let result = parse_msgidonly(3); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + assert_eq!(message.message_id, 1); + assert_eq!(remainder.len(), 18); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_msgidonly_v5() { + let buf = [ + 0x00, 01, /* Message Identifier: 1 */ + 0x00, /* Reason Code: 0 */ + 0x00, /* Properties */ + 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, + ]; + + let result = parse_msgidonly(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + let reason_code = message.reason_code.unwrap(); + assert_eq!(message.message_id, 1); + assert_eq!(reason_code, 0); + assert_eq!(remainder.len(), 12); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_subscribe() { + let buf = [ + 0x00, 0x01, /* Message Identifier: 1 */ + 0x00, /* Properties 6 */ + 0x00, 0x06, /* Topic Length: 6 */ + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */ + 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */ + 0x00, 0x06, /* Topic Length: 6 */ + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x59, /* Topic: topicY */ + 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */ + 0x00, 0x61, 0x75, 0x74, 0x6f, 0x2d, 0x42, 0x34, 0x33, 0x45, 0x38, 0x30, + ]; + + let result = parse_subscribe(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + assert_eq!(message.topics[0].topic_name, "topicX"); + assert_eq!(message.topics[1].topic_name, "topicY"); + assert_eq!(message.topics[0].qos, 0); + assert_eq!(message.message_id, 1); + assert_eq!(remainder.len(), 12); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + #[test] + fn test_parse_suback() { + let buf = [ + 0x00, 0x01, /* Message Identifier: 1 */ + 0x00, /* Properties 6 */ + 0x00, 0x00, /* Topic Length: 6 */ + ]; + + let result = parse_suback(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + assert_eq!(message.qoss[0], 0); + assert_eq!(message.message_id, 1); + assert_eq!(remainder.len(), 3); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + #[test] + fn test_parse_unsubscribe() { + let buf = [ + 0x00, 0x01, /* Message Identifier: 1 */ + 0x00, /* Properties 6 */ + 0x00, 0x06, /* Topic Length: 6 */ + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x58, /* Topic: topicX */ + 0x00, /*Subscription Options: 0x00, Retain Handling: Send msgs at subscription time, QoS: At most once delivery (Fire and Forget) */ + ]; + + let result = parse_unsubscribe(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + assert_eq!(message.topics[0], "topicX"); + assert_eq!(message.message_id, 1); + assert_eq!(remainder.len(), 1); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_unsuback() { + let buf = [ + 0x00, 0x01, /* Message Identifier: 1 */ + 0x00, /* Properties 6 */ + 0x00, /* Reason Code */ + ]; + + let result = parse_unsuback(5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + let reason_codes = message.reason_codes.unwrap(); + assert_eq!(reason_codes[0], 0); + assert_eq!(message.message_id, 1); + assert_eq!(remainder.len(), 0); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_disconnect() { + let buf = [ + 0xe0, /* Reason: 0 */ + 0x00, /* Message Identifier: 1 */ + ]; + + let result = parse_disconnect(0, 5); + let input = result(&buf); + match input { + Ok((remainder, message)) => { + let reason_code = message.reason_code.unwrap(); + assert_eq!(reason_code, 0); + assert_eq!(remainder.len(), 2); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } + + #[test] + fn test_parse_message() { + let buf = [ + 0x10, /* Message Identifier: 1 */ + 0x2f, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05, + 0xc2, /* Connect Flags: 0xc2, User Name Flag, Password Flag, QoS Level: At most once delivery (Fire and Forget), Clean Session Flag */ + 0x00, 0x3c, 0x03, 0x21, 0x00, 0x14, /* Properties */ + 0x00, 0x13, 0x6d, 0x79, 0x76, 0x6f, 0x69, 0x63, 0x65, 0x69, 0x73, 0x6d, 0x79, 0x70, + 0x61, 0x73, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x00, 0x04, 0x75, 0x73, 0x65, 0x72, 0x00, + 0x04, 0x70, 0x61, 0x73, 0x73, + ]; + + let result = parse_message(&buf, 5, 40); + match result { + Ok((remainder, message)) => { + assert_eq!(message.header.message_type, MQTTTypeCode::CONNECT); + assert!(!message.header.dup_flag); + assert_eq!(message.header.qos_level, 0); + assert!(!message.header.retain); + assert_eq!(remainder.len(), 49); + } + + Err(Err::Incomplete(_)) => { + panic!("Result should not have been incomplete."); + } + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + panic!("Result should not be an error: {:?}.", err); + } + } + } +} |