Skip to content

Instantly share code, notes, and snippets.

@DarkSector
Last active April 16, 2020 08:26
Show Gist options
  • Save DarkSector/eec376803467e47f4bb10fe02a3d7c1e to your computer and use it in GitHub Desktop.
Save DarkSector/eec376803467e47f4bb10fe02a3d7c1e to your computer and use it in GitHub Desktop.
Supervisor - Client actor system
use std::panic;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use bytes::Bytes;
use crossbeam::{Receiver, Sender};
use futures::stream::{SplitSink, StreamExt};
use serde::Deserialize;
use serde_json;
// Message telling the `Supervisor` whether the WebSocket client is connected.
// The handler's result type is `()`.
#[derive(Message)]
#[rtype(result = "()")]
struct WSClientStatus {
// `false` makes the `Supervisor` handler attempt a new connection;
// `true` is only printed.
// NOTE(review): the `Supervisor` handler also reads `client_status.conf.device_uri`,
// but no `conf` field is declared here — confirm the intended shape.
connected: bool,
}
// All actors must communicate via a Message derivative.
// Command message handled by `WSClient`; the handler's result type is `()`.
// NOTE(review): the tuple body is empty, so this type has no fields, yet the
// `WSClient` handler in this file reads `msg.0` — confirm the intended payload.
#[derive(Message)]
#[rtype(result = "()")]
struct ClientCommand();
// Actor state for the WebSocket client.
struct WSClient {
// Write half of the framed WebSocket connection.
// `start_device_client` constructs the actor with `None` here because no
// connection exists at that point.
sink: Option<SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>>,
// Address of the `Supervisor` actor that created this client.
supervisor_addr: Addr<Supervisor>,
}
impl WSClient {
    /// Schedules the next heartbeat tick for this actor.
    ///
    /// When `sink` is set, a one-second timer is registered on `ctx`. When it
    /// fires, it prints a placeholder line and calls `hb` again, which
    /// re-arms the timer. When `sink` is `None`, nothing is scheduled.
    ///
    /// The tick does not write a `Pong` frame to the sink yet; it only prints.
    fn hb(&self, ctx: &mut Context<Self>) {
        // No connection, no heartbeat.
        if self.sink.is_none() {
            return;
        }

        let interval = Duration::from_secs(1);
        ctx.run_later(interval, |client, inner_ctx| {
            println!("foo");
            // Re-arm the timer so the tick repeats every second.
            client.hb(inner_ctx);
        });
    }
}
// implement Actor trait for the client
impl Actor for WSClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
info!("Disconnected");
// Don't kill the system, that is for the main supervisor to kill
// System::current().stop();
// Instead send the message to the supervisor
}
}
// In order for WSClient to behave as an actor it needs to implement a handler as well.
// Handles `ClientCommand`: serializes the command payload to JSON and, for now,
// only prints a message (the write to the sink is still commented out).
impl Handler<ClientCommand> for WSClient {
type Result = ();
fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context<Self>) {
// Shadows `msg` with its JSON text; `.ok().unwrap()` panics if serialization fails.
// NOTE(review): `ClientCommand` is declared in this file as `struct ClientCommand();`,
// which has no field `0` — confirm the intended payload before relying on this line.
let msg = serde_json::to_string(&msg.0).ok().unwrap();
match &self.sink {
// Some(mut t) => t.write(Message::Text(msg)).unwrap(),
// The wildcard is the only live arm, so the message below is printed
// whether or not `sink` is set.
_ => println!("Handler doesn't have sink yet")
}
}
}
// Write handler for the sink side of the connection.
// The impl body is empty, so only the trait's default methods apply.
// NOTE(review): no `StreamHandler` impl for `WSClient` is visible in this file,
// although the `Supervisor` handler calls `WSClient::add_stream` — confirm
// whether one is defined elsewhere.
impl actix::io::WriteHandler<WsProtocolError> for WSClient {}
/****************** SUPERVISOR ****************/
// Actor that holds the address of the `WSClient` it created and handles
// `WSClientStatus` messages.
struct Supervisor {
addr: Addr<WSClient> // keep the address of the client
}
// Actor lifecycle for the supervisor.
impl Actor for Supervisor {
    type Context = Context<Self>;

    /// Runs once the supervisor has stopped: logs the event and then stops
    /// the current actix `System`.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        info!("Supervisor disconnected shutting downs system");
        let system = System::current();
        system.stop();
    }
}
// Handles connection-status reports for the client.
// `connected == false` prints a line and hands an async block to
// `Arbiter::spawn` that tries to open a WebSocket connection;
// `connected == true` only prints a line.
// NOTE(review): this handler is unfinished — see the notes on individual lines.
impl Handler<WSClientStatus> for Supervisor {
type Result = ();
fn handle(&mut self, client_status: WSClientStatus, _ctx: &mut Context<Self>) {
// This is where incoming WSClientStatus messages are handled.
// If we receive `false` here, the intent is to spawn the client again.
// We will need to keep track of how many reconnects have happened.
// TODO: Figure out how to restart/start the arbiter here
match client_status.connected {
false => {
println!("Received a false");
// NOTE(review): the async block below uses `self`; confirm that borrowing
// the actor inside a spawned future is what is intended here.
Arbiter::spawn(async {
let (response, framed) = Client::new()
// NOTE(review): `WSClientStatus` as declared in this file has no `conf`
// field — confirm where the device URI should come from.
.ws(client_status.conf.device_uri)
.connect()
.await
// Any connect error panics inside the spawned future; the `unwrap`
// after it is then only reached with an `Ok` value.
.map_err(|e| {
panic!(format!("Error connecting to websocket: {}", e));
})
.unwrap();
debug!("{:?}", response);
// This is the ControlToServer that needs to be sent initially.
// NOTE(review): `ControlToServer`, `ControlToServerName` and `DeviceState`
// are not defined in the visible part of this file, and
// `initial_update_device` is not used after it is built.
let initial_update_device = ControlToServer::DeviceUpdatedState {
name: ControlToServerName::UpdateState,
state: DeviceState::NotWriting,
};
// Split the framed socket into its write half (`sink`) and read half (`stream`).
let (sink, stream) = framed.split();
// NOTE(review): `writer_client` is not defined in the visible part of this
// file, and `rx` is not used afterwards.
let rx = writer_client.rx_resp.take().unwrap();
// NOTE(review): the handler's context parameter is named `_ctx`; no `ctx`
// is in scope here.
WSClient::add_stream(stream , ctx);
// NOTE(review): `addr` is declared as `Addr<WSClient>`, but `sink` is the
// write half returned by `framed.split()` — confirm the intended target.
self.addr = sink;
});
}
true => println!("Received a true")
}
}
}
/**************************************************/
/// Builds the actix system and wires up the supervisor/client actor pair.
///
/// Steps, in order:
/// 1. create the `"websocket-client"` system;
/// 2. start a `Supervisor`, taking its own address from its context;
/// 3. start a `WSClient` with no sink, holding that supervisor address;
/// 4. send the supervisor a `WSClientStatus { connected: false }`.
///
/// Returns the `SystemRunner`; the caller is responsible for running it.
pub fn start_device_client() -> SystemRunner {
    let runner = System::new("websocket-client");

    let supervisor = Supervisor::create(|supervisor_ctx| {
        let supervisor_addr = supervisor_ctx.address();

        // The client cannot own a sink before a connection exists, so it
        // starts with `None`.
        let addr = WSClient::create(|_client_ctx| WSClient {
            sink: None,
            supervisor_addr,
        });

        Supervisor { addr }
    });

    // Tell the supervisor that the client is not connected yet.
    supervisor.do_send(WSClientStatus { connected: false });

    runner
}
/// Entry point: builds the actor system and blocks until it stops.
///
/// # Panics
///
/// Panics if the system runner returns an error.
fn main() {
    let sys = start_device_client();
    // `run` is a method on `SystemRunner`; it has to be called, not read as a field.
    sys.run().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment