Created
December 1, 2023 05:46
-
-
Save zew13/b5bccd89cf7cd09032a7a89cfa10fa33 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{borrow::Cow, ops::ControlFlow}; | |
use client::Client; | |
use futures_util::{SinkExt, StreamExt}; | |
use t3::{ | |
axum::extract::{ | |
ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade}, | |
Path, | |
}, | |
IntoResponse, | |
}; | |
pub async fn get( | |
ws: WebSocketUpgrade, | |
client: Client, | |
Path((uid, ver)): Path<(String, String)>, | |
) -> t3::Result<impl IntoResponse> { | |
let uid = u64::from_str_radix(&uid, 36).unwrap_or(0); | |
client.uid_logined(uid).await?; | |
let ver = u64::from_str_radix(&ver, 36).unwrap_or(0); | |
Ok(ws.on_upgrade(move |socket| handle_socket(socket, uid, ver))) | |
} | |
/// Actual websocket statemachine (one will be spawned per connection) | |
async fn handle_socket(mut socket: WebSocket, uid: u64, ver: u64) { | |
dbg!((uid, ver)); | |
//send a ping (unsupported by some browsers) just to kick things off and get a response | |
if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { | |
println!("Pinged ..."); | |
} else { | |
println!("Could not send ping !"); | |
// no Error here since the only thing we can do is to close the connection. | |
// If we can not send messages, there is no way to salvage the statemachine anyway. | |
return; | |
} | |
// receive single message from a client (we can either receive or send with socket). | |
// this will likely be the Pong for our Ping or a hello message from client. | |
// waiting for message from a client will block this task, but will not block other client's | |
// connections. | |
if let Some(msg) = socket.recv().await { | |
if let Ok(msg) = msg { | |
if process_message(msg).is_break() { | |
return; | |
} | |
} else { | |
println!("client abruptly disconnected"); | |
return; | |
} | |
} | |
// Since each client gets individual statemachine, we can pause handling | |
// when necessary to wait for some external event (in this case illustrated by sleeping). | |
// Waiting for this client to finish getting its greetings does not prevent other clients from | |
// connecting to server and receiving their greetings. | |
for i in 1..5 { | |
if socket | |
.send(Message::Binary(b" times!".into())) | |
.await | |
.is_err() | |
{} | |
if socket | |
.send(Message::Text(format!("Hi {i} times!"))) | |
.await | |
.is_err() | |
{ | |
println!("client abruptly disconnected"); | |
return; | |
} | |
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | |
} | |
// By splitting socket we can send and receive at the same time. In this example we will send | |
// unsolicited messages to client based on some sort of server's internal event (i.e .timer). | |
let (mut sender, mut receiver) = socket.split(); | |
// Spawn a task that will push several messages to the client (does not matter what client does) | |
let mut send_task = tokio::spawn(async move { | |
let n_msg = 20; | |
for i in 0..n_msg { | |
// In case of any websocket error, we exit. | |
if sender | |
.send(Message::Text(format!("Server message {i} ..."))) | |
.await | |
.is_err() | |
{ | |
return i; | |
} | |
tokio::time::sleep(std::time::Duration::from_millis(1000)).await; | |
} | |
println!("Sending close to ..."); | |
if let Err(e) = sender | |
.send(Message::Close(Some(CloseFrame { | |
code: axum::extract::ws::close_code::NORMAL, | |
reason: Cow::from("Goodbye"), | |
}))) | |
.await | |
{ | |
println!("Could not send Close due to {e}, probably it is ok?"); | |
} | |
n_msg | |
}); | |
// This second task will receive messages from client and print them on server console | |
let mut recv_task = tokio::spawn(async move { | |
let mut cnt = 0; | |
while let Some(Ok(msg)) = receiver.next().await { | |
cnt += 1; | |
// print message and break if instructed to do so | |
if process_message(msg).is_break() { | |
break; | |
} | |
} | |
cnt | |
}); | |
// If any one of the tasks exit, abort the other. | |
tokio::select! { | |
rv_a = (&mut send_task) => { | |
match rv_a { | |
Ok(a) => println!("{a} messages sent to "), | |
Err(a) => println!("Error sending messages {a:?}") | |
} | |
recv_task.abort(); | |
}, | |
rv_b = (&mut recv_task) => { | |
match rv_b { | |
Ok(b) => println!("Received {b} messages"), | |
Err(b) => println!("Error receiving messages {b:?}") | |
} | |
send_task.abort(); | |
} | |
} | |
// returning from the handler closes the websocket connection | |
println!("Websocket context destroyed"); | |
} | |
/// helper to print contents of messages to stdout. Has special treatment for Close. | |
fn process_message(msg: Message) -> ControlFlow<(), ()> { | |
match msg { | |
Message::Text(t) => { | |
println!(">>> sent str: {t:?}"); | |
} | |
Message::Binary(d) => { | |
println!(">>> sent {} bytes: {:?}", d.len(), d); | |
} | |
Message::Close(c) => { | |
if let Some(cf) = c { | |
println!( | |
">>> sent close with code {} and reason `{}`", | |
cf.code, cf.reason | |
); | |
} else { | |
println!(">>> somehow sent close message without CloseFrame"); | |
} | |
return ControlFlow::Break(()); | |
} | |
Message::Pong(v) => { | |
println!(">>> sent pong with {v:?}"); | |
} | |
// You should never need to manually handle Message::Ping, as axum's websocket library | |
// will do so for you automagically by replying with Pong and copying the v according to | |
// spec. But if you need the contents of the pings you can see them here. | |
Message::Ping(v) => { | |
println!(">>> sent ping with {v:?}"); | |
} | |
} | |
ControlFlow::Continue(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment