Created
November 11, 2021 04:55
-
-
Save digizeph/fcac3027555c0b744ea0b3a11197b694 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::io::Cursor; | |
use rdkafka::{ClientConfig, ClientContext, Message}; | |
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, StreamConsumer}; | |
pub use bgpkit_parser::{parse_bmp_msg, parse_openbmp_header}; | |
use log::{info, error}; | |
// A simple context to customize the consumer behavior and print a log line every time | |
// offsets are committed | |
struct LoggingConsumerContext; | |
impl ClientContext for LoggingConsumerContext {} | |
impl ConsumerContext for LoggingConsumerContext {} | |
// Define a new type for convenience | |
type LoggingConsumer = StreamConsumer<LoggingConsumerContext>; | |
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) { | |
let consumer: LoggingConsumer = ClientConfig::new() | |
.set("group.id", group_id) | |
.set("bootstrap.servers", brokers) | |
.set("enable.partition.eof", "false") | |
.set("session.timeout.ms", "60000") | |
.set("enable.auto.commit", "true") | |
//.set("statistics.interval.ms", "30000") | |
//.set("auto.offset.reset", "smallest") | |
.create_with_context(LoggingConsumerContext).unwrap(); | |
consumer | |
.subscribe(&topics.to_vec()) | |
.expect("Can't subscribe to specified topics"); | |
loop { | |
match consumer.recv().await { | |
Err(_) => {}, | |
Ok(m) => { | |
let payload = m.payload(); | |
if let Some(p) = payload { | |
let mut reader = Cursor::new(Vec::from(p)); | |
let header = parse_openbmp_header(&mut reader).unwrap(); | |
match parse_bmp_msg(&mut reader) { | |
Ok(msg) => { | |
info!("Parsing OK: {:?}", msg.common_header.msg_type); | |
} | |
Err(e) => { | |
error!("{:?}", e); | |
error!("{:?}", header); | |
let hex = hex::encode(p); | |
error!("{}", hex); | |
break | |
} | |
} | |
} | |
consumer.commit_message(&m, CommitMode::Async).unwrap(); | |
} | |
}; | |
} | |
} | |
#[tokio::main] | |
pub async fn main(){ | |
env_logger::init(); | |
let topic = "^routeviews\\.route-views2\\..+\\.bmp_raw"; | |
// consume_and_print("stream.routeviews.org", "bgpkit-parser-2", &["routeviews.route-views2.7660.bmp_raw"]).await | |
consume_and_print("stream.routeviews.org", "bgpkit-parser-2", &[topic]).await | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment