libgen (Library Genesis) toolkit
#!/usr/bin/env python3
# encoding: utf-8
"libgen.rs 搜索信息罗列"
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__version__ = (0, 0, 1)
__all__ = ["search", "info", "get_downlinks", "get_downlink_from_libgenli"]
if __name__ == "__main__":
from argparse import ArgumentParser, RawTextHelpFormatter
parser = ArgumentParser(
formatter_class=RawTextHelpFormatter,
description="libgen.rs 搜索信息罗列",
epilog="""
library genius 是一个著名的电子书分享网站,它有一些镜像站
- http://gen.lib.rus.ec
- https://libgen.is
- http://libgen.is
- http://libgen.rs
- https://libgen.rs
- https://libgen.st
- http://libgen.st
参考:[Library Genesis Guide](https://librarygenesis.net/)
其它电子书网站推荐
- https://libgen.li
- https://annas-archive.org
"""
)
parser.add_argument("url", nargs="?", help="url 链接,直接从浏览器复制过来即可")
parser.add_argument("-d", "--detail-level", type=int, default=0, help="""输出完整信息的级别
- 0 => 【默认值】只输出 md5
- 1 => 输出 json 格式,只包含从搜索列表得到的基本信息
- 2 => 输出 json 格式,在 1 的基础上,增加详细信息
- 3 => 输出 json 格式,在 1 的基础上,增加下载链接
- 4 => 输出 json 格式,在 1 的基础上,增加详细信息和下载链接
""")
parser.add_argument("-b", "--begin", default=1, type=int, help="开始于序号,默认值 1,从 1 开始编号")
parser.add_argument("-e", "--end", default=0, type=int, help="结束于序号(包含),默认值 0,小于等于 0 时不限")
parser.add_argument("-s", "--select", help="提供一个表达式(会注入一个变量 item,是一个 dict),用于筛选条目")
parser.add_argument("-m", "--max-workers", default=1, type=int, help="多线程并发数,默认为 1,小于等于 0 时,则自动确定合适的并发数")
parser.add_argument("-v", "--version", action="store_true", help="输出版本号")
args = parser.parse_args()
if args.version:
print(".".join(map(str, __version__)))
raise SystemExit(0)
if not args.url:
parser.parse_args(["-h"])
from sys import version_info
if version_info < (3, 11):
raise SystemExit("Python 版本过低,请升级到至少 3.11")
try:
from lxml.etree import _ElementTree as ElementTree
from lxml.html import parse, HtmlElement
except ImportError:
from sys import executable
from subprocess import run
run([executable, "-m", "pip", "install", "-U", "lxml"], check=True)
from lxml.etree import _ElementTree as ElementTree
from lxml.html import parse, HtmlElement
from collections.abc import Callable, Iterator
from functools import partial, update_wrapper
from gzip import GzipFile
from itertools import count
from re import compile as re_compile
from typing import Any, Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
from urllib.request import urlopen, Request
CREB_LIBGEN_GET_search = re_compile(rb'(?<=")get\.php\?md5=[^"]+').search
def retry(
func: Optional[Callable] = None,
/,
retry_times: int = 3,
exceptions: type[BaseException] | tuple[type[BaseException], ...] = Exception,
do_between: Optional[Callable[[int, BaseException], Any]] = None,
) -> Callable:
    if func is None:
        return partial(
            retry,
            retry_times=retry_times,
            exceptions=exceptions,
            do_between=do_between,
        )
if retry_times == 0:
return func
def wrapper(*args, **kwds):
        excs: list[BaseException] = []
if retry_times < 0:
it = count()
else:
it = range(retry_times + 1)
prev_exc = None
for i in it:
if i and do_between:
do_between(i, prev_exc)
try:
return func(*args, **kwds)
except exceptions as exc:
exc.__prev__ = prev_exc
prev_exc = exc
excs.append(exc)
except BaseException as exc:
exc.__prev__ = prev_exc
raise exc
raise BaseExceptionGroup("too many retries", tuple(excs))
return wrapper
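# A minimal usage sketch for retry (illustrative only; `flaky_fetch` and the
# backoff lambda are hypothetical). A negative retry_times retries forever;
# do_between runs before every retry with the attempt number and the last
# exception; once the retries are exhausted, a BaseExceptionGroup collecting
# every caught exception is raised:
#
#     from time import sleep
#
#     @retry(retry_times=3, do_between=lambda i, exc: sleep(i))
#     def flaky_fetch(url: str) -> bytes:
#         with urlopen(Request(url), timeout=5) as resp:
#             return resp.read()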
@retry(retry_times=5)
def fetch_as_etree(url: str) -> ElementTree:
with urlopen(Request(url, headers={"Accept-Encoding": "gzip"}), timeout=10) as resp:
return parse(GzipFile(fileobj=resp))
def extract_libgen_title(td: HtmlElement) -> dict:
d: dict = {}
if len(td) == 1:
el_title = td[0]
else:
d["series"] = td[0].text_content().replace("\xa0", " ")
el_title = td[2]
d["title"] = el_title.text.strip()
url = d["url"] = el_title.attrib["href"]
d["md5"] = url[-32:]
if len(el_title) == 1:
d["edition"] = el_title[0].text_content().replace("\xa0", " ")
elif len(el_title) == 2:
d["isbn"] = el_title[1].text_content().replace("\xa0", " ")
elif len(el_title) >= 3:
d["edition"] = el_title[0].text_content().replace("\xa0", " ")
d["isbn"] = el_title[2].text_content().replace("\xa0", " ")
return d
def extract_libgen_item(tr: HtmlElement) -> dict:
return {
"id": int(tr[0].text),
"authors": [a.text for a in tr[1].xpath(".//a")],
**extract_libgen_title(tr[2]),
"publisher": tr[3].text,
"year": tr[4].text,
"pages": tr[5].text,
"language": tr[6].text,
"size": tr[7].text,
"extension": tr[8].text,
"mirrors": [td[0].attrib["href"] for td in tr[9:-1]],
}
def extract_fiction_title(td: HtmlElement) -> dict:
d: dict = {}
el_title = td[0][0]
d["title"] = el_title.text.strip()
url = d["url"] = el_title.attrib["href"]
d["md5"] = url[-32:]
if len(el_title):
d["edition"] = el_title[0].text
if len(td) > 1:
d["isbn"] = td[1].text
return d
def extract_fiction_item(tr: HtmlElement) -> dict:
return {
"authors": [a.text for a in tr[0].xpath('.//a')],
"series": tr[1].text,
**extract_fiction_title(tr[2]),
"language": tr[3].text,
"upload_datetime": tr[4].attrib["title"][-19:],
"size": tr[4].text.replace("\xa0", " "),
"mirrors": [a.attrib["href"] for a in tr[5].xpath('.//a')],
}
def search(url: str) -> Iterator[dict]:
"搜索文件"
if url.startswith("/"):
url = "https://libgen.rs" + url
elif not url.startswith(("http://", "https://")):
if url.startswith("?"):
url = "https://libgen.rs/search.php" + url
elif parse_qsl(url):
url = "https://libgen.rs/search.php?" + url
else:
url = "https://libgen.rs/search.php?q=" + "+".join(url.replace("&", "%26").split())
urlp = urlparse(url)
params: dict = dict(parse_qsl(urlp.query))
is_fiction = urlp.path.startswith("/fiction")
if is_fiction:
type = "fiction"
extract_item = extract_fiction_item
res = 25
else:
type = "libgen"
extract_item = extract_libgen_item
res = int(params.get("res", 25))
if res not in (25, 50, 100):
res = 25
params["res"] = res
page = int(params.get("page", 1))
if page <= 0:
page = 1
params["page"] = page
url = urlunparse(urlp._replace(query=""))
ls_tr: list[HtmlElement]
while True:
        etree = fetch_as_etree(url + "?" + urlencode(params))
if is_fiction:
ls_tr = etree.xpath("body/table/tbody/tr") # type: ignore
else:
ls_tr = etree.xpath("body/table[3]/tr[position()>1]") # type: ignore
for tr in ls_tr:
item = extract_item(tr)
item["url"] = urljoin(url, item["url"])
item["type"] = type
yield item
if len(ls_tr) < res:
break
params["page"] += 1
def info(md5: str, is_fiction: bool = False) -> dict:
"查询文件信息"
def extract_field_text(el):
return el.text_content().rstrip(": ").replace("\xa0", " ")
def extract_el_a(el, callback=None):
info = {
"href": urljoin(url, el.attrib["href"]),
"text": " ".join(el.itertext())
}
if callback:
info.update(callback(el))
return info
def extract_nested_el_table(el):
return dict(zip(
filter(None, map(extract_field_text, el[0])),
map(extract_field_value, el[1]),
))
def extract_field_value(el):
if len(el):
sel = el[0]
if sel.tag == "ul":
return [extract_el_a(a) for a in sel.xpath(".//a")]
elif sel.tag in "b" and len(sel) and sel[0].tag == "a":
return extract_el_a(sel[0])
elif sel.tag == "a":
return [extract_el_a(a) for a in el.xpath(".//a")]
elif sel.tag == "table":
return extract_nested_el_table(sel)
return el.text_content().strip()
def extract_el_a_input_filename(el):
el = el.getparent().find("input")
if el is None:
return []
return [("filename", el.attrib.get("value", ""))]
if is_fiction:
url = f"https://libgen.rs/fiction/{md5}"
else:
url = f"https://libgen.rs/book/index.php?md5={md5}"
info: dict = {"url": url}
    etree = fetch_as_etree(url)
if is_fiction:
div: HtmlElement = etree.find('.//div[@class="record_side"]')
info["cover_url"] = urljoin(url, div.find('img').attrib["src"])
info["download_page_url"] = f"https://library.lol/fiction/{md5}"
info["hashes"] = dict(zip(
div.xpath('./table[@class="hashes"]/tr/th/text()'),
div.xpath('./table[@class="hashes"]/tr/td/text()'),
))
detail = info["detail"] = {}
table = div.getnext()
detail.update(zip(
map(
extract_field_text,
table.xpath("tr/td[position() mod 2 = 1]")
),
map(
extract_field_value,
table.xpath("tr/td[position() mod 2 = 0]")
),
))
else:
table = etree.find(".//table")
td = table[1][0]
info["cover_url"] = urljoin(url, td.find("./a/img").attrib["src"])
info["download_page_url"] = urljoin(url, td.find("./a").attrib["href"])
info["hashes"] = dict(zip(
td.xpath("./table/tr/th/text()"),
td.xpath("./table/tr/td/text()"),
))
detail = info["detail"] = {}
detail[extract_field_text(table[1][1])] = table[1][2].find(".//a").text
detail[extract_field_text(table[1][3][0])] = table[1][3][0].tail
detail.update(zip(
map(
extract_field_text,
table.xpath(
"tr[position()>2 and position()<18]/td[position() mod 2 = 1]")
),
map(
extract_field_value,
table.xpath(
"tr[position()>2 and position()<18]/td[position() mod 2 = 0]")
),
))
detail[extract_field_text(table[17][0])] = [
extract_el_a(el, extract_el_a_input_filename)
for el in table[17][1][0].xpath(".//td/a")
]
detail["introduction"] = "\n".join(table[18].itertext())
detail["toc"] = "\n".join(table[19].itertext())
return info
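# The dict returned by info() is shaped roughly as follows (the exact
# "detail" keys vary with the page being scraped):
#
#     {"url": ..., "cover_url": ..., "download_page_url": ...,
#      "hashes": {"MD5": ..., ...}, "detail": {...}}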
def get_downlinks(md5: str, is_fiction: bool = False) -> list[str]:
"获取下载链接列表"
type = "fiction" if is_fiction else "main"
url = f"https://library.lol/{type}/{md5}"
    etree = fetch_as_etree(url)
return etree.xpath('//div[@id="download"]//*[self::h2 or self::li]/a[@href]/@href') # type: ignore
def get_downlink_from_libgenli(md5: str) -> str:
"从 https://libgen.li 获取下载链接"
url = f"https://libgen.li/ads.php?md5={md5}"
with urlopen(Request(url, headers={"User-Agent": ""})) as resp:
link = CREB_LIBGEN_GET_search(resp.read())[0] # type: ignore
return "https://libgen.li/" + link.decode()
if __name__ == "__main__":
from os import (
close as fclose, open as fopen, ctermid, getenv, get_terminal_size,
terminal_size, O_RDONLY,
)
from platform import system
from sys import stderr
from threading import RLock
from time import perf_counter
# Reference:
# - [How to get Linux console window width in Python](https://stackoverflow.com/questions/566746/how-to-get-linux-console-window-width-in-python)
# - [How do I find the width & height of a terminal window](https://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window)
IS_WIN = system() == "Windows"
def environ_GWINSZ() -> terminal_size:
# COLUMNS, LINES are the working values
        return terminal_size(int(getenv(var) or 0) for var in ("COLUMNS", "LINES"))
def os_GWINSZ() -> terminal_size:
# Reference:
# - [os.get_terminal_size](https://docs.python.org/3/library/os.html#os.get_terminal_size)
# - [shutil.get_terminal_size](https://docs.python.org/3/library/shutil.html#shutil.get_terminal_size)
try:
return get_terminal_size()
except (AttributeError, ValueError, OSError):
            # The fd does not exist, is closed or detached, or is not a
            # terminal, or os.get_terminal_size() is unsupported.
            # Tip: if the fd does not exist, is closed or detached, or is
            # not a terminal, an exception like the following may be raised:
            #     OSError: [Errno 25] Inappropriate ioctl for device
return terminal_size((0, 0))
def ioctl_GWINSZ(fd: int = stderr.fileno()) -> terminal_size:
try:
from fcntl import ioctl
from struct import unpack
from termios import TIOCGWINSZ
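            # TIOCGWINSZ fills a struct winsize holding four shorts: rows,
            # columns, and the window's pixel width and height (the pixel
            # fields are often zero), hence the 8-byte buffer and the 'hhhh'
            # unpack format below.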
rows, columns, hp, wp = unpack('hhhh', ioctl(fd, TIOCGWINSZ, b'\0'*8))
return terminal_size((columns, rows))
except (ImportError, AttributeError, ValueError, OSError):
            # The fd does not exist, is closed or detached, or is not a
            # terminal, or the required modules are unavailable.
            # Tip: if the fd does not exist, is closed or detached, or is
            # not a terminal, an exception like the following may be raised:
            #     OSError: [Errno 25] Inappropriate ioctl for device
return terminal_size((0, 0))
def ioctl_GWINSZ_auto() -> terminal_size:
for size in map(ioctl_GWINSZ, range(3)):
if size != (0, 0):
return size
try:
fd = fopen(ctermid(), O_RDONLY)
try:
return ioctl_GWINSZ(fd)
finally:
fclose(fd)
except:
return terminal_size((0, 0))
def stty_GWINSZ() -> terminal_size:
import subprocess
try:
rows, columns = subprocess.check_output(['stty', 'size']).split()
return terminal_size((int(columns), int(rows)))
except:
            # If the script is run with input redirected to stdin, stty
            # complains that "stdin isn't a terminal"; retry on /dev/tty.
try:
with open('/dev/tty') as tty:
rows, columns = subprocess.check_output(
['stty', 'size'], stdin=tty).split()
return terminal_size((int(columns), int(rows)))
except:
# maybe stty is unsupported
return terminal_size((0, 0))
def tput_GWINSZ() -> terminal_size:
try:
import subprocess
rows = int(subprocess.check_output(['tput', 'lines']))
columns = int(subprocess.check_output(['tput', 'cols']))
return terminal_size((columns, rows))
except:
# maybe tput is unsupported
return terminal_size((0, 0))
def curses_GWINSZ() -> terminal_size:
try:
import curses
rows, columns = curses.initscr().getmaxyx()
return terminal_size((columns, rows))
except:
return terminal_size((0, 0))
def windows_GWINSZ() -> terminal_size:
if not IS_WIN:
return terminal_size((0, 0))
try:
from ctypes import windll, create_string_buffer # type: ignore
# stdin handle is -10
# stdout handle is -11
# stderr handle is -12
h = windll.kernel32.GetStdHandle(-12)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
except:
return terminal_size((0, 0))
if res:
import struct
(bufx, bufy, curx, cury, wattr,
left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
sizex = right - left + 1
sizey = bottom - top + 1
            return terminal_size((sizex, sizey))
else:
return terminal_size((0, 0))
    def get_columns_size() -> int:
        for func in (os_GWINSZ, environ_GWINSZ, ioctl_GWINSZ):
            columns = func().columns  # type: ignore
            if columns > 0:
                return columns
        return 0
# Reference:
# - [tqdm](https://pypi.org/project/tqdm/)
# - [rich](https://pypi.org/project/rich/)
# - [blessings](https://pypi.org/project/blessings/)
# - [colorama](https://pypi.org/project/colorama/)
# - [colored](https://pypi.org/project/colored/)
class ProgressInfo:
def __init__(self):
self._total: int = 0
self._success: int = 0
self._failed: int = 0
self._str: str = ''
self._size: int = 0
self._current_ts = self._start_ts = perf_counter()
self._lock = RLock()
@property
def col_total(self) -> str:
return f'🤔 Total: {self._total}'
@property
def col_success(self) -> str:
return f'😂 Success: {self._success}'
@property
def col_failed(self) -> str:
return f'😭 Failed: {self._failed}'
@property
def col_speed(self) -> str:
elapsed = self._current_ts - self._start_ts
if elapsed == 0:
speed = 'nan'
else:
speed = format(self._total / elapsed, '.6f')
return f'🚀 Speed: {speed} i/s'
@property
def col_elapsed(self) -> str:
return f'🕙 Elapsed: {self._current_ts - self._start_ts:.6f} s'
@property
def col_success_rate(self) -> str:
if self._total:
rate = self._success * 100 / self._total
else:
rate = 100
            return f'💯 Success Rate: {rate:.2f}%'
def tostring(self) -> tuple[int, str]:
columns: int = get_columns_size()
if not columns:
return 0, ""
cols: list = []
col_expand_size: int = 0
while True:
                # ' ' takes up 1 column
columns -= 1
if columns <= 0:
break
col = self.col_failed
# '😭' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.append(col)
col_expand_size += 1
# ' | ' takes up 3 columns
columns -= 3
if columns <= 0:
break
col = self.col_success
# '😂' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.insert(0, col)
col_expand_size += 1
# ' | ' takes up 3 columns
columns -= 3
if columns <= 0:
break
col = self.col_speed
# '🚀' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.append(col)
col_expand_size += 1
# ' | ' takes up 3 columns
columns -= 3
if columns <= 0:
break
col = self.col_success_rate
# '💯' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.insert(2, col)
col_expand_size += 1
# ' | ' takes up 3 columns
columns -= 3
if columns <= 0:
break
col = self.col_total
# '🤔' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.insert(0, col)
col_expand_size += 1
# ' | ' takes up 3 columns
columns -= 3
if columns <= 0:
break
col = self.col_elapsed
# '🕙' takes up 2 columns, 1 extra
columns -= len(col) + 1
if columns < 0:
break
cols.append(col)
col_expand_size += 1
break
            s = ' %s\r' % ' | '.join(cols)
# '\r' takes up 0 columns, -1 extra
return len(s) - 1 + col_expand_size, s
def update(self):
with self._lock:
self.clear()
self._current_ts = perf_counter()
self._size, self._str = self.tostring()
self.output()
def inc_success(self):
with self._lock:
self._success += 1
self._total += 1
self.update()
def inc_failed(self):
with self._lock:
self._failed += 1
self._total += 1
self.update()
def clear(self):
if self._size:
with self._lock:
stderr.write(' '*self._size)
#stderr.write('\b'*self._size)
stderr.write('\r')
stderr.flush()
def output(self):
with self._lock:
stderr.write(self._str)
stderr.flush()
def pure_print(self, *args, **kwds) -> None:
kwds["flush"] = True
with self._lock:
self.clear()
print(*args, **kwds)
self._size = 0
def print(self, *args, **kwds) -> None:
with self._lock:
self.pure_print(*args, **kwds)
self.output()
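    # ProgressInfo keeps a single status line on stderr while results go to
    # stdout. The intended usage, in brief:
    #
    #     p = ProgressInfo()
    #     p.print("a result line")  # clear the bar, print, redraw the bar
    #     p.inc_success()           # or p.inc_failed(); both redraw the bar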
from itertools import islice
from json import dumps
begin = args.begin
end = args.end
max_workers = args.max_workers
select = args.select
if select:
select = eval("lambda item:" + select)
p = ProgressInfo()
detail_level = args.detail_level
    if detail_level == 0:
        def make_output(item):
            return item["md5"]
elif detail_level == 1:
def make_output(item):
return dumps(item, ensure_ascii=False)
elif detail_level == 2:
def make_output(item):
item["detail"] = info(md5=item["md5"], is_fiction=item["type"]=="fiction")
return dumps(item, ensure_ascii=False)
elif detail_level == 3:
def make_output(item):
item["download_links"] = get_downlinks(md5=item["md5"], is_fiction=item["type"]=="fiction")
return dumps(item, ensure_ascii=False)
else:
def make_output(item):
item["detail"] = info(md5=item["md5"], is_fiction=item["type"]=="fiction")
item["download_links"] = get_downlinks(md5=item["md5"], is_fiction=item["type"]=="fiction")
return dumps(item, ensure_ascii=False)
def output(item):
try:
p.pure_print(make_output(item))
p.inc_success()
except BaseException as exc:
p.pure_print("\x1b[38;5;1m\x1b[1m[FAILED]\x1b[0m", exc, file=stderr)
p.inc_failed()
try:
it = search(args.url)
        start = max(begin - 1, 0)
        if end > 0:
            it = islice(it, start, end)
        elif start:
            it = islice(it, start, None)
        from concurrent.futures import ThreadPoolExecutor
        e = ThreadPoolExecutor(None if max_workers <= 0 else max_workers)
        try:
            for item in filter(select, it):
                e.submit(output, item)
            # block until every submitted task has finished
            e.shutdown(wait=True)
        finally:
            e.shutdown(wait=False, cancel_futures=True)
except BrokenPipeError:
stderr.close()
except KeyboardInterrupt:
pass
#!/usr/bin/env python3
# encoding: utf-8
"libgen.rs 种子获取"
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["libgen_torrents", "libgen_plus_torrents"]
__version__ = (0, 0, 1)
if __name__ == "__main__":
from argparse import ArgumentParser, RawTextHelpFormatter
parser = ArgumentParser(description="libgen.rs 种子获取", formatter_class=RawTextHelpFormatter)
parser.add_argument("-t", "--type", choices=(1, 2, 3), default=1, type=int, help="""类型
0. All
1. Non-fiction / Sci-tech
2. Fiction
3. Scientific articles
""")
parser.add_argument("-dl", "--download", default=0, type=int, help="下载文件线程数,0(默认值)不下载,小于 0 时自动确定线程数")
parser.add_argument("-d", "--detail", action="store_true", help="输出完整信息,json 格式")
parser.add_argument("-s", "--select", help="提供一个表达式(会注入一个变量 item,是一个 namedtuple),用于筛选条目")
args = parser.parse_args()
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from functools import update_wrapper
from posixpath import join as joinpath
from re import compile as re_compile
from typing import NamedTuple
from urllib.request import urlopen
findall_tr = re_compile("<tr[^>]*>.+?</tr>").findall
findall_td = re_compile("<td[^>]*>.+?</td>").findall
search_href = re_compile('(?<=href=")[^"]+').search
search_text = re_compile("(?<=>) *[^> ][^>]*(?=<)").search
class LibgenTorrentInfo(NamedTuple):
url: str
name: str
last_modified: str
size: str
def __str__(self, /) -> str:
return self.url
geturl = __str__
def ensure_enum(cls, val):
if isinstance(val, cls):
return val
if isinstance(val, str):
try:
return cls[val]
except KeyError:
pass
return cls(val)
class LibgenType(Enum):
all = 0
libgen = nonfiction = 1 # Non-fiction / Sci-tech
fiction = 2 # Fiction
scimag = 3 # Scientific articles
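# ensure_enum accepts a member, a member name, or a value, so, for example,
# ensure_enum(LibgenType, LibgenType.fiction), ensure_enum(LibgenType,
# "fiction"), and ensure_enum(LibgenType, 2) all resolve to LibgenType.fiction.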
class LibgenPlusType(Enum):
all = 0
libgen = nonfiction = 1 # libgen
fiction = 2 # fiction
scimag = 3 # scimag
comics = 4 # comics
internet_archive = 5 # internet_archive
isbndb = 6 # isbndb
magazines = 7 # magazines
pilimi = 8 # pilimi-zlib-all
worldcat = 9 # worldcat
def _libgen_torrent_iter(url):
with urlopen(url) as resp:
html = resp.read().decode()
for tr in findall_tr(html)[3:-1]:
td_name, td_mtime, td_size = findall_td(tr)[-4:-1]
name = search_href(td_name)[0]
last_modified = search_text(td_mtime)
if last_modified:
last_modified = last_modified[0].strip()
size = search_text(td_size)
if size:
size = size[0].strip()
link = joinpath(url, name)
if size == "-":
            yield from _libgen_torrent_iter(link)
else:
yield LibgenTorrentInfo(link, name, last_modified, size)
def _gen_startup(func, /):
def wrapper(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return update_wrapper(wrapper, func)
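# _gen_startup advances a generator to its first yield so that callers can
# .send() to it immediately, as _download_torrent_gen relies on below. A tiny
# hypothetical example:
#
#     @_gen_startup
#     def totaler():
#         total = 0
#         while True:
#             total += yield
#
#     gen = totaler()  # already primed; no initial next() needed
#     gen.send(1)
#     gen.send(2)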
@_gen_startup
def _download_torrent_gen(max_workers=0):
stopped = False
def download(url, path):
while not stopped:
try:
with urlopen(url, timeout=5) as fsrc:
fsrc_read = fsrc.read
with open(path, "wb") as fdst:
fdst_write = fdst.write
while not stopped:
buf = fsrc_read(1 << 16)
if not buf:
break
fdst_write(buf)
break
except Exception as e:
if stopped:
return
print(f"retrying {url!r} <= {type(e).__qualname__}: {e}")
print("downloaded:", path)
try:
if max_workers == 1:
while True:
torrent = yield
download(torrent.url, torrent.name)
else:
executor = ThreadPoolExecutor(None if max_workers <= 0 else max_workers)
try:
while True:
torrent = yield
executor.submit(download, torrent.url, torrent.name)
except KeyboardInterrupt:
pass
except GeneratorExit:
executor.shutdown(wait=True)
finally:
executor.shutdown(wait=False, cancel_futures=True)
finally:
stopped = True
def libgen_torrents(type=1, download=False, predicate=None, max_workers=0):
"罗列和下载 libgen.rs 上面的种子"
type = ensure_enum(LibgenType, type)
if type is LibgenType.all:
yield from libgen_torrents(
1, download=download, predicate=predicate, max_workers=max_workers)
yield from libgen_torrents(
2, download=download, predicate=predicate, max_workers=max_workers)
yield from libgen_torrents(
3, download=download, predicate=predicate, max_workers=max_workers)
else:
if type is LibgenType.libgen:
url = "http://libgen.rs/repository_torrent/"
elif type is LibgenType.fiction:
url = "https://libgen.rs/fiction/repository_torrent/"
elif type is LibgenType.scimag:
url = "http://libgen.rs/scimag/repository_torrent/"
        torrents = _libgen_torrent_iter(url)
if predicate:
torrents = filter(predicate, torrents)
if download:
it = _download_torrent_gen(max_workers)
try:
for torrent in torrents:
yield torrent
it.send(torrent)
except KeyboardInterrupt:
it.throw(KeyboardInterrupt)
raise
finally:
it.close()
else:
yield from torrents
def libgen_plus_torrents(type=1, download=False, predicate=None, max_workers=0):
"罗列和下载 libgen.li 上面的种子"
type = ensure_enum(LibgenPlusType, type)
if type is LibgenPlusType.all:
url = "http://libgen.li/torrents/"
elif type is LibgenPlusType.libgen:
url = "http://libgen.li/torrents/libgen/"
elif type is LibgenPlusType.fiction:
url = "http://libgen.li/torrents/fiction/"
elif type is LibgenPlusType.scimag:
url = "http://libgen.li/torrents/scimag/"
elif type is LibgenPlusType.comics:
url = "http://libgen.li/torrents/comics/"
elif type is LibgenPlusType.internet_archive:
url = "http://libgen.li/torrents/internet_archive/"
elif type is LibgenPlusType.isbndb:
url = "http://libgen.li/torrents/isbndb/"
elif type is LibgenPlusType.magazines:
url = "http://libgen.li/torrents/magazines/"
elif type is LibgenPlusType.pilimi:
url = "http://libgen.li/torrents/pilimi-zlib-all/"
elif type is LibgenPlusType.worldcat:
url = "http://libgen.li/torrents/worldcat/"
    torrents = _libgen_torrent_iter(url)
if predicate:
torrents = filter(predicate, torrents)
if download:
it = _download_torrent_gen(max_workers)
try:
for torrent in torrents:
yield torrent
it.send(torrent)
except KeyboardInterrupt:
it.throw(KeyboardInterrupt)
raise
finally:
it.close()
else:
yield from torrents
if __name__ == "__main__":
    from json import dumps
    from sys import stderr  # needed by the BrokenPipeError handler below
max_workers = args.download
download = max_workers != 0
show_detail = args.detail
select = args.select
if select:
select = eval("lambda item:" + select)
try:
for item in libgen_torrents(
args.type,
download=download,
max_workers=max_workers,
predicate=select,
):
if show_detail:
print(dumps(item._asdict(), ensure_ascii=False), flush=True)
else:
print(item, flush=True)
except BrokenPipeError:
stderr.close()
except KeyboardInterrupt:
pass