Skip to content

Instantly share code, notes, and snippets.

@kyle-mccarthy
Created April 22, 2021 22:56
Show Gist options
  • Save kyle-mccarthy/b7fd02f109df1acf5bb034fa9e7d2de8 to your computer and use it in GitHub Desktop.
Save kyle-mccarthy/b7fd02f109df1acf5bb034fa9e7d2de8 to your computer and use it in GitHub Desktop.
Pull messages from Google Cloud Pub/Sub using Rust
use google_pubsub1::{api::AcknowledgeRequest, Pubsub};
use std::fs::File;
use std::io::BufReader;
use yup_oauth2::ServiceAccountKey;
/// Fully-qualified Pub/Sub subscription resource name used for both the pull
/// and the acknowledge calls. `PROJECT_ID` and `SUBSCRIPTION` are placeholders
/// to be replaced with real values.
static SUBSCRIPTION_PATH: &str = "projects/PROJECT_ID/subscriptions/SUBSCRIPTION";

/// Path to the service-account key JSON file that is opened and parsed at
/// startup. The value is a placeholder to be replaced with a real path.
static ACCOUNT_KEY: &str = "YOUR_ACCOUNT_KEY.json";
#[tokio::main]
async fn main() {
let file = std::fs::File::open(ACCOUNT_KEY).unwrap();
let buffer = std::io::BufReader::new(file);
let account_key =
serde_json::from_reader::<BufReader<File>, ServiceAccountKey>(buffer).unwrap();
let auth = yup_oauth2::ServiceAccountAuthenticator::builder(account_key)
.build()
.await
.unwrap();
let hub = Pubsub::new(
hyper::Client::builder().build(hyper_rustls::HttpsConnector::with_native_roots()),
auth,
);
let mut pull_req = google_pubsub1::api::PullRequest::default();
pull_req.max_messages = Some(1);
let (_, messages) = hub
.projects()
.subscriptions_pull(pull_req, SUBSCRIPTION_PATH)
.doit()
.await
.unwrap();
if let Some(messages) = messages.received_messages {
println!("pulled {} messages", messages.len());
let ack_ids = messages
.into_iter()
.flat_map(|m| m.ack_id)
.collect::<Vec<String>>();
if ack_ids.len() > 0 {
let mut ack_req = AcknowledgeRequest::default();
ack_req.ack_ids = Some(ack_ids);
let _ack_res = hub
.projects()
.subscriptions_acknowledge(ack_req, SUBSCRIPTION_PATH)
.doit()
.await
.unwrap();
println!("ACKd messages");
}
} else {
println!("no messages available to pull");
}
println!("exiting");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment