Skip to content

Instantly share code, notes, and snippets.

@zew13
Created December 1, 2023 05:46
Show Gist options
  • Save zew13/b5bccd89cf7cd09032a7a89cfa10fa33 to your computer and use it in GitHub Desktop.
Save zew13/b5bccd89cf7cd09032a7a89cfa10fa33 to your computer and use it in GitHub Desktop.
use std::{borrow::Cow, ops::ControlFlow};
use client::Client;
use futures_util::{SinkExt, StreamExt};
use t3::{
axum::extract::{
ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
Path,
},
IntoResponse,
};
pub async fn get(
ws: WebSocketUpgrade,
client: Client,
Path((uid, ver)): Path<(String, String)>,
) -> t3::Result<impl IntoResponse> {
let uid = u64::from_str_radix(&uid, 36).unwrap_or(0);
client.uid_logined(uid).await?;
let ver = u64::from_str_radix(&ver, 36).unwrap_or(0);
Ok(ws.on_upgrade(move |socket| handle_socket(socket, uid, ver)))
}
/// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(mut socket: WebSocket, uid: u64, ver: u64) {
dbg!((uid, ver));
//send a ping (unsupported by some browsers) just to kick things off and get a response
if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() {
println!("Pinged ...");
} else {
println!("Could not send ping !");
// no Error here since the only thing we can do is to close the connection.
// If we can not send messages, there is no way to salvage the statemachine anyway.
return;
}
// receive single message from a client (we can either receive or send with socket).
// this will likely be the Pong for our Ping or a hello message from client.
// waiting for message from a client will block this task, but will not block other client's
// connections.
if let Some(msg) = socket.recv().await {
if let Ok(msg) = msg {
if process_message(msg).is_break() {
return;
}
} else {
println!("client abruptly disconnected");
return;
}
}
// Since each client gets individual statemachine, we can pause handling
// when necessary to wait for some external event (in this case illustrated by sleeping).
// Waiting for this client to finish getting its greetings does not prevent other clients from
// connecting to server and receiving their greetings.
for i in 1..5 {
if socket
.send(Message::Binary(b" times!".into()))
.await
.is_err()
{}
if socket
.send(Message::Text(format!("Hi {i} times!")))
.await
.is_err()
{
println!("client abruptly disconnected");
return;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// By splitting socket we can send and receive at the same time. In this example we will send
// unsolicited messages to client based on some sort of server's internal event (i.e .timer).
let (mut sender, mut receiver) = socket.split();
// Spawn a task that will push several messages to the client (does not matter what client does)
let mut send_task = tokio::spawn(async move {
let n_msg = 20;
for i in 0..n_msg {
// In case of any websocket error, we exit.
if sender
.send(Message::Text(format!("Server message {i} ...")))
.await
.is_err()
{
return i;
}
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
println!("Sending close to ...");
if let Err(e) = sender
.send(Message::Close(Some(CloseFrame {
code: axum::extract::ws::close_code::NORMAL,
reason: Cow::from("Goodbye"),
})))
.await
{
println!("Could not send Close due to {e}, probably it is ok?");
}
n_msg
});
// This second task will receive messages from client and print them on server console
let mut recv_task = tokio::spawn(async move {
let mut cnt = 0;
while let Some(Ok(msg)) = receiver.next().await {
cnt += 1;
// print message and break if instructed to do so
if process_message(msg).is_break() {
break;
}
}
cnt
});
// If any one of the tasks exit, abort the other.
tokio::select! {
rv_a = (&mut send_task) => {
match rv_a {
Ok(a) => println!("{a} messages sent to "),
Err(a) => println!("Error sending messages {a:?}")
}
recv_task.abort();
},
rv_b = (&mut recv_task) => {
match rv_b {
Ok(b) => println!("Received {b} messages"),
Err(b) => println!("Error receiving messages {b:?}")
}
send_task.abort();
}
}
// returning from the handler closes the websocket connection
println!("Websocket context destroyed");
}
/// helper to print contents of messages to stdout. Has special treatment for Close.
fn process_message(msg: Message) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
println!(">>> sent str: {t:?}");
}
Message::Binary(d) => {
println!(">>> sent {} bytes: {:?}", d.len(), d);
}
Message::Close(c) => {
if let Some(cf) = c {
println!(
">>> sent close with code {} and reason `{}`",
cf.code, cf.reason
);
} else {
println!(">>> somehow sent close message without CloseFrame");
}
return ControlFlow::Break(());
}
Message::Pong(v) => {
println!(">>> sent pong with {v:?}");
}
// You should never need to manually handle Message::Ping, as axum's websocket library
// will do so for you automagically by replying with Pong and copying the v according to
// spec. But if you need the contents of the pings you can see them here.
Message::Ping(v) => {
println!(">>> sent ping with {v:?}");
}
}
ControlFlow::Continue(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment