1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
use std::{
io::{self, BufReader},
net::TcpStream,
thread,
};
use crossbeam_channel::{bounded, Receiver, Sender};
use crate::{
stdio::{make_io_threads, IoThreads},
Message,
};
pub(crate) fn socket_transport(
stream: TcpStream,
) -> (Sender<Message>, Receiver<Message>, IoThreads) {
let (reader_receiver, reader) = make_reader(stream.try_clone().unwrap());
let (writer_sender, writer) = make_write(stream);
let io_threads = make_io_threads(reader, writer);
(writer_sender, reader_receiver, io_threads)
}
fn make_reader(stream: TcpStream) -> (Receiver<Message>, thread::JoinHandle<io::Result<()>>) {
let (reader_sender, reader_receiver) = bounded::<Message>(0);
let reader = thread::spawn(move || {
let mut buf_read = BufReader::new(stream);
while let Some(msg) = Message::read(&mut buf_read).unwrap() {
let is_exit = matches!(&msg, Message::Notification(n) if n.is_exit());
reader_sender.send(msg).unwrap();
if is_exit {
break;
}
}
Ok(())
});
(reader_receiver, reader)
}
fn make_write(mut stream: TcpStream) -> (Sender<Message>, thread::JoinHandle<io::Result<()>>) {
let (writer_sender, writer_receiver) = bounded::<Message>(0);
let writer = thread::spawn(move || {
writer_receiver.into_iter().try_for_each(|it| it.write(&mut stream)).unwrap();
Ok(())
});
(writer_sender, writer)
}
|