Skip to content

Instantly share code, notes, and snippets.

@rtyler rtyler/Cargo.toml
Created Apr 4, 2019

Embed
What would you like to do?
An example of a Rurst rdkafka producer publishing to an Azure Event Hub
[package]
name = "rustyhub"
version = "0.1.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
[dependencies]
futures = "~0.1.21"
clap = "~2.32.0"
regex = "1.1.5"
[dependencies.rdkafka]
version = "~0.20.0"
features = ["ssl", "sasl"]
extern crate clap; [54/235]
extern crate futures;
extern crate rdkafka;
extern crate regex;
use clap::{App, Arg};
use futures::*;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use regex::Regex;
fn produce(connection: &str, topic_name: &str) {
let re = Regex::new(r"Endpoint=sb://(?P<host>.*)/(.*)").unwrap();
let caps = re.captures(connection).unwrap();
let mut brokers = String::from(&caps["host"]);
brokers.push_str(":9093");
println!("Sending messages to: {}", &brokers);
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &brokers)
.set("produce.offset.report", "true")
.set("message.timeout.ms", "5000")
.set("security.protocol", "sasl_ssl")
/*
* The following setting -must- be set for librdkafka, but doesn't seem to do anything
* worthwhile,. Since we're not relying on kerberos, let's just give it some junk :)
*/
.set("sasl.kerberos.kinit.cmd", "echo 'who wants kerberos?!'")
/*
* Another unused setting which is required to be present
*/
.set("sasl.kerberos.keytab", "keytab")
.set("sasl.mechanisms", "PLAIN")
/*
* The username that Azure Event Hubs uses for Kafka is really this
*/
.set("sasl.username", "$ConnectionString")
/*
* NOTE: Depending on your system, you may need to change this to adifferent location
*/
.set("ssl.ca.location", "/etc/ssl/ca-bundle.pem")
.set("sasl.password", &connection)
.create()
.expect("Producer creation error");
// This loop is non blocking: all messages will be sent one after the other, without waiting
// for the results.
let futures = (0..5)
.map(|i| {
// The send operation on the topic returns a future, that will be completed once the
// result or failure from Kafka will be received.
producer.send(
FutureRecord::to(topic_name)
.payload(&format!("Message {}", i))
.key(&format!("Key {}", i)),
-1
)
.map(move |delivery_status| { // This will be executed onw the result is received
println!("Delivery status for message {} received", i);
delivery_status
})
})
.collect::<Vec<_>>();
// This loop will wait until all delivery statuses have been received received.
for future in futures {
println!("Future completed. Result: {:?}", future.wait());
}
}
fn main() {
let matches = App::new("producer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line producer")
.arg(Arg::with_name("connection")
.short("c")
.long("connection")
.help("Azure Event Hub connection string")
.takes_value(true))
.arg(Arg::with_name("topic")
.short("t")
.long("topic")
.help("Destination topic")
.takes_value(true)
.default_value("notifications")
.required(true))
.get_matches();
let (version_n, version_s) = get_rdkafka_version();
println!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
let topic = matches.value_of("topic").unwrap();
let connection = matches.value_of("connection").unwrap();
produce(connection, topic);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.