Created
December 11, 2017 13:39
-
-
Save szeidner/36c0d9880cb7384634afb48cf454b600 to your computer and use it in GitHub Desktop.
Fetch urls concurrently
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate hyper; | |
extern crate hyper_tls; | |
extern crate tokio_core; | |
extern crate num_cpus; | |
use db; | |
use diesel::prelude::*; | |
use super::schema::directory::dsl::*; | |
use self::futures::{Future, Stream}; | |
use self::futures::future::join_all; | |
use self::hyper::{Client, Error, Body, Chunk}; | |
use self::hyper::client::{HttpConnector, Response}; | |
use self::hyper_tls::HttpsConnector; | |
use self::tokio_core::reactor::Core; | |
use rss::Channel; | |
use output::Output; | |
use models::DirectoryItem; | |
pub struct Fetcher; | |
impl Fetcher { | |
/// new | |
/// Create a new instance of an https client to be used for all document requests | |
pub fn new() -> Fetcher { | |
Fetcher {} | |
} | |
/// Fetch all provided urls one at a time | |
pub fn fetch_all(&mut self) { | |
let feed_urls = get_all_feed_urls(); | |
let mut core = Core::new().unwrap(); | |
let client = Client::configure() | |
.connector(HttpsConnector::new(num_cpus::get(), &core.handle()).unwrap()) | |
.build(&core.handle()); | |
let work: Vec<_> = feed_urls.into_iter() | |
.map(|directory_item| { | |
fetch_with_hyper(&directory_item, &client) | |
}) | |
.collect(); | |
println!("Length: {:}", work.len()); | |
core.run(join_all(work)).unwrap(); | |
} | |
} | |
/// get_all_feed_urls | |
/// look up all feed urls listed in the database | |
#[inline] | |
fn get_all_feed_urls() -> Vec<DirectoryItem> { | |
let connection = db::establish_connection(); | |
let feeds: Vec<DirectoryItem> = directory | |
.filter(feed_url.ne("")) | |
.load(&connection) | |
.unwrap(); | |
feeds | |
} | |
/// fetch_with_hyper | |
/// fetch a url with hyper, convert to a channel and write to the database | |
#[inline] | |
pub fn fetch_with_hyper(directory_item: &DirectoryItem, | |
client: &Client<HttpsConnector<HttpConnector>, Body>) -> impl Future<Item=(), Error=Error> { | |
let uri = directory_item.feed_url.parse().unwrap(); | |
let it_id = directory_item.itunes_id; | |
client | |
.get(uri) | |
.and_then( move |res: Response<Body>| | |
res.body().concat2() | |
.and_then(move |body: Chunk| | |
convert_to_channel(body, it_id) | |
) | |
) | |
.or_else(|e| -> Result<(), Error> { | |
println!("Error: {:?}", e); | |
Ok(()) | |
}) | |
} | |
/// fetch_with_hyper | |
/// fetch a url with hyper, convert to a channel and write to the database | |
#[inline] | |
pub fn convert_to_channel(body: Chunk, it_id: i64) -> Result<(), Error> { | |
match Channel::read_from(body.as_ref()) { | |
Ok(channel) => { | |
Output::write_channel_info(&channel, it_id); | |
println!("Channel name: {:?}", channel.title()); | |
Ok(()) | |
}, | |
Err(e) => { | |
println!("Error: {:?}", e); | |
Ok(()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment