Skip to content

Instantly share code, notes, and snippets.

@jerry73204
Created July 27, 2019 11:55
Show Gist options
  • Save jerry73204/ded57ed421023d7c77e55cb110194c09 to your computer and use it in GitHub Desktop.
Save jerry73204/ded57ed421023d7c77e55cb110194c09 to your computer and use it in GitHub Desktop.
rdkafka integration with actix example
use actix::prelude::*;
use failure::Fallible;
use rdkafka::{
consumer::stream_consumer::StreamConsumer,
message::OwnedMessage,
Message,
};
pub struct MsgConsumer {
consumer: &'static StreamConsumer,
}
impl MsgConsumer {
pub fn new(consumer: &'static StreamConsumer) -> MsgConsumer {
MsgConsumer { consumer }
}
fn handle_message(&self, msg: OwnedMessage) -> Fallible<()> {
let payload = match msg.payload_view::<[u8]>() {
Some(Ok(payload)) => payload,
Some(Err(err)) => bail!("Cannot decode message"),
None => bail!("Empty message"),
};
Ok(())
}
}
impl Actor for MsgConsumer {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let stream = self.consumer.start().filter_map(|result| match result {
Ok(msg) => Some(msg.detach()),
Err(err) => {
warn!("Kafka consumer error: {:?}", err);
None
}
});
ctx.add_stream(stream);
}
}
impl<'a, E> StreamHandler<OwnedMessage, E> for MsgConsumer {
fn handle(&mut self, item: OwnedMessage, ctx: &mut Self::Context) {
if let Err(err) = self.handle_message(item) {
warn!("Kafka consumer error: {:?}", err);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment