// Copyright (C) 2021, Cloudflare, Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::events::EventData; use crate::events::EventImportance; use crate::events::EventType; use crate::events::Eventable; use crate::events::ExData; /// A helper object specialized for streaming JSON-serialized qlog to a /// [`Write`] trait. /// /// The object is responsible for the `Qlog` object that contains the /// provided `Trace`. /// /// Serialization is progressively driven by method calls; once log streaming /// is started, `event::Events` can be written using `add_event()`. /// /// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html use super::*; #[derive(PartialEq, Eq, Debug)] pub enum StreamerState { Initial, Ready, Finished, } pub struct QlogStreamer { start_time: std::time::Instant, writer: Box, qlog: QlogSeq, state: StreamerState, log_level: EventImportance, } impl QlogStreamer { /// Creates a [QlogStreamer] object. /// /// It owns a [QlogSeq] object that contains the provided [TraceSeq] /// containing [Event]s. /// /// All serialization will be written to the provided [`Write`] using the /// JSON-SEQ format. /// /// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html #[allow(clippy::too_many_arguments)] pub fn new( qlog_version: String, title: Option, description: Option, summary: Option, start_time: std::time::Instant, trace: TraceSeq, log_level: EventImportance, writer: Box, ) -> Self { let qlog = QlogSeq { qlog_version, qlog_format: "JSON-SEQ".to_string(), title, description, summary, trace, }; QlogStreamer { start_time, writer, qlog, state: StreamerState::Initial, log_level, } } /// Starts qlog streaming serialization. /// /// This writes out the JSON-SEQ-serialized form of all initial qlog /// information. [Event]s are separately appended using [add_event()], /// [add_event_with_instant()], [add_event_now()], /// [add_event_data_with_instant()], or [add_event_data_now()]. /// /// [add_event()]: #method.add_event /// [add_event_with_instant()]: #method.add_event_with_instant /// [add_event_now()]: #method.add_event_now /// [add_event_data_with_instant()]: #method.add_event_data_with_instant /// [add_event_data_now()]: #method.add_event_data_now pub fn start_log(&mut self) -> Result<()> { if self.state != StreamerState::Initial { return Err(Error::Done); } self.writer.as_mut().write_all(b"")?; serde_json::to_writer(self.writer.as_mut(), &self.qlog) .map_err(|_| Error::Done)?; self.writer.as_mut().write_all(b"\n")?; self.state = StreamerState::Ready; Ok(()) } /// Finishes qlog streaming serialization. /// /// After this is called, no more serialization will occur. pub fn finish_log(&mut self) -> Result<()> { if self.state == StreamerState::Initial || self.state == StreamerState::Finished { return Err(Error::InvalidState); } self.state = StreamerState::Finished; self.writer.as_mut().flush()?; Ok(()) } /// Writes a serializable to a JSON-SEQ record using /// [std::time::Instant::now()]. pub fn add_event_now( &mut self, event: E, ) -> Result<()> { let now = std::time::Instant::now(); self.add_event_with_instant(event, now) } /// Writes a serializable to a JSON-SEQ record using the provided /// [std::time::Instant]. pub fn add_event_with_instant( &mut self, mut event: E, now: std::time::Instant, ) -> Result<()> { if self.state != StreamerState::Ready { return Err(Error::InvalidState); } if !event.importance().is_contained_in(&self.log_level) { return Err(Error::Done); } let dur = if cfg!(test) { std::time::Duration::from_secs(0) } else { now.duration_since(self.start_time) }; let rel_time = dur.as_secs_f32() * 1000.0; event.set_time(rel_time); self.add_event(event) } /// Writes an [Event] based on the provided [EventData] to a JSON-SEQ record /// at time [std::time::Instant::now()]. pub fn add_event_data_now(&mut self, event_data: EventData) -> Result<()> { self.add_event_data_ex_now(event_data, Default::default()) } /// Writes an [Event] based on the provided [EventData] and [ExData] to a /// JSON-SEQ record at time [std::time::Instant::now()]. pub fn add_event_data_ex_now( &mut self, event_data: EventData, ex_data: ExData, ) -> Result<()> { let now = std::time::Instant::now(); self.add_event_data_ex_with_instant(event_data, ex_data, now) } /// Writes an [Event] based on the provided [EventData] and /// [std::time::Instant] to a JSON-SEQ record. pub fn add_event_data_with_instant( &mut self, event_data: EventData, now: std::time::Instant, ) -> Result<()> { self.add_event_data_ex_with_instant(event_data, Default::default(), now) } /// Writes an [Event] based on the provided [EventData], [ExData], and /// [std::time::Instant] to a JSON-SEQ record. pub fn add_event_data_ex_with_instant( &mut self, event_data: EventData, ex_data: ExData, now: std::time::Instant, ) -> Result<()> { if self.state != StreamerState::Ready { return Err(Error::InvalidState); } let ty = EventType::from(&event_data); if !EventImportance::from(ty).is_contained_in(&self.log_level) { return Err(Error::Done); } let dur = if cfg!(test) { std::time::Duration::from_secs(0) } else { now.duration_since(self.start_time) }; let rel_time = dur.as_secs_f32() * 1000.0; let event = Event::with_time_ex(rel_time, event_data, ex_data); self.add_event(event) } /// Writes a JSON-SEQ-serialized [Event] using the provided [Event]. pub fn add_event( &mut self, event: E, ) -> Result<()> { if self.state != StreamerState::Ready { return Err(Error::InvalidState); } if !event.importance().is_contained_in(&self.log_level) { return Err(Error::Done); } self.writer.as_mut().write_all(b"")?; serde_json::to_writer(self.writer.as_mut(), &event) .map_err(|_| Error::Done)?; self.writer.as_mut().write_all(b"\n")?; Ok(()) } /// Returns the writer. #[allow(clippy::borrowed_box)] pub fn writer(&self) -> &Box { &self.writer } pub fn start_time(&self) -> std::time::Instant { self.start_time } } impl Drop for QlogStreamer { fn drop(&mut self) { let _ = self.finish_log(); } } #[cfg(test)] mod tests { use std::collections::BTreeMap; use super::*; use crate::events::quic; use crate::events::quic::QuicFrame; use crate::events::RawInfo; use smallvec::smallvec; use testing::*; use serde_json::json; #[test] fn serialization_states() { let v: Vec = Vec::new(); let buff = std::io::Cursor::new(v); let writer = Box::new(buff); let trace = make_trace_seq(); let pkt_hdr = make_pkt_hdr(quic::PacketType::Handshake); let raw = Some(RawInfo { length: Some(1251), payload_length: Some(1224), data: None, }); let frame1 = QuicFrame::Stream { stream_id: 40, offset: 40, length: 400, fin: Some(true), raw: None, }; let event_data1 = EventData::PacketSent(quic::PacketSent { header: pkt_hdr.clone(), frames: Some(smallvec![frame1]), is_coalesced: None, retry_token: None, stateless_reset_token: None, supported_versions: None, raw: raw.clone(), datagram_id: None, send_at_time: None, trigger: None, }); let ev1 = Event::with_time(0.0, event_data1); let frame2 = QuicFrame::Stream { stream_id: 0, offset: 0, length: 100, fin: Some(true), raw: None, }; let frame3 = QuicFrame::Stream { stream_id: 0, offset: 0, length: 100, fin: Some(true), raw: None, }; let event_data2 = EventData::PacketSent(quic::PacketSent { header: pkt_hdr.clone(), frames: Some(smallvec![frame2]), is_coalesced: None, retry_token: None, stateless_reset_token: None, supported_versions: None, raw: raw.clone(), datagram_id: None, send_at_time: None, trigger: None, }); let ev2 = Event::with_time(0.0, event_data2); let event_data3 = EventData::PacketSent(quic::PacketSent { header: pkt_hdr, frames: Some(smallvec![frame3]), is_coalesced: None, retry_token: None, stateless_reset_token: Some("reset_token".to_string()), supported_versions: None, raw, datagram_id: None, send_at_time: None, trigger: None, }); let ev3 = Event::with_time(0.0, event_data3); let mut s = streamer::QlogStreamer::new( "version".to_string(), Some("title".to_string()), Some("description".to_string()), None, std::time::Instant::now(), trace, EventImportance::Base, writer, ); // Before the log is started all other operations should fail. assert!(matches!(s.add_event(ev2.clone()), Err(Error::InvalidState))); assert!(matches!(s.finish_log(), Err(Error::InvalidState))); // Start log and add a simple event. assert!(matches!(s.start_log(), Ok(()))); assert!(matches!(s.add_event(ev1), Ok(()))); // Add some more events. assert!(matches!(s.add_event(ev2), Ok(()))); assert!(matches!(s.add_event(ev3.clone()), Ok(()))); // Adding an event with an external time should work too. // For tests, it will resolve to 0 but we care about proving the API // here, not timing specifics. let now = std::time::Instant::now(); assert!(matches!(s.add_event_with_instant(ev3, now), Ok(()))); assert!(matches!(s.finish_log(), Ok(()))); let r = s.writer(); #[allow(clippy::borrowed_box)] let w: &Box>> = unsafe { std::mem::transmute(r) }; let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":40,"offset":40,"length":400,"fin":true}]}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"stateless_reset_token":"reset_token","raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"stateless_reset_token":"reset_token","raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":0,"offset":0,"length":100,"fin":true}]}} "#; let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap(); assert_eq!(log_string, written_string); } #[test] fn stream_json_event() { let data = json!({"foo": "Bar", "hello": 123}); let ev = events::JsonEvent { time: 0.0, importance: events::EventImportance::Core, name: "jsonevent:sample".into(), data, }; let v: Vec = Vec::new(); let buff = std::io::Cursor::new(v); let writer = Box::new(buff); let trace = make_trace_seq(); let mut s = streamer::QlogStreamer::new( "version".to_string(), Some("title".to_string()), Some("description".to_string()), None, std::time::Instant::now(), trace, EventImportance::Base, writer, ); assert!(matches!(s.start_log(), Ok(()))); assert!(matches!(s.add_event(ev), Ok(()))); assert!(matches!(s.finish_log(), Ok(()))); let r = s.writer(); #[allow(clippy::borrowed_box)] let w: &Box>> = unsafe { std::mem::transmute(r) }; let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}} {"time":0.0,"name":"jsonevent:sample","data":{"foo":"Bar","hello":123}} "#; let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap(); assert_eq!(log_string, written_string); } #[test] fn stream_data_ex() { let v: Vec = Vec::new(); let buff = std::io::Cursor::new(v); let writer = Box::new(buff); let trace = make_trace_seq(); let pkt_hdr = make_pkt_hdr(quic::PacketType::Handshake); let raw = Some(RawInfo { length: Some(1251), payload_length: Some(1224), data: None, }); let frame1 = QuicFrame::Stream { stream_id: 40, offset: 40, length: 400, fin: Some(true), raw: None, }; let event_data1 = EventData::PacketSent(quic::PacketSent { header: pkt_hdr.clone(), frames: Some(smallvec![frame1]), is_coalesced: None, retry_token: None, stateless_reset_token: None, supported_versions: None, raw: raw.clone(), datagram_id: None, send_at_time: None, trigger: None, }); let j1 = json!({"foo": "Bar", "hello": 123}); let j2 = json!({"baz": [1,2,3,4]}); let mut ex_data = BTreeMap::new(); ex_data.insert("first".to_string(), j1); ex_data.insert("second".to_string(), j2); let ev1 = Event::with_time_ex(0.0, event_data1, ex_data); let frame2 = QuicFrame::Stream { stream_id: 1, offset: 0, length: 100, fin: Some(true), raw: None, }; let event_data2 = EventData::PacketSent(quic::PacketSent { header: pkt_hdr.clone(), frames: Some(smallvec![frame2]), is_coalesced: None, retry_token: None, stateless_reset_token: None, supported_versions: None, raw: raw.clone(), datagram_id: None, send_at_time: None, trigger: None, }); let ev2 = Event::with_time(0.0, event_data2); let mut s = streamer::QlogStreamer::new( "version".to_string(), Some("title".to_string()), Some("description".to_string()), None, std::time::Instant::now(), trace, EventImportance::Base, writer, ); assert!(matches!(s.start_log(), Ok(()))); assert!(matches!(s.add_event(ev1), Ok(()))); assert!(matches!(s.add_event(ev2), Ok(()))); assert!(matches!(s.finish_log(), Ok(()))); let r = s.writer(); #[allow(clippy::borrowed_box)] let w: &Box>> = unsafe { std::mem::transmute(r) }; let log_string = r#"{"qlog_version":"version","qlog_format":"JSON-SEQ","title":"title","description":"description","trace":{"vantage_point":{"type":"server"},"title":"Quiche qlog trace","description":"Quiche qlog trace description","configuration":{"time_offset":0.0}}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":40,"offset":40,"length":400,"fin":true}]},"first":{"foo":"Bar","hello":123},"second":{"baz":[1,2,3,4]}} {"time":0.0,"name":"transport:packet_sent","data":{"header":{"packet_type":"handshake","packet_number":0,"version":"1","scil":8,"dcil":8,"scid":"7e37e4dcc6682da8","dcid":"36ce104eee50101c"},"raw":{"length":1251,"payload_length":1224},"frames":[{"frame_type":"stream","stream_id":1,"offset":0,"length":100,"fin":true}]}} "#; let written_string = std::str::from_utf8(w.as_ref().get_ref()).unwrap(); assert_eq!(log_string, written_string); } }