Created
February 22, 2022 05:24
-
-
Save liaocs2008/47eae476c6a792cd7ae7c8f226bfa043 to your computer and use it in GitHub Desktop.
scrapping results from "thecoinperspective.com"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from webdriver_manager.chrome import ChromeDriverManager | |
from bs4 import BeautifulSoup | |
import pandas as pd | |
import time | |
options = Options() | |
options.add_argument('--headless') | |
options.add_argument('--disable-gpu') | |
options.add_argument('--no-sandbox') | |
options.add_argument('--disable-dev-shm-usage') | |
# https://stackoverflow.com/questions/54432980/how-to-access-a-site-via-a-headless-driver-without-being-denied-permission | |
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.50 Safari/537.36' | |
options.add_argument('user-agent={0}'.format(user_agent)) | |
driver = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=options) | |
def get_html(url): | |
driver.get(url) | |
SCROLL_PAUSE_TIME = 0.5 | |
last_height = driver.execute_script("return document.body.scrollHeight") | |
while True: | |
driver.execute_script("window.scrollBy(0,400);") | |
time.sleep(SCROLL_PAUSE_TIME) | |
new_height = driver.execute_script("return document.body.scrollHeight") | |
if new_height == last_height: | |
break | |
last_height = new_height | |
html = driver.page_source | |
return html | |
def get_coin_perspective(url, topn=3): | |
html = get_html(url) | |
soup = BeautifulSoup(html) | |
res = {} | |
for div in soup.find_all('div', class_='card-body row p-0')[:topn]: | |
coin = div.find_all('div', class_='ms-2 flex-grow-1')[0].text.split()[0] | |
ratio = div.find_all('div', class_='d-inline float-end fw-bold text-success')[0].text | |
res[coin] = ratio | |
return res | |
def collect_scrapping_results(coin_list): | |
ans = {} | |
for coin in coin_list: | |
scrapping = 0 | |
while scrapping < 3: | |
try: | |
ans[coin] = get_coin_perspective(f"https://thecoinperspective.com/compare/{coin}") | |
except IndexError: | |
scrapping += 1 # allow for two more trials | |
print(f"scraping {coin} failed! trial: {scrapping}") | |
except: | |
scrapping = 3 | |
print(f"scraping {coin} failed! stop trial") | |
else: | |
scrapping = 3 # complete | |
print(f"scraping {coin} success!") | |
### assert topn coin are the same | |
compared_coins = None | |
for coin in ans: | |
if compared_coins: | |
assert set(compared_coins) == set(ans[coin].keys()) | |
else: | |
compared_coins = ans[coin] | |
df = {} | |
for target_coin in compared_coins: | |
df[target_coin] = [] | |
df['name'] = [] | |
for coin in ans: | |
df['name'].append(coin) | |
for target_coin in compared_coins: | |
ratio = ans[coin][target_coin] | |
df[target_coin].append(ratio) | |
### save to dataframe | |
df = pd.DataFrame(df) | |
df = df.sort_values(by=['name']) | |
return df | |
if __name__ == "__main__": | |
coin_list = ["aave", "cardano", "amp-token", "basic-attention-token", | |
"internet-computer", "curve-dao-token", "filecoin", | |
"chainlink", "livepeer", "litecoin", "decentraland", | |
"maker", "havven", "solana", "sushi", "uniswap", | |
"stellar", "yearn-finance", "zcash", "zencash"] | |
df = collect_scrapping_results(coin_list) | |
df.to_csv('coin_perspective.csv', index=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment