Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import time
from bs4 import BeautifulSoup
from selenium import webdriver
import threading, multiprocessing
def create_driver():
    """Build and return a Chrome webdriver configured to run headless."""
    headless_options = webdriver.ChromeOptions()
    # "--headless" keeps the browser window from being shown.
    headless_options.add_argument("--headless")
    return webdriver.Chrome(options=headless_options)
def try_click_random_link(driver):
    """Best-effort click on one link of the page currently open in *driver*.

    The link one third of the way through the list of matches is chosen
    instead of a truly random one, so that concurrent threads/processes
    looking at the same page pick the same link.

    Args:
        driver: a Selenium webdriver (any object exposing
            ``find_elements(by, value)`` works).

    Returns:
        True if a link was found and clicked, False otherwise. Failures never
        propagate: this helper is deliberately best-effort.
    """
    try:
        # "a:link" is a CSS selector, not a tag name, so it must be resolved
        # with the CSS-selector strategy. The literal "css selector" is the
        # value of selenium's By.CSS_SELECTOR constant.
        anchors = driver.find_elements("css selector", "a:link")
        if not anchors:
            return False
        anchors[len(anchors) // 3].click()
    except Exception:
        # Hidden, stale or otherwise unclickable links are expected here.
        # Unlike a bare ``except``, this still lets KeyboardInterrupt and
        # SystemExit through.
        return False
    return True
def get_title(url, webdriver=None):
    """Load *url* in a browser and print the text of its HTML <title>.

    Args:
        url: address of the page to open.
        webdriver: an already-created driver to reuse. When it is None a new
            headless Chrome driver is created for this call and quit before
            returning. A driver supplied by the caller is never quit here.
            (Inside this function the parameter name shadows the selenium
            ``webdriver`` module, so the module is not used directly.)

    The page is parsed with BeautifulSoup using the "lxml" parser. If the
    page has no usable <title>, "<no title>" is printed instead of raising.
    """
    owns_driver = webdriver is None
    driver = create_driver() if owns_driver else webdriver
    try:
        driver.get(url)
        # Optional: click-walk through 8 pages on found links, e.g.
        #   for _ in range(8): try_click_random_link(driver)
        soup = BeautifulSoup(driver.page_source, "lxml")
        title_tag = soup.find('title')
        # ``.string`` is None when the tag is empty or has nested markup.
        title_text = title_tag.string if title_tag is not None else None
        print(title_text.strip() if title_text else "<no title>")
    finally:
        # Always release a driver created here, even if loading or parsing
        # failed; otherwise the Chrome process is left running.
        if owns_driver:
            driver.quit()
# Pages whose titles are fetched by every benchmark variant.
links = [
    "https://www.amazon.com",
    "https://www.google.com",
    "https://www.youtube.com/",
    "https://www.facebook.com/",
    "https://www.wikipedia.org/",
    "https://us.yahoo.com/?p=us",
    "https://www.instagram.com/",
    "https://www.globo.com/",
    "https://outlook.live.com/owa/",
]
def main_sequentially():
    """Fetch the title of every URL in ``links`` one after another.

    A single driver is created, reused for all pages, and quit at the end.

    Returns:
        Elapsed time in seconds (float), including driver start-up and
        shutdown.
    """
    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    driver = create_driver()
    try:
        for link in links:  # simulation clicks
            get_title(link, driver)
    finally:
        # Quit the browser even if one of the page loads raised.
        driver.quit()
    return time.perf_counter() - start_time
def main_threads():
    """Fetch the title of every URL in ``links`` using one thread per URL.

    Each thread calls ``get_title(link)`` without a driver, so every thread
    creates and quits its own browser.

    Returns:
        Elapsed time in seconds (float) until all threads have finished.
    """
    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    threads = []
    for link in links:  # each thread a new 'click'
        th = threading.Thread(target=get_title, args=(link,))
        th.start()  # could sleep 1 between 'clicks' with `time.sleep(1)`
        threads.append(th)
    for th in threads:
        th.join()  # main thread waits for the worker threads to finish
    return time.perf_counter() - start_time
def main_multiprocessing():
    """Fetch the title of every URL in ``links`` using one process per URL.

    Each process calls ``get_title(link)`` without a driver, so every process
    creates and quits its own browser.

    Returns:
        Elapsed time in seconds (float) until all processes have finished.
    """
    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    processes = []
    for link in links:  # each process a new 'click'
        ps = multiprocessing.Process(target=get_title, args=(link,))
        ps.start()  # could sleep 1 between 'clicks' with `time.sleep(1)`
        processes.append(ps)
    for ps in processes:
        ps.join()  # main process waits for the child processes to finish
    return time.perf_counter() - start_time
def run_nget_times():
    """Run the three benchmarks in order and return their timings.

    Only for statistical measurements, when this file is used as a module.

    Returns:
        Tuple of the values returned by main_sequentially(), main_threads()
        and main_multiprocessing(), in that order.
    """
    sequential_elapsed = main_sequentially()
    threads_elapsed = main_threads()
    processes_elapsed = main_multiprocessing()
    return sequential_elapsed, threads_elapsed, processes_elapsed
if __name__ == '__main__':
    # Run all three variants first so the timing report prints together
    # after the page titles emitted by the runs themselves.
    seq_time, th_time, ps_time = (
        main_sequentially(),
        main_threads(),
        main_multiprocessing(),
    )
    report_lines = [
        "sequential {:0} seconds ---".format(seq_time),
        "multithreads {:0} seconds ---".format(th_time),
        "multiprocessing {:0} seconds ---".format(ps_time),
    ]
    print("\n".join(report_lines))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment