Skip to content

Instantly share code, notes, and snippets.

@szeidner
Created December 11, 2017 13:39
Show Gist options
  • Save szeidner/36c0d9880cb7384634afb48cf454b600 to your computer and use it in GitHub Desktop.
Save szeidner/36c0d9880cb7384634afb48cf454b600 to your computer and use it in GitHub Desktop.
Fetch URLs concurrently
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;
extern crate num_cpus;
use db;
use diesel::prelude::*;
use super::schema::directory::dsl::*;
use self::futures::{Future, Stream};
use self::futures::future::join_all;
use self::futures::future::{self, Either};
use self::hyper::{Client, Error, Body, Chunk};
use self::hyper::Uri;
use self::hyper::client::{HttpConnector, Response};
use self::hyper_tls::HttpsConnector;
use self::tokio_core::reactor::Core;
use rss::Channel;
use output::Output;
use models::DirectoryItem;
/// Fetches every feed URL listed in the `directory` table and passes each
/// downloaded body on for RSS parsing.
///
/// The type carries no state: the reactor and HTTPS client are created fresh
/// on each call to [`Fetcher::fetch_all`].
#[derive(Debug, Default)]
pub struct Fetcher;

impl Fetcher {
    /// Create a new `Fetcher`.
    ///
    /// This does not open any connections or build a client; that happens
    /// inside [`Fetcher::fetch_all`].
    pub fn new() -> Fetcher {
        Fetcher
    }

    /// Fetch all feed URLs from the database concurrently.
    ///
    /// One request future is built per directory item (via `fetch_with_hyper`).
    /// All of them are driven together on a single `tokio_core` reactor with
    /// `join_all`, so requests are in flight at the same time rather than one
    /// after another. HTTP and body errors for an individual feed are logged by
    /// `fetch_with_hyper` and do not abort the batch.
    ///
    /// # Panics
    ///
    /// Panics if the reactor core or the TLS connector cannot be created, or if
    /// `get_all_feed_urls` panics (it unwraps the database query).
    pub fn fetch_all(&mut self) {
        let feed_urls = get_all_feed_urls();

        let mut core = Core::new().expect("failed to create tokio reactor core");
        // `Handle` is an owned, cloneable reference to the reactor, so it can be
        // reused for both the connector and the client without borrowing `core`.
        let handle = core.handle();
        let connector = HttpsConnector::new(num_cpus::get(), &handle)
            .expect("failed to create HTTPS connector");
        let client = Client::configure().connector(connector).build(&handle);

        let work: Vec<_> = feed_urls
            .iter()
            .map(|directory_item| fetch_with_hyper(directory_item, &client))
            .collect();
        println!("Length: {:}", work.len());

        // Every per-feed future maps its errors to `Ok(())` in
        // `fetch_with_hyper`, so the joined future cannot fail in practice.
        core.run(join_all(work))
            .expect("feed fetch futures swallow their own errors");
    }
}
/// get_all_feed_urls
/// Load every row of the `directory` table whose `feed_url` is not the empty
/// string.
///
/// Opens a fresh connection through `db::establish_connection` for this query
/// only. Panics (via `unwrap`) if the query fails.
#[inline]
fn get_all_feed_urls() -> Vec<DirectoryItem> {
    let conn = db::establish_connection();
    directory
        .filter(feed_url.ne(""))
        .load::<DirectoryItem>(&conn)
        .unwrap()
}
/// fetch_with_hyper
/// fetch a url with hyper, convert to a channel and write to the database
#[inline]
pub fn fetch_with_hyper(directory_item: &DirectoryItem,
client: &Client<HttpsConnector<HttpConnector>, Body>) -> impl Future<Item=(), Error=Error> {
let uri = directory_item.feed_url.parse().unwrap();
let it_id = directory_item.itunes_id;
client
.get(uri)
.and_then( move |res: Response<Body>|
res.body().concat2()
.and_then(move |body: Chunk|
convert_to_channel(body, it_id)
)
)
.or_else(|e| -> Result<(), Error> {
println!("Error: {:?}", e);
Ok(())
})
}
/// convert_to_channel
/// Parse a downloaded response body as an RSS `Channel` and pass it, together
/// with `it_id`, to `Output::write_channel_info`.
///
/// * `body` - the complete response body (`fetch_with_hyper` concatenates the
///   streamed chunks before calling this).
/// * `it_id` - forwarded unchanged to `Output::write_channel_info`;
///   `fetch_with_hyper` supplies the directory item's `itunes_id`.
///
/// RSS parse errors are printed and swallowed, so this always returns
/// `Ok(())`. The `Result<(), hyper::Error>` return type lets it be used
/// directly as an `and_then` continuation in a hyper future chain.
#[inline]
pub fn convert_to_channel(body: Chunk, it_id: i64) -> Result<(), Error> {
    match Channel::read_from(body.as_ref()) {
        Ok(channel) => {
            Output::write_channel_info(&channel, it_id);
            println!("Channel name: {:?}", channel.title());
        }
        Err(e) => println!("Error: {:?}", e),
    }
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment