Created Apr 4, 2019
An example of a Rust rdkafka producer publishing to an Azure Event Hub
[package]
name = "rustyhub"
version = "0.1.0"
authors = ["R. Tyler Croy <>"]
edition = "2018"

[dependencies]
futures = "~0.1.21"
clap = "~2.32.0"
regex = "1.1.5"

[dependencies.rdkafka]
version = "~0.20.0"
features = ["ssl", "sasl"]

extern crate clap;
extern crate futures;
extern crate rdkafka;
extern crate regex;
use clap::{App, Arg};
use futures::*;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use regex::Regex;

fn produce(connection: &str, topic_name: &str) {
    // Stop the host at the first '/' so that a '/' inside the base64
    // SharedAccessKey is not captured as part of the host name.
    let re = Regex::new(r"Endpoint=sb://(?P<host>[^/]+)/").unwrap();
    let caps = re.captures(connection).unwrap();
    let mut brokers = String::from(&caps["host"]);
    // Azure Event Hubs serves its Kafka endpoint on port 9093.
    brokers.push_str(":9093");
    println!("Sending messages to: {}", &brokers);

    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("", "true")
        .set("", "5000")
        .set("security.protocol", "sasl_ssl")
        // The following setting -must- be set for librdkafka, but doesn't seem to do
        // anything worthwhile. Since we're not relying on Kerberos, let's just give it
        // some junk :)
        .set("sasl.kerberos.kinit.cmd", "echo 'who wants kerberos?!'")
        // Another unused setting which is required to be present
        .set("sasl.kerberos.keytab", "keytab")
        .set("sasl.mechanisms", "PLAIN")
        // The username that Azure Event Hubs uses for Kafka is really this
        .set("sasl.username", "$ConnectionString")
        // NOTE: Depending on your system, you may need to change this to a different location
        .set("", "/etc/ssl/ca-bundle.pem")
        .set("sasl.password", connection)
        .create()
        .expect("Producer creation error");

    // This loop is non-blocking: all messages are sent one after the other, without
    // waiting for the results.
    let futures = (0..5)
        .map(|i| {
            // The send operation on the topic returns a future that completes once the
            // result or failure from Kafka is received.
            producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i)),
                    0,
                )
                .map(move |delivery_status| {
                    // This will be executed once the result is received
                    println!("Delivery status for message {} received", i);
                    delivery_status
                })
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        println!("Future completed. Result: {:?}", future.wait());
    }
}
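
fn main() {
    let matches = App::new("producer example")
        .about("Simple command line producer")
        // The long flag names are assumed to mirror the argument names.
        .arg(
            Arg::with_name("connection")
                .long("connection")
                .takes_value(true)
                .required(true)
                .help("Azure Event Hub connection string"),
        )
        .arg(
            Arg::with_name("topic")
                .long("topic")
                .takes_value(true)
                .required(true)
                .help("Destination topic"),
        )
        .get_matches();

    let (version_n, version_s) = get_rdkafka_version();
    println!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let topic = matches.value_of("topic").unwrap();
    let connection = matches.value_of("connection").unwrap();
    produce(connection, topic);
}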
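
The connection-string handling in `produce` can be isolated and checked without a broker. The following is a minimal, std-only sketch rather than the gist's own code: `broker_from_connection_string` and `KAFKA_PORT` are hypothetical names, and the host in the usage note is made up. It assumes the connection string is a `;`-separated list containing an `Endpoint=sb://<host>/` entry, and that the Kafka endpoint of Event Hubs listens on port 9093.

```rust
/// Port assumed for the Kafka-compatible endpoint of Azure Event Hubs.
const KAFKA_PORT: u16 = 9093;

/// Extracts `<host>:9093` from a connection string such as
/// `Endpoint=sb://<host>/;SharedAccessKeyName=...;SharedAccessKey=...`.
/// Returns `None` when no non-empty `Endpoint=sb://` host is present.
fn broker_from_connection_string(connection: &str) -> Option<String> {
    connection
        // Each `;`-separated part is one `Key=Value` entry.
        .split(';')
        // Keep only the part that starts with the Service Bus endpoint scheme.
        .filter_map(|part| part.trim().strip_prefix("Endpoint=sb://"))
        // The host ends at the first '/', so a '/' in a later entry is irrelevant.
        .map(|rest| rest.split('/').next().unwrap_or(rest))
        // Reject an empty host instead of producing ":9093".
        .find(|host| !host.is_empty())
        .map(|host| format!("{}:{}", host, KAFKA_PORT))
}
```

Calling it with `"Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=send;SharedAccessKey=secret"` yields `Some("example.servicebus.windows.net:9093")`, a value suitable for `bootstrap.servers`. Returning an `Option` lets the caller report a malformed connection string instead of panicking on `unwrap()`.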