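// A small same-host web crawler built on fantoccini (WebDriver): workers pull
// URLs off a shared queue, load each page in a real browser session, and push
// every same-host link they find back onto the queue.
//
// Assumes a WebDriver server (e.g. chromedriver, whose default port is 9515)
// is already running locally. Run with e.g. `RUST_LOG=debug cargo run`.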
#[macro_use]
extern crate log;
// use async_trait::async_trait; // only needed by the commented-out Manager below
use chashmap::CHashMap;
use crossbeam_queue::ArrayQueue;
use fantoccini::{Client, Locator};
use std::sync::Arc;
use url::{Host, Url};
const QUEUE_CAPACITY: usize = 100000;

type Error = fantoccini::error::CmdError;
// Only needed by the commented-out deadpool experiments below.
// type Pool = deadpool::Pool<Client, Error>;
// type Pool = deadpool::unmanaged::Pool<Client>;
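// Connects to a local WebDriver server on the given port. fantoccini's
// `CmdError` has no general-purpose error variant, so connection failures get
// smuggled through `CmdError::NotJson` to keep a single `Error` type throughout.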
async fn create_client(port: &str) -> Result<Client, Error> {
    let client_url = format!("http://localhost:{}", port);
    let client = Client::new(&client_url).await.map_err(|err| {
        error!("Error instantiating client, error was: {:?}", err);
        fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
    })?;
    // client.persist().await?;
    Ok(client)
}
// struct Manager {}
//
// #[async_trait]
// impl deadpool::managed::Manager<Client, fantoccini::error::CmdError> for Manager {
//     async fn create(&self) -> Result<Client, Error> {
//         let mut client = Client::new("http://localhost:9515").await.map_err(|err| {
//             error!("Error instantiating client, error was: {:?}", err);
//             fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
//         })?;
//         warn!("Spawned client!");
//         // client.persist().await?;
//         Ok(client)
//     }
//
//     async fn recycle(&self, conn: &mut Client) -> deadpool::RecycleResult<Error> {
//         Ok(())
//     }
// }
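// Shared crawl state. One `Scraping` value is cloned per worker; the queue and
// the bookkeeping maps sit behind `Arc`s so every clone sees the same data,
// while each clone carries its own WebDriver `Client`.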
#[derive(Clone)]
struct Scraping {
    // client_pool: Pool,
    client: Client,
    accepted_host: Host,
    base_url: Url,
    queue: Arc<ArrayQueue<Url>>,
    // The maps live behind `Arc`s: deriving `Clone` on a bare `CHashMap` field
    // would deep-copy the map, giving each worker clone its own private view
    // instead of shared state.
    queued: Arc<CHashMap<Url, ()>>,
    scraped: Arc<CHashMap<Url, ()>>,
    unique: Arc<CHashMap<Url, ()>>,
}
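// Loads `url` in the browser, extracts every `<a href>` on the page, and
// enqueues any link whose host matches `accepted_host`. Relative hrefs are
// resolved against `base_url`; anything else unparseable is logged and skipped.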
async fn scrape_page(scraping: &mut Scraping, url: Url) -> Result<(), Error> {
    error!("Started scrape_page");
    // let mut client = scraping
    //     .client_pool
    //     .get()
    //     .await;
    // //     .expect("Failed to pull a client from the pool");
    // let some_client = scraping.client.take();
    // let mut client = client.ok_or(fantoccini::error::CmdError::NotJson("No client"))?;
    error!("Acquired client");
    scraping.client.goto(url.as_str()).await?;
    error!("Went to URL");
    let anchor_tags = scraping.client.find_all(Locator::Css("a")).await?;
    for mut anchor_tag in anchor_tags.into_iter() {
        debug!("anchor_tag html: {:?}", anchor_tag.html(false).await);
        if let Some(href) = anchor_tag.attr("href").await? {
            let maybe_url: Result<Url, url::ParseError> = Url::parse(&href);
            let url = match maybe_url {
                Ok(url) => url,
                // Relative hrefs like "/posts/foo" fail to parse on their own
                // and have to be joined against the base URL.
                Err(url::ParseError::RelativeUrlWithoutBase) => {
                    let maybe_joined_url: Result<Url, url::ParseError> =
                        scraping.base_url.join(&href);
                    match maybe_joined_url {
                        Err(err) => {
                            error!(
                                "Bad url that could not be joined, href was: {:?} err was: {:?}",
                                href, err
                            );
                            continue;
                        }
                        Ok(url) => url,
                    }
                }
                Err(err) => {
                    error!("Bad url, error was: {:?}", err);
                    continue;
                }
            };
            debug!("{:?}", url);
            let maybe_host: Option<Host> = url.host().map(|host| host.to_owned());
            match maybe_host {
                None => {
                    warn!("Host not found for url: {}", url);
                    continue;
                }
                Some(host) => {
                    debug!("href: {:?}", href);
                    if scraping.accepted_host == host {
                        // Skip URLs that are already queued or scraped, so the
                        // bounded queue doesn't fill up with duplicates.
                        if scraping.queued.get(&url).is_some()
                            || scraping.scraped.get(&url).is_some()
                        {
                            debug!("already seen, skipping url: {:?}", url);
                            continue;
                        }
                        debug!("pushing url: {:?}", url);
                        scraping.queue.push(url.clone()).map_err(|err| {
                            fantoccini::error::CmdError::NotJson(format!("{}", err))
                        })?;
                        scraping.queued.insert(url.clone(), ());
                    } else {
                        debug!("rejecting host: {:?} url: {:?}", host, url);
                    }
                }
            }
        }
    }
    scraping.scraped.insert(url.clone(), ());
    scraping.unique.insert(url.clone(), ());
    Ok(())
}
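// Worker loop: pop a URL, skip it if a worker already scraped it, otherwise
// scrape it. An empty queue just means the other workers haven't produced new
// links yet, so back off briefly rather than exiting.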
// async fn scrape_worker(mut scraping: Scraping) -> Result<(), Error> {
async fn scrape_worker(mut scraping: Scraping) -> ! {
    use std::time::Duration;
    use tokio::time::*;
    debug!("Queue capacity is: {}", scraping.queue.capacity());
    debug!("Queue length is: {}", scraping.queue.len());
    // while let Ok(url) = scraping.queue.pop() {
    loop {
        let maybe_url = scraping.queue.pop();
        match maybe_url {
            Err(_) => {
                // Queue is empty: sleep instead of busy-spinning. The delay
                // must be awaited or it does nothing.
                let five_seconds = Duration::new(5, 0);
                delay_for(five_seconds).await;
            }
            Ok(url) => {
                debug!("Starting scrape of url: {:?}", url);
                {
                    // Scope the read guard so it's dropped before scrape_page
                    // borrows `scraping` mutably.
                    let been_scraped = scraping.scraped.get(&url);
                    if been_scraped.is_some() {
                        info!("URL already scraped, continuing: {:?}", url);
                        continue;
                    }
                }
                let _ = scrape_page(&mut scraping, url).await;
            }
        }
    }
    // Ok(())
}
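// Seeds the queue with the base URL, spins up four workers (each with its own
// WebDriver session against the same local server), and waits on all of them.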
#[tokio::main(core_threads = 4, max_threads = 10)]
async fn main() -> Result<(), Error> {
    env_logger::init();
    // let mgr = Manager {};
    // let pool = Pool::new(mgr, 16);
    // let pool = Pool::from(vec![
    //     create_client("9515").await?,
    //     create_client("9515").await?,
    //     // create_client("9516").await?,
    //     // create_client("9517").await?,
    //     // create_client("9518").await?,
    // ]);
    let queue = ArrayQueue::new(QUEUE_CAPACITY);
    let queued = Arc::new(CHashMap::new());
    let scraped = Arc::new(CHashMap::new());
    let unique = Arc::new(CHashMap::new());
    let accepted_host = Host::Domain("bitemyapp.com".to_string());
    let url_str = "https://bitemyapp.com/";
    let base_url = Url::parse(url_str).expect("Couldn't parse base url");
    let scraping = Scraping {
        // client_pool: pool,
        client: create_client("9515").await?,
        queue: Arc::new(queue),
        queued,
        scraped,
        unique,
        accepted_host,
        base_url: base_url.clone(),
    };
    // scraping.client.goto(url_str).await?;
    // let url = scraping.client.current_url().await?;
    // debug!("{:?}", url);
    // scrape_page(&mut scraping, url).await?;
    debug!("Pushing base_url onto queue");
    scraping
        .queue
        .push(base_url)
        .map_err(|err| fantoccini::error::CmdError::NotJson(format!("{}", err)))?;
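    // Every worker gets its own WebDriver session; the clones still share the
    // queue and the bookkeeping maps through their `Arc`s.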
    let cloned_scraping = scraping.clone();
    let mut scraping1 = scraping.clone();
    let mut scraping2 = scraping.clone();
    let mut scraping3 = scraping.clone();
    let mut scraping4 = scraping.clone();
    scraping1.client = create_client("9515").await?;
    scraping2.client = create_client("9515").await?;
    scraping3.client = create_client("9515").await?;
    scraping4.client = create_client("9515").await?;
    debug!("Starting scrape workers");
    // let browser1 = scraping.client_pool.get().await;
    // let browser2 = scraping.client_pool.get().await;
    // let browser3 = scraping.client_pool.get().await;
    // let browser4 = scraping.client_pool.get().await;
    let futures = vec![
        tokio::spawn(async { scrape_worker(scraping1).await }),
        tokio::spawn(async { scrape_worker(scraping2).await }),
        tokio::spawn(async { scrape_worker(scraping3).await }),
        tokio::spawn(async { scrape_worker(scraping4).await }),
    ];
    // for _ in 0..6 {
    //     tokio::spawn(async {
    //         let mut c = create_client("9515").await.unwrap();
    //         c.goto("https://rust-lang.org").await.unwrap();
    //     });
    // }
    // std::mem::drop(browser1);
    // std::mem::drop(browser2);
    // std::mem::drop(browser3);
    // std::mem::drop(browser4);
    // loop {}
    // The workers loop forever (`scrape_worker` is `-> !`), so this join never
    // resolves unless a worker panics; everything below is effectively dead
    // code until the workers get a shutdown condition.
    let _results = futures::future::join_all(futures).await;
    // handle1.await;
    // handle2.await;
    // handle3.await;
    // handle4.await;
    // let thread = std::thread::spawn(move || {
    //     scrape_worker(scraping.clone()).await;
    // });
    // thread.join().expect("worker died");
    debug!("Scrape workers finished");
    // scraping.client.close().await?;
    // `CHashMap` only offers a consuming iterator, so clone the map out of its
    // `Arc` before iterating.
    for (url, _) in (*cloned_scraping.unique).clone() {
        println!("{}", url);
    }
    Ok(())
}