Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save cmontella/39a745afd93911898a302cbb46c69fdc to your computer and use it in GitHub Desktop.
Save cmontella/39a745afd93911898a302cbb46c69fdc to your computer and use it in GitHub Desktop.
use actix::prelude::*;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use std::time::Duration;
use std::time;
use std::thread::{self, JoinHandle};
#[macro_use]
extern crate crossbeam_channel;
use crossbeam_channel::{Sender, Receiver};
#[derive(Message)]
#[rtype(result = "()")]
struct RegisterWSClient {
addr: Addr<MyWebSocket>,
}
#[derive(Message)]
#[rtype(result = "()")]
struct ServerEvent {
event: String,
}
async fn ws_index(
r: HttpRequest,
stream: web::Payload,
data: web::Data<Addr<ServerMonitor>>,
) -> Result<HttpResponse, Error> {
let (addr, res) = ws::start_with_addr(MyWebSocket {}, &r, stream).unwrap();
data.get_ref().do_send(RegisterWSClient { addr: addr });
Ok(res)
}
struct MyWebSocket {}
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
// process websocket messages
println!("WS: {:?}", msg);
match msg {
Ok(ws::Message::Ping(msg)) => {
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
ctx.text("Message!");
}
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(_)) => {
ctx.stop();
}
_ => ctx.stop(),
}
}
}
impl Handler<ServerEvent> for MyWebSocket {
type Result = ();
fn handle(&mut self, msg: ServerEvent, ctx: &mut Self::Context) {
ctx.text(msg.event);
}
}
struct ServerMonitor {
listeners: Vec<Addr<MyWebSocket>>,
thread: Option<JoinHandle<()>>,
incoming: Receiver<String>,
}
impl Actor for ServerMonitor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_millis(10), |act, _| {
loop {
match &act.incoming.try_recv() {
Ok(message) => {
for listener in &act.listeners {
listener.do_send(ServerEvent{ event: message.to_string() });
}
}
Err(_) => break,
}
}
});
}
}
impl Handler<RegisterWSClient> for ServerMonitor {
type Result = ();
fn handle(&mut self, msg: RegisterWSClient, _: &mut Context<Self>) {
self.listeners.push(msg.addr);
}
}
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (ws_outgoing, ws_incoming) = crossbeam_channel::unbounded();
let thread = thread::Builder::new().name("Websocket Receiving Thread".to_string()).spawn(move || {
let mut counter = 0;
loop {
let ten_millis = time::Duration::from_millis(1000);
let now = time::Instant::now();
counter = counter + 1;
// Send a message over the channel
ws_outgoing.send(format!("{:?}",counter));
thread::sleep(ten_millis);
}
});
let srvmon = ServerMonitor { listeners: vec![], thread: None, incoming: ws_incoming }.start();
HttpServer::new(move || {
App::new()
.data(srvmon.clone())
.wrap(middleware::Logger::default())
.service(web::resource("/ws/").route(web::get().to(ws_index)))
})
.bind("127.0.0.1:3012")?
.run()
.await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment