1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
extern crate env_logger;
/// An example of using channels to transfer data between three parts of some system.
///
/// A WebSocket server echoes data back to a client and tees that data to a logging system.
/// A WebSocket client sends some data do the server.
/// A worker thread stores data as a log and sends that data back to the main program when the
/// WebSocket server has finished receiving data.
///
/// This example demonstrates how to use threads, channels, and WebSocket handlers to create a
/// complex system from simple, composable parts.
extern crate ws;
use std::sync::mpsc::Sender as ThreadOut;
use std::sync::mpsc::channel;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use ws::{connect, listen, CloseCode, Handler, Handshake, Message, Result, Sender};
fn main() {
// Setup logging
env_logger::init();
// Data to be sent across WebSockets and channels
let data = vec![1, 2, 3, 4, 5];
let (final_in, final_out) = channel();
let (log_in, log_out) = channel();
// WebSocket connection handler for the server connection
struct Server {
ws: Sender,
log: ThreadOut<String>,
}
impl Handler for Server {
fn on_message(&mut self, msg: Message) -> Result<()> {
println!("Server got message '{}'. ", msg);
// log it
self.log.send(msg.to_string()).unwrap();
// echo it back
self.ws.send(msg)
}
fn on_close(&mut self, _: CloseCode, _: &str) {
self.ws.shutdown().unwrap()
}
}
// Server thread
let server = thread::Builder::new()
.name("server".to_owned())
.spawn(move || {
listen("127.0.0.1:3012", |out| {
Server {
ws: out,
// we need to clone the channel because
// in theory, there could be many active connections
log: log_in.clone(),
}
}).unwrap()
})
.unwrap();
// Give the server a little time to get going
sleep(Duration::from_millis(10));
// WebSocket connection handler for the client connection
struct Client {
out: Sender,
ind: usize,
data: Vec<u32>,
}
impl Client {
// Core business logic for client, keeping it DRY
fn increment(&mut self) -> Result<()> {
if let Some(num) = self.data.get(self.ind) {
// Advance the index
self.ind += 1;
// Send the number to the server
self.out.send(num.to_string())
} else {
// All of the data has been sent, let's close
self.out.close(CloseCode::Normal)
}
}
}
impl Handler for Client {
fn on_open(&mut self, _: Handshake) -> Result<()> {
self.increment()
}
fn on_message(&mut self, msg: Message) -> Result<()> {
println!("Client got message '{}'. ", msg);
self.increment()
}
}
// We need to clone the data into the client, making two versions we will compare for
// consistency later
let client_data = data.clone();
// Client thread
let client = thread::Builder::new()
.name("client".to_owned())
.spawn(move || {
connect("ws://127.0.0.1:3012", |out| {
Client {
out,
ind: 0,
// we need to clone again because
// in theory, there could be many client connections sending off the data
data: client_data.clone(),
}
}).unwrap()
})
.unwrap();
// Logger thread
let logger = thread::Builder::new()
.name("logger".to_owned())
.spawn(move || {
// Make a new vector to store the numbers
let mut log: Vec<u32> = Vec::new();
// Receive data and push it to the log, this only works if we have one WebSocket
// connection, otherwise the log would have data from all connections. But for our example,
// we know we only have one :)
while let Ok(string) = log_out.recv() {
println!("Logger is storing {}", string);
log.push(string.parse().unwrap());
}
println!("Logger sending final log result.");
final_in.send(log).unwrap();
})
.unwrap();
// Wait for the worker threads to finish what they are doing
let _ = server.join();
let _ = client.join();
let _ = logger.join();
// Get the result from the logger and check that it is correct
let final_data = final_out.recv().unwrap();
println!("In: {:?}", data);
println!("Out: {:?}", final_data);
assert_eq!(final_data, data);
println!("All done.")
}
|