Last active
October 12, 2018 20:27
-
-
Save kendru/b5de26d6f0aacde7874334a1db6d6e13 to your computer and use it in GitHub Desktop.
Test dispatching command to an Actix actor system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate actix; | |
extern crate actix_web; | |
extern crate futures; | |
use std::thread; | |
use std::io::{self, Write}; | |
use actix::prelude::*; | |
use actix_web::{Error as ActixWebError, HttpMessage}; | |
use actix_web::client; | |
use futures::future::{self, Future}; | |
fn main() { | |
let system = System::new("test"); | |
let addr = MessageDispatcher::new().start(); | |
let stdin = io::stdin(); | |
Arbiter::spawn(futures::lazy(move || { | |
thread::spawn(move || loop { | |
let mut line = String::new(); | |
print!("Enter command: "); | |
io::stdout().flush().unwrap(); | |
stdin.read_line(&mut line).unwrap(); | |
let cmd = UserInput(line.trim().to_string()); | |
addr.do_send(cmd); | |
}); | |
future::ok(()) | |
})); | |
println!("System starting"); | |
system.run(); | |
println!("System shut down"); | |
} | |
#[derive(Debug, Message)] | |
struct UserInput(String); | |
#[derive(Message)] | |
enum Command { | |
PrintString(String), | |
FetchURL(String), | |
} | |
struct Worker { | |
name: String, | |
} | |
impl Actor for Worker { | |
type Context = Context<Self>; | |
fn started(&mut self, _ctx: &mut Context<Self>) { | |
println!("Worker started: {}", self.name); | |
} | |
} | |
impl Handler<Command> for Worker { | |
type Result = (); | |
fn handle(&mut self, msg: Command, _ctx: &mut Context<Self>) -> Self::Result { | |
use Command::*; | |
match msg { | |
PrintString(s) => { | |
println!("{} from {}", s, self.name); | |
} | |
FetchURL(url) => { | |
println!("Fetching {} from {}", url, self.name); | |
let worker_name = self.name.clone(); | |
Arbiter::spawn( | |
client::ClientRequest::get(url.clone()) | |
.finish().unwrap() | |
.send() | |
.map_err(move |e| { | |
println!("Error requesting {}: {}", url.clone(), e); | |
ActixWebError::from(e) | |
}) | |
.and_then(|resp| { | |
resp.body() | |
.from_err() | |
.and_then(move |body| { | |
println!("Response from {}: {}", worker_name, String::from_utf8(body[..].to_vec()).unwrap()); | |
future::ok(()) | |
}) | |
}) | |
.map_err(|_| ()) | |
); | |
} | |
} | |
} | |
} | |
struct MessageDispatcher { | |
workers: Vec<Recipient<Command>> | |
} | |
impl Actor for MessageDispatcher { | |
type Context = Context<Self>; | |
} | |
impl MessageDispatcher { | |
fn new() -> MessageDispatcher { | |
MessageDispatcher { | |
workers: vec!() | |
} | |
} | |
} | |
impl Handler<UserInput> for MessageDispatcher { | |
type Result = (); | |
fn handle(&mut self, msg: UserInput, _ctx: &mut Context<Self>) -> Self::Result { | |
use Command::*; | |
match msg { | |
UserInput(ref cmd) if cmd == "quit" => { | |
println!("Quitting"); | |
System::current().stop(); | |
}, | |
UserInput(ref cmd) if cmd == "spawn" => { | |
let worker_name = format!("worker-{}", self.workers.len()); | |
let addr = Worker{ name: worker_name }.start(); | |
self.workers.push(addr.recipient()); | |
}, | |
UserInput(ref cmd) if cmd.starts_with("fetch:") => { | |
let url = &cmd[6..]; | |
for subscriber in &self.workers { | |
subscriber.do_send(FetchURL(url.to_string())).unwrap(); | |
} | |
}, | |
UserInput(input) => { | |
for subscriber in &self.workers { | |
subscriber.do_send(PrintString(input.clone())).unwrap(); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment