// Gist by @bitemyapp, created January 17, 2020.
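//
// Overview: a single-host web crawler built on fantoccini (WebDriver).
// Each worker pops a URL from a shared bounded queue, drives a browser
// session to it, extracts every anchor href, resolves relative hrefs
// against the base URL, and pushes same-host links back onto the queue.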
#[macro_use]
extern crate log;
// use async_trait::async_trait; // only needed by the commented-out pool Manager below
use chashmap::CHashMap;
use crossbeam_queue::ArrayQueue;
use fantoccini::{Client, Locator};
use std::sync::Arc;
use url::{Host, Url};
const QUEUE_CAPACITY: usize = 100000;
type Error = fantoccini::error::CmdError;
// type Pool = deadpool::Pool<Client, Error>;
type Pool = deadpool::unmanaged::Pool<Client>;
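// Start a fresh WebDriver session against a driver assumed to be listening
// locally on `port` (e.g. chromedriver started with `--port=9515`).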
async fn create_client(port: &str) -> Result<Client, Error> {
    let client_url = format!("http://localhost:{}", port);
    // NotJson is borrowed as a catch-all error constructor because the
    // session-creation error type differs from CmdError.
    let client = Client::new(&client_url).await.map_err(|err| {
        error!("Error instantiating client error was: {:?}", err);
        fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
    })?;
    // client.persist().await?;
    Ok(client)
}
// struct Manager {}
// #[async_trait]
// impl deadpool::managed::Manager<Client, fantoccini::error::CmdError> for Manager {
//     async fn create(&self) -> Result<Client, Error> {
//         let mut client = Client::new("http://localhost:9515").await.map_err(|err| {
//             error!("Error instantiating client error was: {:?}", err);
//             fantoccini::error::CmdError::NotJson("Couldn't instantiate client".to_string())
//         })?;
//         warn!("Spawned client!");
//         // client.persist().await?;
//         Ok(client)
//     }
//     async fn recycle(&self, conn: &mut Client) -> deadpool::RecycleResult<Error> {
//         Ok(())
//     }
// }
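// Shared crawl state; one clone per worker. `Client` is the worker's own
// browser session, while the queue and the bookkeeping maps are shared.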
#[derive(Clone)]
struct Scraping {
    // client_pool: Pool,
    client: Client,
    accepted_host: Host,
    base_url: Url,
    queue: Arc<ArrayQueue<Url>>,
    // The maps sit behind an Arc so that every clone of Scraping shares one
    // set of maps; a bare CHashMap is deep-copied on clone, which would leave
    // each worker deduplicating against only its own private copy.
    queued: Arc<CHashMap<Url, ()>>,
    scraped: Arc<CHashMap<Url, ()>>,
    unique: Arc<CHashMap<Url, ()>>,
}
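// Visit one page: navigate the worker's browser session to `url`, collect
// every <a href>, resolve relative hrefs against the base URL, and enqueue
// links whose host matches the accepted host.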
async fn scrape_page(scraping: &mut Scraping, url: Url) -> Result<(), Error> {
    debug!("Started scrape_page");
    // let mut client = scraping
    //     .client_pool
    //     .get()
    //     .await;
    // //     .expect("Failed to pull a client from the pool");
    // let some_client = scraping.client.take();
    // let mut client = client.ok_or(fantoccini::error::CmdError::NotJson("No client"))?;
    debug!("Acquired client");
    scraping.client.goto(url.as_str()).await?;
    debug!("Went to URL");
    let anchor_tags = scraping.client.find_all(Locator::Css("a")).await?;
    for mut anchor_tag in anchor_tags.into_iter() {
        debug!("anchor_tag html: {:?}", anchor_tag.html(false).await);
        if let Some(href) = anchor_tag.attr("href").await? {
            let maybe_url: Result<Url, url::ParseError> = Url::parse(&href);
            let url = match maybe_url {
                Ok(url) => url,
                // The variant must be path-qualified: a bare
                // `RelativeUrlWithoutBase` identifier here would be a
                // catch-all binding that swallows every parse error.
                Err(url::ParseError::RelativeUrlWithoutBase) => {
                    let maybe_joined_url: Result<Url, url::ParseError> =
                        scraping.base_url.join(&href);
                    match maybe_joined_url {
                        Err(err) => {
                            error!(
                                "Bad url that could not be joined, href was: {:?} err was: {:?}",
                                href, err
                            );
                            continue;
                        }
                        Ok(url) => url,
                    }
                }
                Err(err) => {
                    error!("Bad url, error was: {:?}", err);
                    continue;
                }
            };
            debug!("{:?}", url);
            let maybe_host: Option<Host> = url.host().map(|host| host.to_owned());
            match maybe_host {
                None => {
                    warn!("Host not found for url: {}", url);
                    continue;
                }
                Some(host) => {
                    debug!("href: {:?}", href);
                    if scraping.accepted_host == host {
                        // Consult the bookkeeping maps before pushing so the
                        // bounded queue is not flooded with duplicate URLs.
                        if scraping.queued.get(&url).is_none()
                            && scraping.scraped.get(&url).is_none()
                        {
                            debug!("pushing url: {:?}", url);
                            scraping.queue.push(url.clone()).map_err(|err| {
                                fantoccini::error::CmdError::NotJson(format!("{}", err))
                            })?;
                            scraping.queued.insert(url.clone(), ());
                        }
                    } else {
                        debug!("rejecting host: {:?} url: {:?}", host, url);
                    }
                }
            }
        }
    }
    scraping.scraped.insert(url.clone(), ());
    scraping.unique.insert(url.clone(), ());
    Ok(())
}
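// Worker loop: poll the shared queue forever. An empty queue is not a
// terminal state (another worker may still be filling it), so we back off
// for five seconds instead of exiting; the function never returns.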
// async fn scrape_worker(mut scraping: Scraping) -> Result<(), Error> {
async fn scrape_worker(mut scraping: Scraping) -> ! {
    use std::time::Duration;
    use tokio::time::*;
    debug!("Queue capacity is: {}", scraping.queue.capacity());
    debug!("Queue length is: {}", scraping.queue.len());
    // while let Ok(url) = scraping.queue.pop() {
    loop {
        let maybe_url = scraping.queue.pop();
        match maybe_url {
            Err(_) => {
                // The delay future has to be awaited to actually pause;
                // constructing it and dropping it would make this a busy loop.
                let five_seconds = Duration::new(5, 0);
                delay_for(five_seconds).await;
            }
            Ok(url) => {
                debug!("Starting scrape of url: {:?}", url);
                {
                    let been_scraped = scraping.scraped.get(&url);
                    if been_scraped.is_some() {
                        info!("URL already scraped, continuing: {:?}", url);
                        continue;
                    }
                }
                if let Err(err) = scrape_page(&mut scraping, url).await {
                    error!("scrape_page failed: {:?}", err);
                }
            }
        }
    }
    // Ok(())
}
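// Wiring: one bounded queue, shared bookkeeping maps, a base URL seeded into
// the queue, and four workers each holding their own WebDriver session.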
#[tokio::main(core_threads = 4, max_threads = 10)]
async fn main() -> Result<(), Error> {
    env_logger::init();
    // let mgr = Manager {};
    // let pool = Pool::new(mgr, 16);
    // let pool = Pool::from(vec![
    //     create_client("9515").await?,
    //     create_client("9515").await?,
    //     // create_client("9516").await?,
    //     // create_client("9517").await?,
    //     // create_client("9518").await?,
    // ]);
    let queue = ArrayQueue::new(QUEUE_CAPACITY);
    let queued = Arc::new(CHashMap::new());
    let scraped = Arc::new(CHashMap::new());
    let unique = Arc::new(CHashMap::new());
    let accepted_host = Host::Domain("bitemyapp.com".to_string());
    let url_str = "https://bitemyapp.com/";
    let base_url = Url::parse(url_str).expect("Couldn't parse base url");
    let scraping = Scraping {
        // client_pool: pool,
        client: create_client("9515").await?,
        queue: Arc::new(queue),
        queued,
        scraped,
        unique,
        accepted_host,
        base_url: base_url.clone(),
    };
    // scraping.client.goto(url_str).await?;
    // let url = scraping.client.current_url().await?;
    // debug!("{:?}", url);
    // scrape_page(&mut scraping, url).await?;
    debug!("Pushing base_url onto queue");
    scraping
        .queue
        .push(base_url)
        .map_err(|err| fantoccini::error::CmdError::NotJson(format!("{}", err)))?;
    let cloned_scraping = scraping.clone();
    // Each worker gets its own browser session; the session created above
    // stays with `scraping` itself.
    let mut scraping1 = scraping.clone();
    let mut scraping2 = scraping.clone();
    let mut scraping3 = scraping.clone();
    let mut scraping4 = scraping.clone();
    scraping1.client = create_client("9515").await?;
    scraping2.client = create_client("9515").await?;
    scraping3.client = create_client("9515").await?;
    scraping4.client = create_client("9515").await?;
    debug!("Starting scrape worker");
    // let browser1 = scraping.client_pool.get().await;
    // let browser2 = scraping.client_pool.get().await;
    // let browser3 = scraping.client_pool.get().await;
    // let browser4 = scraping.client_pool.get().await;
    let futures = vec![
        tokio::spawn(async { scrape_worker(scraping1).await }),
        tokio::spawn(async { scrape_worker(scraping2).await }),
        tokio::spawn(async { scrape_worker(scraping3).await }),
        tokio::spawn(async { scrape_worker(scraping4).await }),
    ];
    // for _ in 0..6 {
    //     tokio::spawn(async {
    //         let mut c = create_client("9515").await.unwrap();
    //         c.goto("https://rust-lang.org").await.unwrap();
    //     });
    // }
    // std::mem::drop(browser1);
    // std::mem::drop(browser2);
    // std::mem::drop(browser3);
    // std::mem::drop(browser4);
    // loop {}
    // The workers return `!`, so join_all only resolves if every worker task
    // panics; everything below is effectively unreachable and the process is
    // normally stopped with Ctrl-C.
    let _results = futures::future::join_all(futures).await;
    // handle1.await;
    // handle2.await;
    // handle3.await;
    // handle4.await;
    // let thread = std::thread::spawn(move || {
    //     scrape_worker(scraping.clone()).await;
    // });
    // thread.join().expect("worker died");
    debug!("Scrape worker finished");
    // scraping.client.close().await?;
    // CHashMap has no borrowing iterator, so clone the map contents out of
    // the Arc before consuming them.
    for (url, _) in (*cloned_scraping.unique).clone() {
        println!("{}", url);
    }
    Ok(())
}