import os
import sys
import logging
import urllib.parse
import math
import shutil
import time
from multiprocessing.pool import ThreadPool

import requests
from bs4 import BeautifulSoup
from tabulate import tabulate
from slugify import slugify
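
# Small interactive downloader for Library Genesis (libgen):
#  - LibGenSearch        queries a libgen mirror and parses the search result table
#  - LibGenSearchEntry   wraps a single search result and resolves its download URL
#  - InteractiveInterface is a simple REPL for searching, queueing, and downloading
#    queued entries in parallel via a thread pool
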
class LibGenSearchEntry:
    """Container for a single libgen search result."""

    def __init__(self):
        self.values = {
            "libgen_id": "",
            "assc_term": "",
            "title": "",
            "authors": [],
            "isbns": [],
            "publisher": "",
            "year": "",
            "page_count": "",
            "language": "",
            "size": "",
            "extension": "",
            "md5": "",
            "page_url": "",
            "download_url": ""
        }

    def parse_download_url(self):
        # Resolve the direct download link: the first anchor on the entry page,
        # made absolute against the entry page's host.
        download_page = requests.get(self["page_url"]).content
        soup = BeautifulSoup(download_page, 'html.parser')
        self["download_url"] = "http://" + urllib.parse.urlparse(self["page_url"]).netloc + soup.find('a').get('href')

    def __getitem__(self, index):
        return self.values[index]

    def __setitem__(self, index, value):
        self.values[index] = value

class LibGenSearch:
    """Searches a libgen mirror for a term and parses the results."""

    LIBGEN_HOSTS = [
        "gen.lib.rus.ec"
    ]

    def __init__(self, term, hosts=None, fetch_immediately=True):
        self._host = None
        if hosts is None:
            self._select_host(self.LIBGEN_HOSTS)
        else:
            self._select_host(hosts)
        self.term = term

    @property
    def enc_term(self):
        return urllib.parse.quote(self.term)

    def _select_host(self, hosts):
        # Use the first host that answers an HTTP request.
        for host in hosts:
            try:
                requests.get(f"http://{host}/")
                self._host = host
                break
            except requests.exceptions.RequestException:
                logging.info(f"host {host} failed!")
        if self._host is None:
            raise requests.exceptions.RequestException("No working libgen host could be reached. Try supplying your own mirror.")
    def fetch_entries(self, entry_count=25):
        assert entry_count > 0

        def parse_entries(count, entry_page):
            parsed_entries = []
            soup = BeautifulSoup(entry_page, 'html.parser')
            entry_table = soup.find("table", {"class": "c"})
            # First table row is the header bar and does not contain book entries
            entries = entry_table.find_all("tr")[1:]
            # Truncate unwanted entries; libgen only shows 25, 50 or 100 entries per page
            entries = entries[:count]
            for entry in entries:
                fields = entry.find_all("td")
                p_entry = LibGenSearchEntry()
                p_entry["libgen_id"] = fields[0].text.strip()
                p_entry["assc_term"] = self.term
                p_entry["authors"] = [a.text.strip() for a in fields[1].find_all('a')]
                title = fields[2].find("a", {"id": p_entry["libgen_id"]})
                p_entry["title"] = title.find(text=True, recursive=False).strip()
                if title.find("br"):  # ISBN field exists
                    p_entry["isbns"] = title.find_all('i')[-1].text.split(", ")
                p_entry["publisher"] = fields[3].text.strip()
                p_entry["year"] = fields[4].text.strip()
                p_entry["page_count"] = fields[5].text.strip()
                p_entry["language"] = fields[6].text.strip()
                p_entry["size"] = fields[7].text.strip()
                p_entry["extension"] = fields[8].text.strip()
                p_entry["page_url"] = fields[9].find('a').get('href')
                # The entry page URL ends in the file's MD5 hash
                p_entry["md5"] = p_entry["page_url"][-32:].lower()
                parsed_entries.append(p_entry)
            return parsed_entries

        entries = []
        for i in range(math.ceil(entry_count / 100)):
            url = f"http://{self._host}/search.php?req={self.enc_term}&open=0&res={min(100, entry_count - i * 100)}&view=simple&phrase=1&column=def&page={i+1}"
            entry_page = requests.get(url).content
            parsed = parse_entries(min(100, entry_count - i * 100), entry_page)
            if len(parsed) == 0:
                break
            entries.extend(parsed)
        return entries
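
    # Note: fetch_entries pulls results up to 100 at a time from search.php and
    # stops as soon as a page comes back empty, so asking for more entries than
    # the search actually yields is harmless.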

class InteractiveInterface:
    def __init__(self):
        self.entry_queue = []

    def print_help(self):
        print(
            """
            [h|elp]                              - Show this menu
            [s|earch] [entry_amount] search term - Search for term on libgen, show results, and select entries
            [q|ueue]                             - Show all entries queued for download
            [d|ownload]                          - Initiate download
            [e|xit]                              - Exit program
            """
        )

    @staticmethod
    def _generate_download():
        return {
            "target_directory": "",
            "target_filename": "",
            "download_url": "",
            "page_url": "",
            "queue_index": 0
        }

    @staticmethod
    def _print_entries(entries):
        table = [[i, entry["title"], ', '.join(entry["authors"]), entry["publisher"], entry["year"], entry["page_count"], entry["size"]]
                 for i, entry in enumerate(entries)]
        print(tabulate(table, headers=["Index", "Title", "Author(s)", "Publisher", "Year", "Page #", "File Size"]))
        print("\n\n")
    def _search(self, entry_count, term):
        s = LibGenSearch(term)
        return s.fetch_entries(entry_count)

    def search_and_select(self, entry_count, term):
        r = self._search(entry_count, term)
        InteractiveInterface._print_entries(r)

        download_indices = input("Supply indices, delimited with commas: ")
        if not download_indices:
            return

        # Keep only indices that actually exist in the result list
        download_indices = [int(x.strip()) for x in download_indices.split(",")]
        download_indices = [x for x in download_indices if 0 <= x < len(r)]
        for i in download_indices:
            self.entry_queue.append(r[i])
    def download(self):
        downloads = []
        for entry in self.entry_queue:
            download = InteractiveInterface._generate_download()
            entry.parse_download_url()

            filename, extension = os.path.splitext(urllib.parse.unquote(os.path.basename(entry["download_url"])))
            download["target_filename"] = f"{slugify(filename)}{extension}"
            download["target_directory"] = os.path.join(os.getcwd(), slugify(entry["assc_term"]))
            download["download_url"] = entry["download_url"]
            download["queue_index"] = len(downloads)
            download["page_url"] = entry["page_url"]
            downloads.append(download)

        def fetch_url(entry):
            # Mimic a regular browser request; some mirrors are picky about headers.
            headers = {
                "Host": urllib.parse.urlparse(entry["download_url"]).hostname,
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "DNT": "1",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Referer": entry["page_url"],
                "Accept-Encoding": "gzip, deflate",
                "Accept-Language": "en-NL,en;q=0.9,nl-NL;q=0.8,nl;q=0.7,hr-BA;q=0.6,hr;q=0.5,ka-GE;q=0.4,ka;q=0.3,el-GR;q=0.2,el;q=0.1,en-US;q=0.1"
            }
            path = os.path.join(entry["target_directory"], entry["target_filename"])
            if not os.path.exists(path):
                with requests.get(entry["download_url"], stream=True, headers=headers) as r:
                    try:
                        r.raise_for_status()
                    except requests.exceptions.HTTPError:
                        # For some reason libgen fails about 1/4 of the time on consecutive
                        # requests; if this happens, just retry until it works.
                        print("\033[91mDownload for " + entry["target_filename"] + " failed! Retrying after 10 seconds...\033[0m")
                        time.sleep(10)
                        return fetch_url(entry)
                    print("\033[92mStarted download: " + entry["target_filename"] + "\033[0m")
                    with open(path, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
            return path

        # Drop queue entries whose target directory cannot be created, and make
        # sure the remaining directories exist.
        filtered_downloads = []
        for entry in downloads:
            if os.path.isfile(entry["target_directory"]):
                logging.info("Can't write to " + entry["target_directory"] + ": File exists")
                continue
            if not os.path.isdir(entry["target_directory"]):
                os.mkdir(entry["target_directory"])
            filtered_downloads.append(entry)

        print("Starting download process... this might take a while.")
        # Download up to 8 files concurrently
        results = ThreadPool(8).imap_unordered(fetch_url, filtered_downloads)
        for r in results:
            print("Finished download: " + r)
    def process_query(self, q):
        c = q.split(' ')
        if c[0] == 'h' or c[0] == "help":
            self.print_help()
            return
        elif c[0] == "s" or c[0] == "search":
            try:
                entry_count = int(c[1])
                if entry_count < 1:
                    raise ValueError
            except (IndexError, ValueError):
                print("Missing or invalid entry count. Should be an integer larger than 0.\n\n")
                return
            term = ' '.join(c[2:])
            self.search_and_select(entry_count, term)
        elif c[0] == "q" or c[0] == "queue":
            InteractiveInterface._print_entries(self.entry_queue)
        elif c[0] == "d" or c[0] == "download":
            self.download()
            exit()
        elif c[0] == "e" or c[0] == "exit":
            exit()
        else:
            print("Could not understand your command.\n\n")

    def loop(self):
        print("type h for (h)elp")
        while True:
            q = input("> ")
            self.process_query(q)


if __name__ == "__main__":
    i = InteractiveInterface()
    i.loop()
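
# A minimal sketch of using the search layer programmatically instead of through
# the interactive prompt (assumes this file is importable as a module, e.g. saved
# as libgen_search.py; that filename is hypothetical):
#
#   from libgen_search import LibGenSearch
#
#   search = LibGenSearch("operating systems")
#   for entry in search.fetch_entries(entry_count=10):
#       entry.parse_download_url()
#       print(entry["title"], entry["download_url"])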