Skip to content

Instantly share code, notes, and snippets.

@Nachtalb
Last active January 24, 2022 21:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nachtalb/f3e5f26661989ed8f764166ff368f544 to your computer and use it in GitHub Desktop.
Save Nachtalb/f3e5f26661989ed8f764166ff368f544 to your computer and use it in GitHub Desktop.
Convert Pi-hole adlists (and custom blacklist) to a hosts file (also removes duplicates)
#!/usr/bin/env python
#
# Convert pihole domain blacklists to hosts file format.
#
# Usage:
# Create folder ./export
# Create pihole export and extract it to ./export
# Backup can be created in "Settings" > "Teleporter" > "Backup"
# Install requirements with "pip install aiohttp[speedups] tqdm"
# Run script "python pihole_to_hosts.py"
import asyncio
from concurrent.futures import ThreadPoolExecutor
import json
from pathlib import Path
import re
import shutil
from subprocess import Popen
from threading import Lock
from aiohttp import ClientSession
from tqdm.asyncio import tqdm_asyncio
from yarl import URL
print("Setup")

# The Pi-hole Teleporter backup is expected to be extracted to ./export next to this script.
dir_path = Path(__file__).parent
whitelist_reg_file = dir_path / "export/whitelist.regex.json"
whitelist_exact_file = dir_path / "export/whitelist.exact.json"
blacklist_reg_file = dir_path / "export/blacklist.regex.json"
blacklist_exact_file = dir_path / "export/blacklist.exact.json"
adlist_file = dir_path / "export/adlist.json"


def _load_export(path: Path) -> list:
    """Return the parsed JSON entries of an export file, or ``[]`` when the file is missing."""
    if not path.is_file():
        return []
    # JSON is read as UTF-8 explicitly instead of relying on the platform default encoding.
    return json.loads(path.read_text(encoding="utf-8"))


# Compiled regex patterns and exact domains that must never end up in the hosts file.
whitelist_reg = {re.compile(entry["domain"]) for entry in _load_export(whitelist_reg_file)}
whitelist_exact = {entry["domain"] for entry in _load_export(whitelist_exact_file)}

if blacklist_reg_file.is_file():
    print('hosts file do not support regex thus "./export/blacklist.regex.json" is ignored')

# Mapping of blacklisted domain -> comment, ordered by comment.
# A JSON null comment is normalised to "" because None cannot be ordered against str
# (or against another None), which would make sorted() raise TypeError.
blacklist_exact = dict(
    sorted(
        ((entry["domain"], entry["comment"] or "") for entry in _load_export(blacklist_exact_file)),
        key=lambda pair: pair[1],
    )
)

# Mapping of list URL -> comment for every enabled adlist.
adlist = {entry["address"]: entry["comment"] for entry in _load_export(adlist_file) if entry["enabled"]}

print(f"User whitelist regex: {len(whitelist_reg)}")
print(f"User whitelist exact: {len(whitelist_exact)}")
print(f"User blacklist exact: {len(blacklist_exact)}")
print(f"Adlists: {len(adlist)}")
async def get_file(session: ClientSession, url: str) -> tuple[str, str]:
    """Download *url* and return ``(body, url)``.

    Any failure (timeout, connection problem, HTTP error status, undecodable
    body) is reported through ``tqdm_asyncio.write`` and yields ``("", url)``
    so that one broken list does not abort the whole run.
    """
    # Local import: the file only imports ClientSession from aiohttp at the top.
    from aiohttp import ClientError

    try:
        async with session.get(url, timeout=15) as response:
            # Without this a 404/500 error page would be converted into hosts entries.
            response.raise_for_status()
            content = await response.text()
            return content or "", url
    except asyncio.TimeoutError:
        tqdm_asyncio.write(f"Timeout: Could not retrieve {url}")
    except (ClientError, UnicodeDecodeError) as error:
        tqdm_asyncio.write(f"Error: Could not retrieve {url} ({error})")
    return "", url
# Matches every line that is neither empty nor a "#" comment. Group 3 is the
# remainder of the line after an optional leading IPv4 address and whitespace.
domain_reg = re.compile(r"^((?!$|#))(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s*|)(.*)$")


def fix_worker(result: str, url: str, lock: Lock, file, full: tqdm_asyncio):
    """Convert one downloaded list to hosts format and append it to *file*.

    Args:
        result: Raw text of the downloaded list; nothing is written when empty.
        url: Address the list was fetched from (written as a ``# FROM`` header).
        lock: Guards *file* so that lists written by different workers do not interleave.
        file: Writable text file object that receives the converted lines.
        full: Overall progress bar; advanced by one for every call.
    """
    if result:
        host = URL(url).host
        # The trailing "\n" yields a final empty element, i.e. a blank line after each list.
        raw_lines = [line.strip() for line in (result.strip() + "\n").split("\n")]
        fixed = [f"# FROM {url}\n"]
        for line in tqdm_asyncio(raw_lines, f"Fixing up {host}", leave=False):
            if match := domain_reg.match(line):
                src = match.group(3)
                # Whitelisted entries are dropped entirely; they must not be left in the output.
                # search() is used because Pi-hole applies its regex filters unanchored.
                if src in whitelist_exact or any(reg.search(src) for reg in whitelist_reg):
                    continue
                line = "0.0.0.0 " + src
            fixed.append(line + "\n")
        with lock:
            file.writelines(fixed)
    full.update()
def _user_blacklist_lines() -> list[str]:
    """Return the hosts-file lines for the user's exact blacklist (empty when there is none)."""
    if not blacklist_exact:
        return []
    # The section header must be a comment, otherwise it is not a valid hosts line.
    lines = ["# User Blacklist\n"]
    for domain, reason in blacklist_exact.items():
        lines.append(f"0.0.0.0 {domain}\n")
        if reason:  # entries without a comment would otherwise produce "# None" / "# "
            lines.append(f"# {reason}\n")
    return lines


def _remove_duplicates(hosts_file: Path) -> None:
    """Deduplicate *hosts_file* with awk, keeping the original as ``hosts-with-dups``."""
    awk = shutil.which("awk")
    if not awk:
        print(
            'Could not find "awk" to remove duplicates. If you do have "awk" or "gawk" you can use "awk \'!NF ||'
            " !seen[$0]++' hosts > hosts-no-dups\""
        )
        return

    dup_out = hosts_file.with_name("hosts-no-dups")
    hosts_with_dups = hosts_file.with_name("hosts-with-dups")

    # Step 1: write the deduplicated copy. Blank lines (!NF) are always kept.
    failure = None
    try:
        with dup_out.open("w") as dup_out_obj:
            with Popen([awk, "!NF || !seen[$0]++", str(hosts_file)], stdout=dup_out_obj) as proc:
                if proc.wait() != 0:
                    failure = f"awk exited with status {proc.returncode}"
    except OSError as error:
        failure = error
    if failure is not None:
        print(failure)
        print("Error during removal of duplicates. Hosts file is save though!")
        # Do not swap in a possibly truncated result.
        return

    # Step 2: swap the files. Track the first move ourselves so that a stale
    # hosts-with-dups from an earlier run cannot select the wrong message.
    original_moved = False
    try:
        if dup_out.stat().st_size == 0:
            print(f"{dup_out} is empty, keeping {hosts_file} unchanged")
            return
        shutil.move(hosts_file, hosts_with_dups)
        original_moved = True
        shutil.move(dup_out, hosts_file)
    except OSError as error:
        print(error)
        if original_moved:
            print(f"Moved {hosts_file} to {hosts_with_dups} but could not move {dup_out} to {hosts_file}")
        else:
            print(f"Could not move {hosts_file} to {hosts_with_dups} thus non duplicate file is saved to {dup_out}")
        return

    # Step 3: report the savings in MiB.
    hosts_size = hosts_file.stat().st_size / (2 ** 20)
    hosts_with_dups_size = hosts_with_dups.stat().st_size / (2 ** 20)
    print(
        f"Duplicates removed file size reduced by {hosts_with_dups_size - hosts_size:.1f}MB.\n Output file:"
        f" {hosts_size:.1f}MB {hosts_file}\n File with dups: {hosts_with_dups_size:.1f}MB {hosts_with_dups}"
    )


async def main():
    """Download every enabled adlist, write ./hosts and remove duplicate lines from it."""
    print("Downloading and writing hosts file...")
    hosts_file = dir_path / "hosts"
    full = tqdm_asyncio(desc="Total", total=len(adlist), leave=False)
    lock = Lock()
    submitted = []

    with hosts_file.open("w", encoding="utf-8") as file_obj:
        with ThreadPoolExecutor(5) as executor:
            async with ClientSession() as session:
                tasks = [get_file(session, url) for url in adlist]
                for task in tqdm_asyncio.as_completed(tasks, desc="Loading URLS", leave=False):
                    result, url = await task
                    submitted.append((url, executor.submit(fix_worker, result, url, lock, file_obj, full)))
        # Leaving the executor block waited for all workers; surface failures
        # that would otherwise be silently discarded with the futures.
        for url, future in submitted:
            if (error := future.exception()) is not None:
                tqdm_asyncio.write(f"Error: Could not process {url} ({error})")
        full.close()
        file_obj.writelines(_user_blacklist_lines())

    # The hosts file is closed (and therefore flushed) before awk reads it.
    print()
    print("Trying to remove duplicates")
    _remove_duplicates(hosts_file)
    print("Done and dusted")


# Script entry point: runs as soon as the module is executed.
asyncio.run(main())
aiodns==3.0.0
aiohttp==3.8.1
aiosignal==1.2.0
async-timeout==4.0.2
attrs==21.4.0
Brotli==1.0.9
cchardet==2.1.7
cffi==1.15.0
charset-normalizer==2.0.10
frozenlist==1.3.0
idna==3.3
multidict==6.0.2
pycares==4.1.2
pycparser==2.21
tqdm==4.62.3
yarl==1.7.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment