Skip to content

Instantly share code, notes, and snippets.

@izderadicka
Created February 29, 2020 08:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save izderadicka/bdc803d38840a15436f1a5ac1b1ca2cd to your computer and use it in GitHub Desktop.
Save izderadicka/bdc803d38840a15436f1a5ac1b1ca2cd to your computer and use it in GitHub Desktop.
Hyper websocket example
#[macro_use]
extern crate log;
use futures::prelude::*;
use futures::stream::StreamExt;
use headers::{self, HeaderMapExt};
use hyper::header::{self, AsHeaderName, HeaderMap, HeaderValue};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use hyper::{self, Body, Method, Request, Response, StatusCode};
use std::convert::Infallible;
use std::io;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_tungstenite::{tungstenite::protocol, WebSocketStream};
type GenericError = Box<dyn std::error::Error + Send + Sync>;
static INDEX_PATH: &str = "examples/index.html";
async fn send_file(p: &str) -> Result<Response<Body>, std::io::Error> {
let mut f = File::open(p).await?;
let mut data = Vec::new();
f.read_to_end(&mut data).await?;
Ok(Response::new(data.into()))
}
fn error_response(err: String) -> Response<Body> {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(err.into())
.unwrap()
}
fn not_found() -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body("Not Found".into())
.unwrap()
}
async fn route(req: Request<Body>) -> Result<Response<Body>, io::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => send_file(INDEX_PATH).await,
(&Method::GET, "/socket") => handle_ws_connection(req),
_ => Ok(not_found()),
}
.or_else(|e| Ok(error_response(e.to_string())))
}
fn header_matches<S: AsHeaderName>(headers: &HeaderMap<HeaderValue>, name: S, value: &str) -> bool {
headers
.get(name)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_lowercase() == value)
.unwrap_or(false)
}
pub fn upgrade_connection(
req: Request<Body>,
) -> Result<
(
Response<Body>,
impl Future<Output = Result<WebSocketStream<hyper::upgrade::Upgraded>, ()>> + Send,
),
Response<Body>,
> {
let mut res = Response::new(Body::empty());
let mut header_error = false;
debug!("We got these headers: {:?}", req.headers());
if !header_matches(req.headers(), header::UPGRADE, "websocket") {
error!("Upgrade is not to websocket");
header_error = true;
}
if !header_matches(req.headers(), header::SEC_WEBSOCKET_VERSION, "13") {
error!("Websocket protocol version must be 13");
header_error = true;
}
if !req
.headers()
.typed_get::<headers::Connection>()
.map(|h| h.contains("Upgrade"))
.unwrap_or(false)
{
error!("It must be upgrade connection");
header_error = true;
}
let key = req.headers().typed_get::<headers::SecWebsocketKey>();
if key.is_none() {
error!("Websocket key missing");
header_error = true;
}
if header_error {
*res.status_mut() = StatusCode::BAD_REQUEST;
return Err(res);
}
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
let h = res.headers_mut();
h.typed_insert(headers::Upgrade::websocket());
h.typed_insert(headers::SecWebsocketAccept::from(key.unwrap()));
h.typed_insert(headers::Connection::upgrade());
let upgraded = req
.into_body()
.on_upgrade()
.map_err(|err| error!("Cannot create websocket: {} ", err))
.and_then(|upgraded| async {
debug!("Connection upgraded to websocket");
let r = WebSocketStream::from_raw_socket(upgraded, protocol::Role::Server, None).await;
Ok(r)
});
Ok((res, upgraded))
}
// Just echo back received messages.
fn handle_ws_connection(req: Request<Body>) -> Result<Response<Body>, io::Error> {
let res = match upgrade_connection(req) {
Err(res) => res,
Ok((res, ws)) => {
let run_ws_task = async {
match ws.await {
Ok(ws) => {
debug!("Spawning WS");
let mut counter = 0;
let (tx, rc) = ws.split();
let rc = rc.try_filter_map(|m| {
debug!("Got message {:?}", m);
future::ok(match m {
protocol::Message::Text(text) => {
counter += 1;
Some(protocol::Message::text(format!(
"Response {}: {}",
counter, text
)))
}
_ => None,
})
});
match rc.forward(tx).await {
Err(e) => error!("WS Error {}", e),
Ok(_) => debug!("Websocket has ended"),
}
}
Err(_e) => error!("WS error"),
}
};
tokio::spawn(run_ws_task);
res
}
};
debug!("WS HTTP Response {:?}", res);
Ok(res)
}
#[tokio::main]
async fn main() -> Result<(), GenericError> {
env_logger::init();
let addr = ([127, 0, 0, 1], 5000).into();
let service = make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(route)) });
let server = Server::bind(&addr).serve(service);
info!("Serving on {}", addr);
server.await?;
Ok(())
}
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'>
<title>Web Socket Demo - Echo</title>
<meta name='viewport' content='width=device-width, initial-scale=1'>
<style>
html {
font-family: Arial, Helvetica, sans-serif;
}
.blk {
margin-bottom: 1em;
}
label {
font-weight: bold;
}
.timestamp {
background-color:lightgray;
margin-right: 1em;
color:gray;
}
</style>
</head>
<body>
<h1>Web Socket Demo - Echo</h1>
<div class="blk">
<label>Message:</label>
<input type="text" name="msg" id="input-msg"> <button id="btn-send">Send</button>
</div>
<div class="blk">
<label>Responses</label>
<div id="responces">
</div>
</div>
<script>
const socketUrl = `ws://${window.location.host}/socket`;
const socket = new WebSocket(socketUrl);
socket.addEventListener('open', function (event) {
console.log(`Socket connected at ${socketUrl}`);
});
socket.addEventListener('error', function (event) {
console.error(`Websocket Error ${event.data}`);
});
socket.addEventListener('close', function (event) {
console.log(`Websocket Closed`);
});
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
const ts = (new Date()).toISOString();
const tsElem = document.createElement("SPAN")
tsElem.setAttribute("class", "timestamp");
tsElem.textContent = ts;
const item = document.createElement("DIV");
item.innerText = event.data;
item.prepend(tsElem);
responces.prepend(item);
});
const btn = document.getElementById("btn-send");
const responces = document.getElementById("responces");
const input = document.getElementById("input-msg")
btn.addEventListener("click", (e) => {
if (socket.readyState == WebSocket.OPEN) {
const msg =input.value;
socket.send(msg);
} else {
console.error("Socket not ready to send message")
}
})
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment