Skip to content

Instantly share code, notes, and snippets.

@alex179ohm
Created February 1, 2019 14:39
Show Gist options
  • Save alex179ohm/0310924b85e5e8333133a0e900761b80 to your computer and use it in GitHub Desktop.
Save alex179ohm/0310924b85e5e8333133a0e900761b80 to your computer and use it in GitHub Desktop.
extern crate nsqueue;
extern crate actix;
use nsqueue::{Consumer, Connection, Ctx, Msg, Error, Cmd, fin, Config};
use actix::prelude::*;
struct MyReader;
impl Consumer<Ctx> for MyReader {
// on_message callback
fn on_message(&self, msgs: &Vec<Msg>, ctx: &mut Ctx) -> Result<Vec<Cmd>, Error> {
let resps = Vec::new();
println!("{}", ctx.rdy_count());
if ctx.rdy_count() == 0 {
ctx.rdy(1);
}
for msg in msgs {
println!("{:?}", msg);
resps.push(fin(&msg.id));
}
Ok(resps)
}
// on_error callback
fn on_error(&self, msg: Msg, ctx: &mut Ctx) -> Option<Cmd> {
None
}
}
fn main() {
let config = Config::default().hostname("Consumer".to_string());
System::run(|| {
// Supervisor automatically restart Consumer if conndection drops.
// the _c variable is needed, see:
// https://stackoverflow.com/questions/50526516/
// Arbiter spawn a single event-loop thread.
Connection::new(
Some(MyReader{}),
"test", // topic
"test", //channel for receiver
"0.0.0.0:4150", //nsqd tcp address
Some(config), //config (Optional see mod config for defaults, if None Consumer sets defaults)
Some(2), //set rdy (if None Consumer sets initial rdy at 1)
).start();
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment