Skip to content

Instantly share code, notes, and snippets.

@kendru
Last active October 12, 2018 20:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kendru/b5de26d6f0aacde7874334a1db6d6e13 to your computer and use it in GitHub Desktop.
Save kendru/b5de26d6f0aacde7874334a1db6d6e13 to your computer and use it in GitHub Desktop.
Test dispatching command to an Actix actor system
extern crate actix;
extern crate actix_web;
extern crate futures;
use std::thread;
use std::io::{self, Write};
use actix::prelude::*;
use actix_web::{Error as ActixWebError, HttpMessage};
use actix_web::client;
use futures::future::{self, Future};
fn main() {
let system = System::new("test");
let addr = MessageDispatcher::new().start();
let stdin = io::stdin();
Arbiter::spawn(futures::lazy(move || {
thread::spawn(move || loop {
let mut line = String::new();
print!("Enter command: ");
io::stdout().flush().unwrap();
stdin.read_line(&mut line).unwrap();
let cmd = UserInput(line.trim().to_string());
addr.do_send(cmd);
});
future::ok(())
}));
println!("System starting");
system.run();
println!("System shut down");
}
#[derive(Debug, Message)]
struct UserInput(String);
#[derive(Message)]
enum Command {
PrintString(String),
FetchURL(String),
}
struct Worker {
name: String,
}
impl Actor for Worker {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Context<Self>) {
println!("Worker started: {}", self.name);
}
}
impl Handler<Command> for Worker {
type Result = ();
fn handle(&mut self, msg: Command, _ctx: &mut Context<Self>) -> Self::Result {
use Command::*;
match msg {
PrintString(s) => {
println!("{} from {}", s, self.name);
}
FetchURL(url) => {
println!("Fetching {} from {}", url, self.name);
let worker_name = self.name.clone();
Arbiter::spawn(
client::ClientRequest::get(url.clone())
.finish().unwrap()
.send()
.map_err(move |e| {
println!("Error requesting {}: {}", url.clone(), e);
ActixWebError::from(e)
})
.and_then(|resp| {
resp.body()
.from_err()
.and_then(move |body| {
println!("Response from {}: {}", worker_name, String::from_utf8(body[..].to_vec()).unwrap());
future::ok(())
})
})
.map_err(|_| ())
);
}
}
}
}
struct MessageDispatcher {
workers: Vec<Recipient<Command>>
}
impl Actor for MessageDispatcher {
type Context = Context<Self>;
}
impl MessageDispatcher {
fn new() -> MessageDispatcher {
MessageDispatcher {
workers: vec!()
}
}
}
impl Handler<UserInput> for MessageDispatcher {
type Result = ();
fn handle(&mut self, msg: UserInput, _ctx: &mut Context<Self>) -> Self::Result {
use Command::*;
match msg {
UserInput(ref cmd) if cmd == "quit" => {
println!("Quitting");
System::current().stop();
},
UserInput(ref cmd) if cmd == "spawn" => {
let worker_name = format!("worker-{}", self.workers.len());
let addr = Worker{ name: worker_name }.start();
self.workers.push(addr.recipient());
},
UserInput(ref cmd) if cmd.starts_with("fetch:") => {
let url = &cmd[6..];
for subscriber in &self.workers {
subscriber.do_send(FetchURL(url.to_string())).unwrap();
}
},
UserInput(input) => {
for subscriber in &self.workers {
subscriber.do_send(PrintString(input.clone())).unwrap();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment