Created
April 4, 2019 22:55
-
-
Save rtyler/3d5b0ed5858f4ae1c2694d1b1b711a31 to your computer and use it in GitHub Desktop.
An example of a Rust rdkafka producer publishing to an Azure Event Hub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "rustyhub" | |
version = "0.1.0" | |
authors = ["R. Tyler Croy <rtyler@brokenco.de>"] | |
edition = "2018" | |
[dependencies] | |
futures = "~0.1.21" | |
clap = "~2.32.0" | |
regex = "1.1.5" | |
[dependencies.rdkafka] | |
version = "~0.20.0" | |
features = ["ssl", "sasl"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate clap; [54/235] | |
extern crate futures; | |
extern crate rdkafka; | |
extern crate regex; | |
use clap::{App, Arg}; | |
use futures::*; | |
use rdkafka::config::ClientConfig; | |
use rdkafka::producer::{FutureProducer, FutureRecord}; | |
use rdkafka::util::get_rdkafka_version; | |
use regex::Regex; | |
fn produce(connection: &str, topic_name: &str) { | |
let re = Regex::new(r"Endpoint=sb://(?P<host>.*)/(.*)").unwrap(); | |
let caps = re.captures(connection).unwrap(); | |
let mut brokers = String::from(&caps["host"]); | |
brokers.push_str(":9093"); | |
println!("Sending messages to: {}", &brokers); | |
let producer: FutureProducer = ClientConfig::new() | |
.set("bootstrap.servers", &brokers) | |
.set("produce.offset.report", "true") | |
.set("message.timeout.ms", "5000") | |
.set("security.protocol", "sasl_ssl") | |
/* | |
* The following setting -must- be set for librdkafka, but doesn't seem to do anything | |
* worthwhile,. Since we're not relying on kerberos, let's just give it some junk :) | |
*/ | |
.set("sasl.kerberos.kinit.cmd", "echo 'who wants kerberos?!'") | |
/* | |
* Another unused setting which is required to be present | |
*/ | |
.set("sasl.kerberos.keytab", "keytab") | |
.set("sasl.mechanisms", "PLAIN") | |
/* | |
* The username that Azure Event Hubs uses for Kafka is really this | |
*/ | |
.set("sasl.username", "$ConnectionString") | |
/* | |
* NOTE: Depending on your system, you may need to change this to adifferent location | |
*/ | |
.set("ssl.ca.location", "/etc/ssl/ca-bundle.pem") | |
.set("sasl.password", &connection) | |
.create() | |
.expect("Producer creation error"); | |
// This loop is non blocking: all messages will be sent one after the other, without waiting | |
// for the results. | |
let futures = (0..5) | |
.map(|i| { | |
// The send operation on the topic returns a future, that will be completed once the | |
// result or failure from Kafka will be received. | |
producer.send( | |
FutureRecord::to(topic_name) | |
.payload(&format!("Message {}", i)) | |
.key(&format!("Key {}", i)), | |
-1 | |
) | |
.map(move |delivery_status| { // This will be executed onw the result is received | |
println!("Delivery status for message {} received", i); | |
delivery_status | |
}) | |
}) | |
.collect::<Vec<_>>(); | |
// This loop will wait until all delivery statuses have been received received. | |
for future in futures { | |
println!("Future completed. Result: {:?}", future.wait()); | |
} | |
} | |
fn main() { | |
let matches = App::new("producer example") | |
.version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) | |
.about("Simple command line producer") | |
.arg(Arg::with_name("connection") | |
.short("c") | |
.long("connection") | |
.help("Azure Event Hub connection string") | |
.takes_value(true)) | |
.arg(Arg::with_name("topic") | |
.short("t") | |
.long("topic") | |
.help("Destination topic") | |
.takes_value(true) | |
.default_value("notifications") | |
.required(true)) | |
.get_matches(); | |
let (version_n, version_s) = get_rdkafka_version(); | |
println!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s); | |
let topic = matches.value_of("topic").unwrap(); | |
let connection = matches.value_of("connection").unwrap(); | |
produce(connection, topic); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment