Scraping
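A single-host crawler built on fantoccini 0.12 and tokio 0.2: a deadpool-managed pool of WebDriver clients feeds four worker tasks that pop URLs from a shared ArrayQueue, follow links on the accepted host, and record visited pages in CHashMaps. It expects a WebDriver server to already be listening on http://localhost:9515 (chromedriver's default port). Two files follow: the Cargo manifest (Cargo.toml) and the crate's src/main.rs.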
[package]
name = "scraping-rs"
version = "0.1.0"
authors = ["Chris Allen <cma@bitemyapp.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
fantoccini = "0.12.0"
tokio = { version = "0.2", features = ["full"] }
url = "2.1.1"
crossbeam = "0.7.3"
crossbeam-queue = "0.2.1"
log = "0.4.8"
env_logger = "0.7.1"
chashmap = "2.2.2"
lifeguard = "0.6.0"
object-pool = "0.4.4"
futures = "0.3.1"
deadpool = "0.5.0"
async-trait = "0.1.22"
[profile.release]
debug = true
lto = false

#[macro_use]
extern crate log;

use async_trait::async_trait;
use chashmap::CHashMap;
use crossbeam_queue::ArrayQueue;
use fantoccini::{Client, Locator};
use std::sync::Arc;
use url::{Host, Url};

const QUEUE_CAPACITY: usize = 100_000;

type Error = fantoccini::error::CmdError;
type Pool = deadpool::Pool<Client, Error>;
// type Pool = deadpool::unmanaged::Pool<Client>;

struct Manager {}

#[async_trait]
impl deadpool::managed::Manager<Client, fantoccini::error::CmdError> for Manager {
    async fn create(&self) -> Result<Client, Error> {
        let client = Client::new("http://localhost:9515").await.map_err(|err| {
            error!("Error instantiating client, error was: {:?}", err);
            fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
        })?;
        warn!("Spawned client!");
        // client.persist().await?;
        Ok(client)
    }

    async fn recycle(&self, _conn: &mut Client) -> deadpool::RecycleResult<Error> {
        Ok(())
    }
}
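
// Per-crawl state handed to each worker: the pooled WebDriver clients, the
// only host we follow links on, the base URL used to resolve relative hrefs,
// a bounded work queue, and maps recording queued/scraped/unique URLs.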
#[derive(Clone)]
struct Scraping {
    client_pool: Pool,
    accepted_host: Host,
    base_url: Url,
    queue: Arc<ArrayQueue<Url>>,
    queued: CHashMap<Url, ()>,
    scraped: CHashMap<Url, ()>,
    unique: CHashMap<Url, ()>,
}
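
// Fetch one page with a pooled client, walk its <a> tags, and enqueue every
// link whose host matches `accepted_host`. Relative hrefs are resolved
// against `base_url`; anything that still won't parse is logged and skipped.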
async fn scrape_page(scraping: &Scraping, url: Url) -> Result<(), Error> {
    error!("Started scrape_page");
    let mut client = scraping
        .client_pool
        .get()
        .await
        .expect("Failed to pull a client from the pool");
    error!("Acquired client");
    client.goto(url.as_str()).await?;
    error!("Went to URL");
    let anchor_tags = client.find_all(Locator::Css("a")).await?;
    for mut anchor_tag in anchor_tags.into_iter() {
        debug!("anchor_tag html: {:?}", anchor_tag.html(false).await);
        if let Some(href) = anchor_tag.attr("href").await? {
            let maybe_url: Result<Url, url::ParseError> = Url::parse(&href);
            let url = match maybe_url {
                Ok(url) => url,
                Err(url::ParseError::RelativeUrlWithoutBase) => {
                    let maybe_joined_url: Result<Url, url::ParseError> =
                        scraping.base_url.join(&href);
                    match maybe_joined_url {
                        Err(err) => {
                            error!(
                                "Bad url that could not be joined, href was: {:?} err was: {:?}",
                                href, err
                            );
                            continue;
                        }
                        Ok(url) => url,
                    }
                }
                Err(err) => {
                    error!("Bad url, error was: {:?}", err);
                    continue;
                }
            };
debug!("{:?}", url);
let maybe_host: Option<Host> = url.host().map(|host| host.to_owned());
match maybe_host {
None => {
warn!("Host not found for url: {}", url);
continue;
}
Some(host) => {
debug!("href: {:?}", href);
if scraping.accepted_host == host {
debug!("pushing url: {:?}", url);
scraping.queue.push(url.clone()).map_err(|err| {
fantoccini::error::CmdError::NotJson(format!("{}", err))
})?;
scraping.queued.insert(url.clone(), ());
} else {
debug!("rejecting host: {:?} url: {:?}", host, url);
}
}
}
}
}
scraping.scraped.insert(url.clone(), ());
scraping.unique.insert(url.clone(), ());
Ok(())
}
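
// Worker loop: keep popping URLs and scraping them until the queue is empty.
// Note that a worker exits as soon as it sees an empty queue, even if other
// workers are still mid-scrape and may enqueue more links afterwards.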
async fn scrape_worker(scraping: Scraping) -> Result<(), Error> {
    debug!("Queue capacity is: {}", scraping.queue.capacity());
    debug!("Queue length is: {}", scraping.queue.len());
    while let Ok(url) = scraping.queue.pop() {
        debug!("Starting scrape of url: {:?}", url);
        let been_scraped = scraping.scraped.get(&url);
        if been_scraped.is_some() {
            info!("URL already scraped, continuing: {:?}", url);
            continue;
        }
        scrape_page(&scraping, url).await?;
    }
    Ok(())
}
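
// Wire everything together: build the client pool, seed the queue with the
// base URL, spawn four worker tasks, wait for them, then print whatever ended
// up in `unique`.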
#[tokio::main(core_threads = 4, max_threads = 10)]
async fn main() -> Result<(), Error> {
    env_logger::init();
    let mgr = Manager {};
    let pool = Pool::new(mgr, 16);
    let queue = ArrayQueue::new(QUEUE_CAPACITY);
    let queued = CHashMap::new();
    let scraped = CHashMap::new();
    let unique = CHashMap::new();
    let accepted_host = Host::Domain("bitemyapp.com".to_string());
    let url_str = "https://bitemyapp.com/";
    let base_url = Url::parse(url_str).expect("Couldn't parse base url");
    let scraping = Scraping {
        client_pool: pool,
        queue: Arc::new(queue),
        queued,
        scraped,
        unique,
        accepted_host,
        base_url: base_url.clone(),
    };
    // scraping.client.goto(url_str).await?;
    // let url = scraping.client.current_url().await?;
    // debug!("{:?}", url);
    // scrape_page(&mut scraping, url).await?;
    debug!("Pushing base_url onto queue");
    scraping
        .queue
        .push(base_url)
        .map_err(|err| fantoccini::error::CmdError::NotJson(format!("{}", err)))?;
    let cloned_scraping = scraping.clone();
    let scraping1 = scraping.clone();
    let scraping2 = scraping.clone();
    let scraping3 = scraping.clone();
    let scraping4 = scraping.clone();
    debug!("Starting scrape worker");
    // Check out four clients up front and return them to the pool right after
    // the workers are spawned, so the workers find ready WebDriver sessions
    // instead of each creating one from scratch.
    let browser1 = scraping.client_pool.get().await;
    let browser2 = scraping.client_pool.get().await;
    let browser3 = scraping.client_pool.get().await;
    let browser4 = scraping.client_pool.get().await;
    let futures = vec![
        tokio::spawn(async { scrape_worker(scraping1).await }),
        tokio::spawn(async { scrape_worker(scraping2).await }),
        tokio::spawn(async { scrape_worker(scraping3).await }),
        tokio::spawn(async { scrape_worker(scraping4).await }),
    ];
    std::mem::drop(browser1);
    std::mem::drop(browser2);
    std::mem::drop(browser3);
    std::mem::drop(browser4);
    // loop {}
    let _results = futures::future::join_all(futures).await;
    // handle1.await;
    // handle2.await;
    // handle3.await;
    // handle4.await;
    // let thread = std::thread::spawn(move || {
    //     scrape_worker(scraping.clone()).await;
    // });
    // thread.join().expect("worker died");
    debug!("Scrape worker finished");
    // scraping.client.close().await?;
    for (url, _) in cloned_scraping.unique {
        println!("{}", url);
    }
    Ok(())
}
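
If the program fails right at startup, the usual culprit is the WebDriver connection rather than the scraping logic. A minimal connectivity check, assuming the same chromedriver endpoint the pool Manager hard-codes (http://localhost:9515) and the fantoccini 0.12 / tokio 0.2 API pinned in the manifest above, might look like this; the seed URL is just an example and can be anything reachable:

use fantoccini::Client;

#[tokio::main]
async fn main() -> Result<(), fantoccini::error::CmdError> {
    // Open a session against the WebDriver server the scraper expects.
    let mut client = Client::new("http://localhost:9515")
        .await
        .expect("couldn't reach WebDriver on localhost:9515");
    // Load the crawl's seed URL and report where the browser actually landed.
    client.goto("https://bitemyapp.com/").await?;
    println!("landed on: {}", client.current_url().await?);
    client.close().await
}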