Scraping
Cargo.toml
[package]
name = "scraping-rs"
version = "0.1.0"
authors = ["Chris Allen <cma@bitemyapp.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fantoccini = "0.12.0"
tokio = { version = "0.2", features = ["full"] }
url = "2.1.1"
crossbeam = "0.7.3"
crossbeam-queue = "0.2.1"
log = "0.4.8"
env_logger = "0.7.1"
chashmap = "2.2.2"
lifeguard = "0.6.0"
object-pool = "0.4.4"
futures = "0.3.1"
deadpool = "0.5.0"
async-trait = "0.1.22"

[profile.release]
debug = true
lto = false
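A note before the program itself: fantoccini does not launch a browser on its own; it speaks the WebDriver protocol to a server that does. The client below connects to http://localhost:9515, which is chromedriver's default port, so a chromedriver (or compatible WebDriver server) must already be running when the crawler starts.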
src/main.rs
#[macro_use]
extern crate log;

use async_trait::async_trait;
use chashmap::CHashMap;
use crossbeam_queue::ArrayQueue;
use fantoccini::{Client, Locator};
use std::sync::Arc;
use url::{Host, Url};

const QUEUE_CAPACITY: usize = 100_000;

type Error = fantoccini::error::CmdError;
type Pool = deadpool::Pool<Client, Error>;
// type Pool = deadpool::unmanaged::Pool<Client>;

struct Manager {}

#[async_trait]
impl deadpool::managed::Manager<Client, Error> for Manager {
    async fn create(&self) -> Result<Client, Error> {
        // Each pooled client is its own WebDriver session against the local
        // chromedriver. CmdError has no "session creation failed" variant, so
        // failures are shoehorned into NotJson.
        let client = Client::new("http://localhost:9515").await.map_err(|err| {
            error!("Error instantiating client, error was: {:?}", err);
            fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
        })?;
        warn!("Spawned client!");
        // client.persist().await?;
        Ok(client)
    }

    async fn recycle(&self, _conn: &mut Client) -> deadpool::RecycleResult<Error> {
        // Sessions are assumed to stay healthy, so recycling is a no-op.
        Ok(())
    }
}
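// Illustrative sketch (not part of the original gist): checking a client out
// of the pool by hand, assuming deadpool 0.5's managed `Pool::get`. The
// returned guard derefs to `Client` and is recycled into the pool on drop.
#[allow(dead_code)]
async fn pool_demo(pool: &Pool) -> Result<(), Error> {
    let mut client = pool.get().await.expect("Failed to pull a client from the pool");
    // Drive the pooled session like any other fantoccini client.
    client.goto("https://example.com/").await?;
    Ok(())
    // `client` is dropped here and handed back to the pool.
}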
/// Shared crawl state. Cloning is cheap: the pool is internally reference
/// counted and the queue and bookkeeping maps sit behind Arcs, so every
/// worker clone observes the same state. (CHashMap's Clone deep-copies, so
/// the maps must be wrapped in Arc rather than cloned directly.)
#[derive(Clone)]
struct Scraping {
    client_pool: Pool,
    /// Only links on this host are enqueued.
    accepted_host: Host,
    /// Base used to resolve relative hrefs.
    base_url: Url,
    /// Work queue of URLs still to visit.
    queue: Arc<ArrayQueue<Url>>,
    /// URLs that have ever been enqueued, to avoid duplicate pushes.
    queued: Arc<CHashMap<Url, ()>>,
    /// URLs whose pages have already been scraped.
    scraped: Arc<CHashMap<Url, ()>>,
    /// Every distinct URL visited; printed at the end of the crawl.
    unique: Arc<CHashMap<Url, ()>>,
}
async fn scrape_page(scraping: &Scraping, url: Url) -> Result<(), Error> {
    debug!("Started scrape_page");
    let mut client = scraping
        .client_pool
        .get()
        .await
        .expect("Failed to pull a client from the pool");
    debug!("Acquired client");
    client.goto(url.as_str()).await?;
    debug!("Went to URL");
    let anchor_tags = client.find_all(Locator::Css("a")).await?;
    for mut anchor_tag in anchor_tags.into_iter() {
        debug!("anchor_tag html: {:?}", anchor_tag.html(false).await);
        if let Some(href) = anchor_tag.attr("href").await? {
            let url = match Url::parse(&href) {
                Ok(url) => url,
                // Relative hrefs like "/posts" can't be parsed on their own;
                // resolve them against the base URL instead.
                Err(url::ParseError::RelativeUrlWithoutBase) => {
                    match scraping.base_url.join(&href) {
                        Err(err) => {
                            error!(
                                "Bad url that could not be joined, href was: {:?} err was: {:?}",
                                href, err
                            );
                            continue;
                        }
                        Ok(url) => url,
                    }
                }
                Err(err) => {
                    error!("Bad url, error was: {:?}", err);
                    continue;
                }
            };
            debug!("{:?}", url);
            let maybe_host: Option<Host> = url.host().map(|host| host.to_owned());
            match maybe_host {
                None => {
                    warn!("Host not found for url: {}", url);
                    continue;
                }
                Some(host) => {
                    debug!("href: {:?}", href);
                    if scraping.accepted_host == host {
                        // Only enqueue URLs we haven't already queued or
                        // scraped, so duplicates don't flood the bounded queue.
                        if scraping.queued.get(&url).is_none()
                            && scraping.scraped.get(&url).is_none()
                        {
                            debug!("pushing url: {:?}", url);
                            scraping.queue.push(url.clone()).map_err(|err| {
                                fantoccini::error::CmdError::NotJson(format!("{}", err))
                            })?;
                            scraping.queued.insert(url.clone(), ());
                        }
                    } else {
                        debug!("rejecting host: {:?} url: {:?}", host, url);
                    }
                }
            }
        }
    }
    scraping.scraped.insert(url.clone(), ());
    scraping.unique.insert(url.clone(), ());
    Ok(())
}
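// Illustrative sketch (not part of the original gist): why the
// RelativeUrlWithoutBase fallback above is needed. `Url::parse` accepts only
// absolute URLs; relative hrefs must be resolved against a base with `join`.
#[allow(dead_code)]
fn join_demo() {
    assert!(Url::parse("../about").is_err()); // RelativeUrlWithoutBase
    let base = Url::parse("https://bitemyapp.com/posts/").expect("valid base url");
    let joined = base.join("../about").expect("resolvable against base");
    assert_eq!(joined.as_str(), "https://bitemyapp.com/about");
}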
async fn scrape_worker(scraping: Scraping) -> Result<(), Error> {
    debug!("Queue capacity is: {}", scraping.queue.capacity());
    debug!("Queue length is: {}", scraping.queue.len());
    // Drain the queue; crossbeam-queue 0.2's pop() returns Err once empty.
    while let Ok(url) = scraping.queue.pop() {
        debug!("Starting scrape of url: {:?}", url);
        if scraping.scraped.get(&url).is_some() {
            info!("URL already scraped, continuing: {:?}", url);
            continue;
        }
        scrape_page(&scraping, url).await?;
    }
    Ok(())
}
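// Note (not part of the original gist): a worker exits the moment `pop()`
// fails, even if sibling workers are mid-scrape and about to enqueue more
// URLs, so a crawl can end prematurely. A hedged sketch of a more patient
// loop, assuming tokio 0.2's `tokio::time::delay_for`:
//
//     let mut idle_polls = 0;
//     while idle_polls < 20 {
//         match scraping.queue.pop() {
//             Ok(url) => {
//                 idle_polls = 0;
//                 scrape_page(&scraping, url).await?;
//             }
//             // Queue momentarily empty: back off briefly before giving up.
//             Err(_) => {
//                 idle_polls += 1;
//                 tokio::time::delay_for(std::time::Duration::from_millis(250)).await;
//             }
//         }
//     }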
#[tokio::main(core_threads = 4, max_threads = 10)]
async fn main() -> Result<(), Error> {
    env_logger::init();
    let mgr = Manager {};
    let pool = Pool::new(mgr, 16);
    let accepted_host = Host::Domain("bitemyapp.com".to_string());
    let url_str = "https://bitemyapp.com/";
    let base_url = Url::parse(url_str).expect("Couldn't parse base url");
    let scraping = Scraping {
        client_pool: pool,
        queue: Arc::new(ArrayQueue::new(QUEUE_CAPACITY)),
        queued: Arc::new(CHashMap::new()),
        scraped: Arc::new(CHashMap::new()),
        unique: Arc::new(CHashMap::new()),
        accepted_host,
        base_url: base_url.clone(),
    };
    debug!("Pushing base_url onto queue");
    scraping
        .queue
        .push(base_url)
        .map_err(|err| fantoccini::error::CmdError::NotJson(format!("{}", err)))?;
    let scraping1 = scraping.clone();
    let scraping2 = scraping.clone();
    let scraping3 = scraping.clone();
    let scraping4 = scraping.clone();
    debug!("Starting scrape workers");
    // Warm the pool with four sessions up front so the workers don't all
    // stall on session creation; the guards are dropped (returned to the
    // pool) as soon as the workers are spawned.
    let browser1 = scraping.client_pool.get().await;
    let browser2 = scraping.client_pool.get().await;
    let browser3 = scraping.client_pool.get().await;
    let browser4 = scraping.client_pool.get().await;
    let futures = vec![
        tokio::spawn(async { scrape_worker(scraping1).await }),
        tokio::spawn(async { scrape_worker(scraping2).await }),
        tokio::spawn(async { scrape_worker(scraping3).await }),
        tokio::spawn(async { scrape_worker(scraping4).await }),
    ];
    std::mem::drop(browser1);
    std::mem::drop(browser2);
    std::mem::drop(browser3);
    std::mem::drop(browser4);
    // Wait for every worker and surface failures instead of discarding them.
    for result in futures::future::join_all(futures).await {
        if let Err(err) = result.expect("scrape worker panicked") {
            error!("Scrape worker failed: {:?}", err);
        }
    }
    debug!("Scrape workers finished");
    // CHashMap has no borrowed iterator, so clone the map out of its Arc and
    // consume the clone to print the results.
    for (url, _) in (*scraping.unique).clone() {
        println!("{}", url);
    }
    Ok(())
}
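To try this out (assuming the standard Cargo layout, with the manifest above as Cargo.toml and the program as src/main.rs): start a WebDriver server on port 9515, e.g. chromedriver, which listens there by default, then run the crawler with logging enabled via RUST_LOG=info cargo run --release. The crawl stays on bitemyapp.com and prints every unique URL it discovered once the workers drain the queue.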