Skip to content

Instantly share code, notes, and snippets.

@ian-p-cooke
Last active August 24, 2018 04:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ian-p-cooke/145f14fdafc5100a1fd4f2ea29098435 to your computer and use it in GitHub Desktop.
Save ian-p-cooke/145f14fdafc5100a1fd4f2ea29098435 to your computer and use it in GitHub Desktop.
#[derive(Message)]
struct Batch {
key: String,
messages: Vec<String>
}
struct RedisListTailer {
redis: Addr<Redis>,
key: String,
current_index: isize,
batch_size: isize,
sink: Recipient<Batch>,
}
impl RedisListTailer {
pub fn from_index(redis: Addr<Redis>, key: String, start_index: isize, batch_size: isize, sink: Recipient<Batch>) -> RedisListTailer {
assert!(start_index >= 0);
assert!(batch_size > 0);
RedisListTailer { redis, key, current_index: start_index, batch_size, sink }
}
pub fn from_beginning(redis: Addr<Redis>, key: String, batch_size: isize, sink: Recipient<Batch>) -> RedisListTailer {
RedisListTailer::from_index(redis, key, 0, batch_size, sink)
}
}
#[derive(Message)]
struct RunOnce;
impl Actor for RedisListTailer {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.notify_later(RunOnce, Duration::from_secs(0));
}
}
impl Handler<RunOnce> for RedisListTailer {
type Result = ();
fn handle(&mut self, _msg: RunOnce, ctx: &mut Context<Self>) -> Self::Result {
let req = self.redis.send(Lrange { key: self.key.clone(), start_index: self.current_index, stop_index: self.current_index + self.batch_size - 1 });
ctx.spawn(req.into_actor(self).then(move |r, act, ctx| {
match r {
Ok(res) => {
match res {
Ok(messages) => {
let batch_len = messages.len();
if batch_len == 0 {
ctx.notify_later(RunOnce, Duration::from_secs(1));
} else {
match act.sink.do_send(Batch { key: act.key.clone(), messages }) {
Ok(_) => {
act.current_index += batch_len as isize;
ctx.notify(RunOnce);
}
Err(e) => {
println!("sink error when sending batch: {}", e);
ctx.stop();
}
}
}
},
Err(e) => {
println!("redis error during lrange: {}", e);
ctx.stop();
}
}
}
Err(e) => {
println!("mailbox error during lrange: {}", e);
ctx.stop();
}
}
actix::fut::ok(())
}));
()
}
}
@ian-p-cooke
Copy link
Author

the most recent revision shows what things look like after using into_actor. much nicer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment