Last active
January 24, 2022 21:48
-
-
Save Nachtalb/f3e5f26661989ed8f764166ff368f544 to your computer and use it in GitHub Desktop.
Pi-hole adlists (and custom blacklist) to hosts file (also removes duplicates)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Convert pihole domain blacklists to hosts file format. | |
# | |
# Usage: | |
# Create folder ./export | |
# Create pihole export and extract it to ./export | |
# Backup can be created in "Settings" > "Teleporter" > "Backup" | |
# Install requirements with "pip install aiohttp[speedups] tqdm" | |
# Run script "python pihole_to_hosts.py" | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
import json | |
from pathlib import Path | |
import re | |
import shutil | |
from subprocess import Popen | |
from threading import Lock | |
from aiohttp import ClientSession | |
from tqdm.asyncio import tqdm_asyncio | |
from yarl import URL | |
print("Setup") | |
dir_path = Path(__file__).parent | |
whitelist_reg_file = dir_path / "export/whitelist.regex.json" | |
whitelist_exact_file = dir_path / "export/whitelist.exact.json" | |
blacklist_reg_file = dir_path / "export/blacklist.regex.json" | |
blacklist_exact_file = dir_path / "export/blacklist.exact.json" | |
adlist_file = dir_path / "export/adlist.json" | |
adlist = {} | |
whitelist_reg = set() | |
whitelist_exact = set() | |
blacklist_exact = {} | |
if whitelist_reg_file.is_file(): | |
whitelist_reg = {re.compile(data["domain"]) for data in json.loads(whitelist_reg_file.read_text())} | |
if whitelist_exact_file.is_file(): | |
whitelist_exact = {data["domain"] for data in json.loads(whitelist_exact_file.read_text())} | |
if blacklist_reg_file.is_file(): | |
print('hosts file do not support regex thus "./export/blacklist.regex.json" is ignored') | |
if blacklist_exact_file.is_file(): | |
blacklist_exact = dict( | |
sorted( | |
[(data["domain"], data["comment"]) for data in json.loads(blacklist_exact_file.read_text())], | |
key=lambda tpl: tpl[1], | |
) | |
) | |
if adlist_file.is_file(): | |
adlist = {data["address"]: data["comment"] for data in json.loads(adlist_file.read_text()) if data["enabled"]} | |
print(f"User whitelist regex: {len(whitelist_reg)}") | |
print(f"User whitelist exact: {len(whitelist_exact)}") | |
print(f"User blacklist exact: {len(blacklist_exact)}") | |
print(f"Adlists: {len(adlist)}") | |
async def get_file(session: "ClientSession", url: str) -> tuple[str, str]:
    """Download one adlist and return ``(content, url)``.

    ``content`` is "" when the download failed, so callers can always
    unpack the tuple without special-casing errors.
    """
    try:
        # NOTE: an int timeout is deprecated in newer aiohttp releases;
        # aiohttp.ClientTimeout(total=15) is the modern spelling.
        async with session.get(url, timeout=15) as response:
            content = await response.text()
            return content or "", url
    except asyncio.TimeoutError:
        tqdm_asyncio.write(f"Timeout: Could not retrieve {url}")
    except Exception as error:
        # Without this, a single DNS/connection failure (aiohttp.ClientError
        # etc.) would propagate out of the task and abort the whole run.
        tqdm_asyncio.write(f"Error: Could not retrieve {url}: {error}")
    return "", url
# Matches a non-empty, non-comment blocklist line.  Group 2 captures an
# optional leading IPv4 address (e.g. "127.0.0.1 "), group 3 the remaining
# domain part of the entry that fix_worker rewrites to "0.0.0.0 <domain>".
# NOTE(review): inline trailing comments after a domain would be captured
# into group 3 as well — confirm upstream lists never use them.
domain_reg = re.compile(r"^((?!$|#))(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s*|)(.*)$")
def fix_worker(result: str, url: str, lock: Lock, file, full: "tqdm_asyncio"):
    """Rewrite one downloaded blocklist into hosts format and append it to *file*.

    Every domain line is normalised to ``0.0.0.0 <domain>``; domains matching
    the user whitelist (exact or regex) are dropped entirely.  Writing is
    serialised through *lock* because several worker threads share the output
    file.  The overall progress bar *full* is ticked exactly once per list,
    even when the download was empty.
    """
    try:
        if not result:
            return
        host = URL(url).host
        # Collect the kept lines in a fresh list so whitelisted entries can be
        # omitted (the old in-place rewrite left the original line behind,
        # without a newline, instead of removing it).
        lines = [f"# FROM {url}\n"]
        for raw in tqdm_asyncio(result.strip().split("\n"), f"Fixing up {host}", leave=False):
            line = raw.strip()
            if match := domain_reg.match(line):
                domain = match.group(3)
                # Skip anything the user explicitly whitelisted.
                if domain in whitelist_exact or any(reg.match(domain) for reg in whitelist_reg):
                    continue
                line = "0.0.0.0 " + domain
            lines.append(line + "\n")
        with lock:
            file.writelines(lines)
    finally:
        # Tick the total bar unconditionally so it stays accurate even for
        # empty or failed downloads.
        full.update()
async def main():
    """Download every enabled adlist, convert to hosts format, de-duplicate.

    Workflow:
      1. Fetch all adlist URLs concurrently with aiohttp.
      2. Hand each downloaded list to a thread pool that rewrites it into
         ``0.0.0.0 <domain>`` lines and appends it to ``./hosts``.
      3. Append the user's exact blacklist entries.
      4. If ``awk`` is available, strip duplicate lines from the result.
    """
    print("Downloading and writing hosts file...")
    hosts_file = dir_path / "hosts"
    # Overall progress bar: one tick per adlist (updated inside fix_worker).
    full = tqdm_asyncio(desc="Total", total=len(adlist), leave=False)
    # Serialises writes to the shared hosts file across worker threads.
    lock = Lock()
    with hosts_file.open("w", encoding="utf-8") as file_obj:
        with ThreadPoolExecutor(5) as executor:
            async with ClientSession() as session:
                tasks = [get_file(session, url) for url in adlist]
                for task in tqdm_asyncio.as_completed(tasks, desc="Loading URLS", leave=False):
                    result, url = await task
                    # Rewrite work runs off the event loop in the pool.
                    executor.submit(fix_worker, result, url, lock, file_obj, full)
            # ThreadPoolExecutor.__exit__ waits for all submitted workers, so
            # the blacklist below cannot interleave with their writes.
        if blacklist_exact:
            # NOTE(review): this header lacks a leading "#", so it is written
            # as a bare (non-comment) line into the hosts file — confirm that
            # is intended.
            lines = ["User Blacklist\n"]
            for domain, reason in blacklist_exact.items():
                lines.append(f"0.0.0.0 {domain}\n")
                lines.append(f"# {reason}\n")
            file_obj.writelines(lines)  # type: ignore
    print()
    print("Trying to remove duplicates")
    if awk := shutil.which("awk"):
        dup_out = hosts_file.with_name("hosts-no-dups")
        had_error = False
        try:
            with dup_out.open("w") as dup_out_obj:
                # awk '!NF || !seen[$0]++' keeps blank lines plus the first
                # occurrence of every other line (classic de-dup one-liner).
                proc = Popen([awk, "!NF || !seen[$0]++", str(hosts_file)], stdout=dup_out_obj)
                proc.wait()
        except Exception as error:
            print(error)
            had_error = True
            print("Error during removal of duplicates. Hosts file is save though!")
        hosts_with_dups = hosts_file.with_name("hosts-with-dups")
        try:
            # Only swap files if awk actually produced output; keep the raw
            # file around as "hosts-with-dups" and promote the clean one.
            if dup_out.stat().st_size != 0:
                shutil.move(hosts_file, hosts_with_dups)
                shutil.move(dup_out, hosts_file)
        except Exception as error:
            print(error)
            had_error = True
            if hosts_with_dups.is_file():
                print(f"Moved {hosts_file} to {hosts_with_dups} but could not move {dup_out} to {hosts_file}")
            else:
                print(f"Could not move {hosts_file} to {hosts_with_dups} thus non duplicate file is saved to {dup_out}")
        if not had_error:
            # Report sizes in MiB (2**20 bytes).
            # NOTE(review): if dup_out was empty the swap above is skipped and
            # hosts_with_dups does not exist — this stat would raise; confirm.
            hosts_size = hosts_file.stat().st_size / (2 ** 20)
            hosts_with_dups_size = hosts_with_dups.stat().st_size / (2 ** 20)
            print(
                f"Duplicates removed file size reduced by {hosts_with_dups_size - hosts_size:.1f}MB.\n Output file:"
                f" {hosts_size:.1f}MB {hosts_file}\n File with dups: {hosts_with_dups_size:.1f}MB {hosts_with_dups}"
            )
    else:
        print(
            'Could not find "awk" to remove duplicates. If you do have "awk" or "gawk" you can use "awk \'!NF ||'
            " !seen[$0]++' hosts > hosts-no-dups\""
        )
    print("Done and dusted")
# Script entry point: run the whole pipeline on a fresh asyncio event loop.
asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
aiodns==3.0.0 | |
aiohttp==3.8.1 | |
aiosignal==1.2.0 | |
async-timeout==4.0.2 | |
attrs==21.4.0 | |
Brotli==1.0.9 | |
cchardet==2.1.7 | |
cffi==1.15.0 | |
charset-normalizer==2.0.10 | |
frozenlist==1.3.0 | |
idna==3.3 | |
multidict==6.0.2 | |
pycares==4.1.2 | |
pycparser==2.21 | |
tqdm==4.62.3 | |
yarl==1.7.2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment