1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use futures_io::AsyncWrite;
use crate::{encode, MAX_DATA_LEN, U16_HEX_BYTES};
pin_project_lite::pin_project! {
/// An implementor of [`Write`][io::Write] which passes all input to an inner `Write` in packet line data encoding,
/// one line per `write(…)` call or as many lines as it takes if the data doesn't fit into the maximum allowed line length.
pub struct Writer<T> {
#[pin]
inner: encode::LineWriter<'static, T>,
state: State,
}
}
enum State {
Idle,
WriteData(usize),
}
impl<T: AsyncWrite + Unpin> Writer<T> {
/// Create a new instance from the given `write`
pub fn new(write: T) -> Self {
Writer {
inner: encode::LineWriter::new(write, &[], &[]),
state: State::Idle,
}
}
/// Return the inner writer, consuming self.
pub fn into_inner(self) -> T {
self.inner.into_inner()
}
/// Return a mutable reference to the inner writer, useful if packet lines should be serialized directly.
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner.writer
}
}
/// Non-IO methods
impl<T> Writer<T> {
/// If called, each call to [`write()`][io::Write::write()] will write bytes as is.
pub fn enable_binary_mode(&mut self) {
self.inner.suffix = &[];
}
/// If called, each call to [`write()`][io::Write::write()] will write the input as text, appending a trailing newline
/// if needed before writing.
pub fn enable_text_mode(&mut self) {
self.inner.suffix = &[b'\n'];
}
}
impl<T: AsyncWrite + Unpin> AsyncWrite for Writer<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut this = self.project();
loop {
match this.state {
State::Idle => {
if buf.is_empty() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"empty packet lines are not permitted as '0004' is invalid",
)));
}
*this.state = State::WriteData(0)
}
State::WriteData(written) => {
while *written != buf.len() {
let data = &buf[*written..*written + (buf.len() - *written).min(MAX_DATA_LEN)];
let n = futures_lite::ready!(this.inner.as_mut().poll_write(cx, data))?;
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
*written += n;
*written -= U16_HEX_BYTES + this.inner.suffix.len();
}
*this.state = State::Idle;
return Poll::Ready(Ok(buf.len()));
}
}
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}
|