Last active
February 13, 2024 20:40
-
-
Save benmuth/b2c12cbb40ca4d8183c6f17f819e2f2d to your computer and use it in GitHub Desktop.
SciHub extractor for ArchiveBox
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import os | |
import re | |
import requests | |
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding | |
from scihub import IdentifierNotFoundError, SciHub | |
# DOIs taken from https://www.crossref.org/blog/dois-and-matching-regular-expressions/ | |
# listed in decreasing order of goodness. Not tested yet. | |
DOI_REGEXES = ( | |
re.compile(r"10.\d{4,9}\/[-._;()\/:A-Z0-9]+", re.IGNORECASE), | |
re.compile(r"10.1002\/[^\s]+", re.IGNORECASE), | |
re.compile( | |
r"10.\d{4}\/\d+-\d+X?(\d+)\d+<[\d\w]+:[\d\w]*>\d+.\d+.\w+;\d", re.IGNORECASE | |
), | |
re.compile(r"10.1021\/\w\w\d++", re.IGNORECASE), | |
re.compile(r"10.1207/[\w\d]+\&\d+_\d+", re.IGNORECASE), | |
) | |
# yoinked from archivebox/util.py | |
def download_url(url: str, timeout: int = 10) -> str: | |
"""Download the contents of a remote url and return the text""" | |
response = requests.get( | |
url, | |
headers={ | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15" | |
}, | |
verify=True, | |
timeout=timeout, | |
) | |
content_type = response.headers.get("Content-Type", "") | |
encoding = http_content_type_encoding(content_type) or html_body_declared_encoding( | |
response.text | |
) | |
if encoding is not None: | |
response.encoding = encoding | |
return response.text | |
def parse_dois_from_text(s: str) -> list[str]: | |
for doi_regex in DOI_REGEXES: | |
matches = doi_regex.findall(s) | |
if matches: | |
return matches | |
return [] | |
def filter_dois(doi_matches): | |
# NOTE: Only keeping pdfs and matches without extensions. | |
# Haven't tested if this is a reasonable filter | |
filtered_dois = [] | |
for doi_match in doi_matches: | |
if "." in os.path.basename(doi_match): | |
_, ext = os.path.splitext(doi_match) | |
if ext.lower() == ".pdf": | |
filtered_dois.append(doi_match) | |
else: | |
filtered_dois.append(doi_match) | |
return filtered_dois | |
def parse_dois_from_html(html): | |
dois = set(parse_dois_from_text(html)) | |
return filter_dois(dois) | |
def download_from_identifier(identifier: str, out: str, sh: SciHub): | |
result = sh.download(identifier, out) | |
if "err" in result: # this is the API that scihub.py uses for errors | |
print(f"{result['err']}") | |
else: | |
print(f"Successfully downloaded file with identifier {identifier}") | |
def save_scihub(identifier: str, out: str): | |
sh = SciHub() | |
if "http" in identifier: | |
print(f"Attempting to download from {identifier}") | |
try: | |
download_from_identifier(identifier, out, sh) | |
except IdentifierNotFoundError: | |
print( | |
"Identifier not found on sci-hub mirrors. Parsing HTML for new identifiers." | |
) | |
dois = parse_dois_from_html(download_url(identifier)) | |
if dois: | |
print(f"Found DOIs in HTML: {dois}\n Attempting to download") | |
else: | |
print("No valid identifiers found") | |
sh = SciHub() # refresh available_base_url_list | |
for doi in dois: | |
download_from_identifier(doi, out, sh) | |
else: | |
dois = parse_dois_from_text(identifier) | |
print(f"Attempting to download from {dois}") | |
if not dois: | |
raise Exception("No DOIs found in input.") | |
for doi in dois: | |
download_from_identifier(doi, out, sh) | |
def main(): | |
# Some cli arguments from scihub.py | |
parser = argparse.ArgumentParser( | |
description="SciHub - To remove all barriers in the way of science." | |
) | |
parser.add_argument( | |
"-d", | |
"--download", | |
metavar="(DOI|PMID|URL)", | |
help="tries to find and download the paper with the given identifier", | |
type=str, | |
) | |
parser.add_argument( | |
"-f", | |
"--file", | |
metavar="path", | |
help="pass file with list of newline separated identifiers and download each", | |
type=str, | |
) | |
parser.add_argument( | |
"-o", | |
"--output", | |
metavar="path", | |
help="optional output directory for downloaded papers", | |
default="", | |
type=str, | |
) | |
args = parser.parse_args() | |
if args.download: | |
save_scihub(args.download, args.output) | |
elif args.file: | |
with open(args.file, "r") as f: | |
identifiers = f.read().splitlines() | |
for identifier in identifiers: | |
save_scihub(identifier, args.output) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# I modified this in a few ways from the `scihub.py` GH repo: | |
# - the user agent is changed to work on my Mac | |
# - it will now search through all Sci-Hub links from sci-hub.now.sh for the source PDF | |
# instead of giving up after the first one | |
# - I added a specific exception for when no identifier matches in any base Sci-Hub url | |
""" | |
Sci-API Unofficial API | |
[Search|Download] research papers from [scholar.google.com|sci-hub.io]. | |
@author zaytoun | |
""" | |
import re | |
import argparse | |
import hashlib | |
import logging | |
import os | |
import requests | |
import urllib3 | |
from bs4 import BeautifulSoup | |
from retrying import retry | |
# log config | |
logging.basicConfig() | |
logger = logging.getLogger('Sci-Hub') | |
logger.setLevel(logging.DEBUG) | |
# | |
urllib3.disable_warnings() | |
# constants | |
SCHOLARS_BASE_URL = 'https://scholar.google.com/scholar' | |
HEADERS = { | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15" | |
} | |
class IdentifierNotFoundError(Exception): | |
pass | |
class SciHub(object): | |
""" | |
SciHub class can search for papers on Google Scholars | |
and fetch/download papers from sci-hub.io | |
""" | |
def __init__(self): | |
self.sess = requests.Session() | |
self.sess.headers = HEADERS | |
self.available_base_url_list = self._get_available_scihub_urls() | |
# self.available_base_url_list = ['https://sci-hub.tw'] | |
self.base_url = self.available_base_url_list[0] + '/' | |
def _get_available_scihub_urls(self): | |
''' | |
Finds available scihub urls via https://sci-hub.now.sh/ | |
''' | |
urls = [] | |
res = requests.get('https://sci-hub.now.sh/') | |
s = self._get_soup(res.content) | |
for a in s.find_all('a', href=True): | |
if 'sci-hub.' in a['href']: | |
urls.append(a['href']) | |
return urls | |
def set_proxy(self, proxy): | |
''' | |
set proxy for session | |
:param proxy_dict: | |
:return: | |
''' | |
if proxy: | |
self.sess.proxies = { | |
"http": proxy, | |
"https": proxy, } | |
def _change_base_url(self): | |
if not self.available_base_url_list: | |
# raise Exception('Ran out of valid sci-hub urls') | |
raise IdentifierNotFoundError('Ran out of valid sci-hub urls') | |
del self.available_base_url_list[0] | |
self.base_url = self.available_base_url_list[0] + '/' | |
logger.info("I'm changing to {}".format(self.available_base_url_list[0])) | |
def search(self, query, limit=10, download=False): | |
""" | |
Performs a query on scholar.google.com, and returns a dictionary | |
of results in the form {'papers': ...}. Unfortunately, as of now, | |
captchas can potentially prevent searches after a certain limit. | |
""" | |
start = 0 | |
results = {'papers': []} | |
while True: | |
try: | |
res = self.sess.get(SCHOLARS_BASE_URL, params={'q': query, 'start': start}) | |
except requests.exceptions.RequestException as e: | |
results['err'] = 'Failed to complete search with query %s (connection error)' % query | |
return results | |
s = self._get_soup(res.content) | |
papers = s.find_all('div', class_="gs_r") | |
if not papers: | |
if 'CAPTCHA' in str(res.content): | |
results['err'] = 'Failed to complete search with query %s (captcha)' % query | |
return results | |
for paper in papers: | |
if not paper.find('table'): | |
source = None | |
pdf = paper.find('div', class_='gs_ggs gs_fl') | |
link = paper.find('h3', class_='gs_rt') | |
if pdf: | |
source = pdf.find('a')['href'] | |
elif link.find('a'): | |
source = link.find('a')['href'] | |
else: | |
continue | |
results['papers'].append({ | |
'name': link.text, | |
'url': source | |
}) | |
if len(results['papers']) >= limit: | |
return results | |
start += 10 | |
@retry(wait_random_min=100, wait_random_max=1000, stop_max_attempt_number=10) | |
def download(self, identifier, destination='', path=None): | |
""" | |
Downloads a paper from sci-hub given an indentifier (DOI, PMID, URL). | |
Currently, this can potentially be blocked by a captcha if a certain | |
limit has been reached. | |
""" | |
data = self.fetch(identifier) | |
if not 'err' in data: | |
self._save(data['pdf'], | |
os.path.join(destination, path if path else data['name'])) | |
return data | |
def fetch(self, identifier): | |
""" | |
Fetches the paper by first retrieving the direct link to the pdf. | |
If the indentifier is a DOI, PMID, or URL pay-wall, then use Sci-Hub | |
to access and download paper. Otherwise, just download paper directly. | |
""" | |
try: | |
url = self._get_direct_url(identifier) | |
# verify=False is dangerous but sci-hub.io | |
# requires intermediate certificates to verify | |
# and requests doesn't know how to download them. | |
# as a hacky fix, you can add them to your store | |
# and verifying would work. will fix this later. | |
res = self.sess.get(url, verify=False) | |
if res.headers['Content-Type'] != 'application/pdf': | |
self._change_base_url() | |
logger.info('Failed to fetch pdf with identifier %s ' | |
'(resolved url %s) due to captcha' % (identifier, url)) | |
raise CaptchaNeedException('Failed to fetch pdf with identifier %s ' | |
'(resolved url %s) due to captcha' % (identifier, url)) | |
# return { | |
# 'err': 'Failed to fetch pdf with identifier %s (resolved url %s) due to captcha' | |
# % (identifier, url) | |
# } | |
else: | |
return { | |
'pdf': res.content, | |
'url': url, | |
'name': self._generate_name(res) | |
} | |
except requests.exceptions.ConnectionError: | |
logger.info('Cannot access {}, changing url'.format(self.available_base_url_list[0])) | |
self._change_base_url() | |
except requests.exceptions.RequestException as e: | |
logger.info('Failed to fetch pdf with identifier %s (resolved url %s) due to request exception.' | |
% (identifier, url)) | |
return { | |
'err': 'Failed to fetch pdf with identifier %s (resolved url %s) due to request exception.' | |
% (identifier, url) | |
} | |
def _get_direct_url(self, identifier): | |
""" | |
Finds the direct source url for a given identifier. | |
""" | |
id_type = self._classify(identifier) | |
return identifier if id_type == 'url-direct' \ | |
else self._search_direct_url(identifier) | |
def _search_direct_url(self, identifier): | |
""" | |
Sci-Hub embeds papers in an iframe. This function finds the actual | |
source url which looks something like https://moscow.sci-hub.io/.../....pdf. | |
""" | |
# res = self.sess.get(self.base_url + identifier, verify=False) | |
# s = self._get_soup(res.content) | |
# iframe = s.find('iframe') | |
# if iframe: | |
# return iframe.get('src') if not iframe.get('src').startswith('//') \ | |
# else 'http:' + iframe.get('src') | |
while True: | |
res = self.sess.get(self.base_url + identifier, verify=False) | |
s = self._get_soup(res.content) | |
iframe = s.find("iframe") | |
if iframe: | |
return ( | |
iframe.get("src") | |
if not iframe.get("src").startswith("//") | |
else "http:" + iframe.get("src") | |
) | |
else: | |
self._change_base_url() | |
def _classify(self, identifier): | |
""" | |
Classify the type of identifier: | |
url-direct - openly accessible paper | |
url-non-direct - pay-walled paper | |
pmid - PubMed ID | |
doi - digital object identifier | |
""" | |
if (identifier.startswith('http') or identifier.startswith('https')): | |
if identifier.endswith('pdf'): | |
return 'url-direct' | |
else: | |
return 'url-non-direct' | |
elif identifier.isdigit(): | |
return 'pmid' | |
else: | |
return 'doi' | |
def _save(self, data, path): | |
""" | |
Save a file give data and a path. | |
""" | |
with open(path, 'wb') as f: | |
f.write(data) | |
def _get_soup(self, html): | |
""" | |
Return html soup. | |
""" | |
return BeautifulSoup(html, 'html.parser') | |
def _generate_name(self, res): | |
""" | |
Generate unique filename for paper. Returns a name by calcuating | |
md5 hash of file contents, then appending the last 20 characters | |
of the url which typically provides a good paper identifier. | |
""" | |
name = res.url.split('/')[-1] | |
name = re.sub('#view=(.+)', '', name) | |
pdf_hash = hashlib.md5(res.content).hexdigest() | |
return '%s-%s' % (pdf_hash, name[-20:]) | |
class CaptchaNeedException(Exception): | |
pass | |
def main(): | |
sh = SciHub() | |
parser = argparse.ArgumentParser(description='SciHub - To remove all barriers in the way of science.') | |
parser.add_argument('-d', '--download', metavar='(DOI|PMID|URL)', help='tries to find and download the paper', | |
type=str) | |
parser.add_argument('-f', '--file', metavar='path', help='pass file with list of identifiers and download each', | |
type=str) | |
parser.add_argument('-s', '--search', metavar='query', help='search Google Scholars', type=str) | |
parser.add_argument('-sd', '--search_download', metavar='query', | |
help='search Google Scholars and download if possible', type=str) | |
parser.add_argument('-l', '--limit', metavar='N', help='the number of search results to limit to', default=10, | |
type=int) | |
parser.add_argument('-o', '--output', metavar='path', help='directory to store papers', default='', type=str) | |
parser.add_argument('-v', '--verbose', help='increase output verbosity', action='store_true') | |
parser.add_argument('-p', '--proxy', help='via proxy format like socks5://user:pass@host:port', action='store', type=str) | |
args = parser.parse_args() | |
if args.verbose: | |
logger.setLevel(logging.DEBUG) | |
if args.proxy: | |
sh.set_proxy(args.proxy) | |
if args.download: | |
result = sh.download(args.download, args.output) | |
if 'err' in result: | |
logger.debug('%s', result['err']) | |
else: | |
logger.debug('Successfully downloaded file with identifier %s', args.download) | |
elif args.search: | |
results = sh.search(args.search, args.limit) | |
if 'err' in results: | |
logger.debug('%s', results['err']) | |
else: | |
logger.debug('Successfully completed search with query %s', args.search) | |
print(results) | |
elif args.search_download: | |
results = sh.search(args.search_download, args.limit) | |
if 'err' in results: | |
logger.debug('%s', results['err']) | |
else: | |
logger.debug('Successfully completed search with query %s', args.search_download) | |
for paper in results['papers']: | |
result = sh.download(paper['url'], args.output) | |
if 'err' in result: | |
logger.debug('%s', result['err']) | |
else: | |
logger.debug('Successfully downloaded file with identifier %s', paper['url']) | |
elif args.file: | |
with open(args.file, 'r') as f: | |
identifiers = f.read().splitlines() | |
for identifier in identifiers: | |
result = sh.download(identifier, args.output) | |
if 'err' in result: | |
logger.debug('%s', result['err']) | |
else: | |
logger.debug('Successfully downloaded file with identifier %s', identifier) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment