Skip to content

Instantly share code, notes, and snippets.

@asad-awadia
Created May 25, 2023 14:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asad-awadia/672b2106e99199d0dfc71be650ea373e to your computer and use it in GitHub Desktop.
roux-stream.rs
use async_std::task::sleep;
use crypto::digest::Digest;
use crypto::sha1::Sha1;
use futures_util::*;
use roux::comment::CommentData;
use roux_stream::stream_comments;
use serde_json::{json, Value};
use std::{borrow::Borrow, collections::HashSet, time::Duration};
use tokio_retry::strategy::ExponentialBackoff;
use chrono::{self, Timelike};
/// Entry point: spawns one long-running task per monitored subreddit and
/// waits for all of them (each task streams comments indefinitely, so in
/// practice this only returns if a task panics or is cancelled).
#[tokio::main]
async fn main() {
    // Subreddits to monitor. String literals are `&'static str`, so each
    // name can move into its spawned task without cloning.
    let subreddits = HashSet::from(["30PlusSkinCare", "Advice", "Anxiety"]);
    let mut tasks = Vec::with_capacity(subreddits.len());
    for sr in subreddits {
        tasks.push(tokio::spawn(async move {
            get_comments_for_sub_reddit(sr).await;
        }));
    }
    // A `JoinError` here means the task panicked or was cancelled; report it
    // via Display rather than Debug-printing a stringified error.
    for task in tasks {
        if let Err(e) = task.await {
            println!("{}", e);
        }
    }
}
/// Streams new comments from `subreddit_name` forever, converting each to a
/// JSON document and POSTing batches of 1024 to the ingest API.
///
/// Environment variables:
/// - `SLEEP`: seconds between Reddit polls (default 30).
/// - `URL`: ingest endpoint (default "http://api:9090").
async fn get_comments_for_sub_reddit(subreddit_name: &str) {
    let subreddit = roux::subreddit::Subreddit::new(subreddit_name);
    // Retry failed Reddit fetches up to 3 times with exponential backoff.
    let retry_strategy = ExponentialBackoff::from_millis(5).factor(100).take(3);
    // Poll interval from `SLEEP`, falling back to 30s if unset or unparsable.
    let poll_secs = std::env::var("SLEEP")
        .ok()
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(30);
    let (mut stream, join_handle) = stream_comments(
        &subreddit,
        Duration::from_secs(poll_secs),
        retry_strategy,
        Some(Duration::from_secs(30)),
    );
    let api = std::env::var("URL").unwrap_or_else(|_| "http://api:9090".to_string());
    println!("before while loop for sr-name {} - posting to {}", subreddit_name, api);
    // Batch buffer; capacity is known (flushed at 1024), so preallocate.
    let mut docs: Vec<Value> = Vec::with_capacity(1024);
    while let Some(comment) = stream.next().await {
        // `comment` is an `Err` if getting the latest comments from Reddit
        // failed even after retrying.
        let c = match comment {
            Ok(c) => c,
            Err(e) => {
                // Message now matches the actual 5-second pause (it used to
                // claim 2 seconds while sleeping 5).
                println!(
                    "got comment error {} - sleeping for 5 seconds sr-name {}",
                    e, subreddit_name
                );
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }
        };
        // Skip comments with no body or bodies shorter than 32 bytes.
        // The original `c.body.as_ref().unwrap()` panicked on body-less
        // comments; treat a missing body as too short instead.
        match c.body.as_deref() {
            Some(body) if body.len() >= 32 => {}
            _ => continue,
        }
        docs.push(comment_to_json(&c));
        if docs.len() >= 1024 {
            // NOTE(review): `ureq` is blocking I/O inside an async task; if
            // this stalls the runtime, wrap it in `tokio::task::spawn_blocking`.
            match ureq::post(&api)
                .send_string(serde_json::to_string(&docs).unwrap_or_default().as_str())
            {
                Ok(_response) => {
                    /* it worked */
                }
                Err(error) => {
                    println!("ingest http request failed with error {}", error)
                }
            }
            docs.clear();
            let now = chrono::offset::Utc::now();
            if now.hour() >= 21 {
                // Pause overnight: after 21:00 UTC, sleep for 12 hours.
                println!("it is after 9pm - going to sleep for 12 hours.. sr name {}", subreddit_name);
                // Use tokio's sleep — the runtime here is tokio; the original
                // mixed in `async_std::task::sleep` for this one call.
                tokio::time::sleep(Duration::from_secs(43_200)).await;
            }
        }
    }
    // In case there was an error sending the submissions through the
    // stream, `join_handle` will report it.
    println!("joining");
    join_handle.await.unwrap().unwrap();
}
/// Converts a Reddit comment into the JSON document shape expected by the
/// ingest API.
///
/// Optional fields serialize as `null` when absent — the original
/// `unwrap()`ed every field, so any comment missing one would panic and
/// take down the whole subreddit task.
fn comment_to_json(comment: &CommentData) -> Value {
    json!({
        "cid": &comment.id,
        "link": &comment.link_permalink,
        "title": &comment.link_title,
        "body": &comment.body,
        "subreddit": &comment.subreddit,
        "create_time": &comment.created_utc,
        "author": &comment.author,
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment