Last active
February 26, 2020 14:16
-
-
Save pqlx/4f16b2b0f1f88ffadfdf212678337d7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import math
import os
import shutil
import sys
import time
import urllib.parse
from multiprocessing.pool import ThreadPool

import requests
from bs4 import BeautifulSoup
from slugify import slugify
from tabulate import tabulate
class LibGenSearchEntry:
    """A single search result from Library Genesis.

    Behaves like a small mapping (via ``__getitem__``/``__setitem__``) over
    the metadata fields parsed from one row of the search-results table.
    """

    def __init__(self):
        # All known metadata fields, empty until filled in by the parser.
        self.values = {
            "libgen_id": "",
            "assc_term": "",       # the search term this entry was found under
            "title": "",
            "isbns": [],
            "publisher": "",
            "year": "",
            "page_count": "",
            "language": "",
            "size": "",
            "extension": "",
            "md5": "",
            "page_url": "",        # intermediate page that links to the file
            "download_url": ""     # direct link, set by parse_download_url()
        }

    def parse_download_url(self):
        """Fetch the entry's page and resolve the direct download link.

        Requires ``page_url`` to be set; stores the result in ``download_url``.
        Performs a network request.
        """
        download_page = requests.get(self["page_url"]).content
        soup = BeautifulSoup(download_page, 'html.parser')
        # urljoin correctly resolves both relative and absolute hrefs,
        # unlike the naive "http://" + netloc + href concatenation, which
        # breaks when the mirror returns an absolute URL.
        href = soup.find('a').get('href')
        self["download_url"] = urllib.parse.urljoin(self["page_url"], href)

    def __getitem__(self, index):
        return self.values[index]

    def __setitem__(self, index, value):
        self.values[index] = value
class LibGenSearch:
    """Searches a Library Genesis mirror for a term and parses the results."""

    # Default mirrors, probed in order until one responds.
    LIBGEN_HOSTS = [
        "gen.lib.rus.ec"
    ]

    def __init__(self, term, hosts=None, fetch_immediately=True):
        """Select a working mirror and remember the search term.

        :param term: the raw (unencoded) search term.
        :param hosts: optional list of mirror hostnames; defaults to
            ``LIBGEN_HOSTS``.
        :param fetch_immediately: accepted for backward compatibility;
            currently unused.
        :raises requests.exceptions.RequestException: if no mirror responds.
        """
        self._host = None
        self._select_host(self.LIBGEN_HOSTS if hosts is None else hosts)
        self.term = term

    @property
    def enc_term(self):
        """The search term, percent-encoded for use in a URL query."""
        return urllib.parse.quote(self.term)

    def _select_host(self, hosts):
        """Pick the first host in *hosts* that answers an HTTP request.

        :raises requests.exceptions.RequestException: if none respond.
        """
        for host in hosts:
            try:
                # Timeout so a hung mirror doesn't block host selection forever.
                requests.get(f"http://{host}/", timeout=10)
                self._host = host
                break
            except requests.exceptions.RequestException:
                # BUGFIX: the original lacked the f-prefix and logged the
                # literal text "host {host} failed!".
                logging.info(f"host {host} failed!")
        if self._host is None:
            raise requests.exceptions.RequestException(
                "No working libgen host could be reached. Try supplying your own mirror.")

    def fetch_entries(self, entry_count=25):
        """Fetch up to *entry_count* search results.

        :param entry_count: positive number of results to fetch; libgen
            paginates at 100 results per page, so multiple pages may be hit.
        :return: list of :class:`LibGenSearchEntry`.
        :raises ValueError: if *entry_count* is not positive.
        """
        # Explicit raise instead of assert: asserts vanish under ``python -O``.
        if entry_count < 1:
            raise ValueError("entry_count must be a positive integer")

        def parse_entries(count, entry_page):
            parsed_entries = []
            soup = BeautifulSoup(entry_page, 'html.parser')
            entry_table = soup.find("table", {"class": "c"})
            # First table row is the header bar and contains no book entries.
            entries = entry_table.find_all("tr")[1:]
            # Truncate unwanted entries; libgen only shows 25, 50 or 100.
            entries = entries[:count]
            for entry in entries:
                fields = entry.find_all("td")
                p_entry = LibGenSearchEntry()
                p_entry["libgen_id"] = fields[0].text.strip()
                p_entry["assc_term"] = self.term
                p_entry["authors"] = [a.text.strip() for a in fields[1].find_all('a')]
                title = fields[2].find("a", {"id": p_entry["libgen_id"]})
                p_entry["title"] = title.find(text=True, recursive=False).strip()
                if title.find("br"):  # ISBN field exists
                    p_entry["isbns"] = title.find_all('i')[-1].text.split(", ")
                p_entry["publisher"] = fields[3].text.strip()
                p_entry["year"] = fields[4].text.strip()
                p_entry["page_count"] = fields[5].text.strip()
                p_entry["language"] = fields[6].text.strip()
                p_entry["size"] = fields[7].text.strip()
                p_entry["extension"] = fields[8].text.strip()
                p_entry["page_url"] = fields[9].find('a').get('href')
                # BUGFIX: md5 must be derived AFTER page_url is assigned; the
                # original sliced the still-empty string and always got "".
                p_entry["md5"] = p_entry["page_url"][-32:].lower()
                parsed_entries.append(p_entry)
            return parsed_entries

        entries = []
        for i in range(math.ceil(entry_count / 100)):
            remaining = min(100, entry_count - i * 100)
            url = (f"http://{self._host}/search.php?req={self.enc_term}"
                   f"&open=0&res={remaining}&view=simple&phrase=1&column=def&page={i+1}")
            entry_page = requests.get(url).content
            parsed = parse_entries(remaining, entry_page)
            if not parsed:  # ran out of results before reaching entry_count
                break
            entries.extend(parsed)
        return entries
class InteractiveInterface:
    """A small REPL for searching libgen, queueing entries and downloading them."""

    def __init__(self):
        # Entries selected for download, in selection order.
        self.entry_queue = []

    def print_help(self):
        """Print the command reference."""
        print(
            """
            [h|elp] - Show this menu
            [s|earch] [entry_amount] search term - Search for term on libgen and show results, and select
            [q|ueue] - Show all entries queued for download
            [d|ownload] - Initiate download
            [e|xit] - Exit program
            """
        )

    @staticmethod
    def _generate_download():
        """Return a fresh, empty download-descriptor dict."""
        return {
            "target_directory": "",
            "target_filename": "",
            "download_url": "",
            "page_url": "",
            "queue_index": 0
        }

    @staticmethod
    def _print_entries(entries):
        """Pretty-print search entries as an indexed table."""
        table = [[i, entry["title"], ', '.join(entry["authors"]), entry["publisher"],
                  entry["year"], entry["page_count"], entry["size"]]
                 for i, entry in enumerate(entries)]
        print(tabulate(table, headers=["Index", "Title", "Author(s)", "Publisher", "Year", "Page #", "File Size"]))
        print("\n\n")

    def _search(self, entry_count, term):
        """Run a libgen search and return parsed entries."""
        s = LibGenSearch(term)
        return s.fetch_entries(entry_count)

    def search_and_select(self, entry_count, term):
        """Search, show results, and let the user queue entries by index."""
        r = self._search(entry_count, term)
        InteractiveInterface._print_entries(r)
        download_indices = input("Supply indices, delimited with commas: ")
        if not download_indices:
            return
        try:
            parsed_indices = [int(x.strip()) for x in download_indices.split(",")]
        except ValueError:
            # ROBUSTNESS: the original crashed the REPL on non-numeric input.
            print("Indices must be integers, delimited with commas.\n\n")
            return
        # Silently drop out-of-range indices, as the original did.
        for i in parsed_indices:
            if 0 <= i < len(r):
                self.entry_queue.append(r[i])
        return

    def download(self):
        """Resolve download URLs for every queued entry and fetch the files.

        Files are saved under ``cwd/<slugified search term>/``; existing files
        are skipped. Downloads run on a thread pool of 8 workers.
        """
        downloads = []
        for entry in self.entry_queue:
            download = InteractiveInterface._generate_download()
            entry.parse_download_url()
            filename, extension = os.path.splitext(urllib.parse.unquote(os.path.basename(entry["download_url"])))
            download["target_filename"] = f"{slugify(filename)}{extension}"
            download["target_directory"] = os.path.join(os.getcwd(), slugify(entry["assc_term"]))
            download["download_url"] = entry["download_url"]
            download["queue_index"] = len(downloads)
            download["page_url"] = entry["page_url"]
            downloads.append(download)

        def fetch_url(entry):
            # Browser-like headers: libgen rejects some bare requests.
            headers = {
                "Host": urllib.parse.urlparse(entry["download_url"]).hostname,
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "DNT": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Referer": entry["page_url"],
                "Accept-Encoding": "gzip, deflate",
                "Accept-Language": "en-NL,en;q=0.9,nl-NL;q=0.8,nl;q=0.7,hr-BA;q=0.6,hr;q=0.5,ka-GE;q=0.4,ka;q=0.3,el-GR;q=0.2,el;q=0.1,en-US;q=0.1"
            }
            path = os.path.join(entry["target_directory"], entry["target_filename"])
            if not os.path.exists(path):  # skip files we already have
                with requests.get(entry["download_url"], stream=True, headers=headers) as r:
                    try:
                        r.raise_for_status()
                    except requests.exceptions.HTTPError:
                        # For some reason libgen fails about 1/4 times with
                        # consecutive requests... just retry until it works.
                        print("\033[91mDownload for " + entry["target_filename"] + " Failed! Retrying after 10 seconds...\033[0m")
                        time.sleep(10)
                        return fetch_url(entry)
                    print("\033[92mStarted download: " + entry["target_filename"] + "\033[0m")
                    with open(path, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
            return path

        filtered_downloads = []
        for entry in downloads:
            if os.path.isfile(entry["target_directory"]):
                logging.info("Can't write to " + entry["target_directory"] + ": File exists")
                continue
            if not os.path.isdir(entry["target_directory"]):
                # exist_ok guards against a race between the isdir check and mkdir.
                os.makedirs(entry["target_directory"], exist_ok=True)
            filtered_downloads.append(entry)

        print("Starting download progress... this might take a while.")
        # BUGFIX: the original never closed the ThreadPool, leaking workers.
        with ThreadPool(8) as pool:
            for r in pool.imap_unordered(fetch_url, filtered_downloads):
                print("Finished download: " + r)

    def process_query(self, q):
        """Parse and dispatch one REPL command line."""
        c = q.split(' ')
        if c[0] == 'h' or c[0] == "help":
            self.print_help()
            return
        elif c[0] == "s" or c[0] == "search":
            try:
                entry_count = int(c[1])
                if entry_count < 1:
                    raise ValueError
            except (ValueError, IndexError):
                # BUGFIX: also catch IndexError so a bare "s"/"search" with no
                # arguments doesn't crash the REPL.
                supplied = c[1] if len(c) > 1 else ""
                print(f"\"{supplied}\" is not a valid entry count. Should be an integer larger than 0.\n\n")
                return
            term = ' '.join(c[2:])
            self.search_and_select(entry_count, term)
        elif c[0] == "q" or c[0] == "queue":
            InteractiveInterface._print_entries(self.entry_queue)
        elif c[0] == "d" or c[0] == "download":
            self.download()
            exit()  # original behavior: program ends after downloading
        elif c[0] == "e" or c[0] == "exit":
            exit()
        else:
            print("Could not understand your command.\n\n")

    def loop(self):
        """Run the prompt loop until the user exits."""
        print("type h for (h)elp")
        while True:
            q = input("> ")
            self.process_query(q)
if __name__ == "__main__":
    # Launch the interactive prompt when run as a script.
    InteractiveInterface().loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment