Skip to content

Instantly share code, notes, and snippets.

@Syfaro
Created April 8, 2022 21:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Syfaro/1ef1314b2269f35820be8bcf0b3c2131 to your computer and use it in GitHub Desktop.
Save Syfaro/1ef1314b2269f35820be8bcf0b3c2131 to your computer and use it in GitHub Desktop.
Create DNS zones in Technitium DNS server to use with LanCache
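# Running instructions:
#
# pip install publicsuffixlist requests
# git clone https://github.com/uklans/cache-domains.git
# TECHNITIUM_HOST=http://technitium:5380 TECHNITIUM_USERNAME=admin TECHNITIUM_PASSWORD=password python3 create-zones.py ./cache-domains lancache-ip
#
# Replace "lancache-ip" with the IP address the created A records should point to.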
import sys
import os
import json
from dataclasses import dataclass
from typing import Dict, List, Set
import traceback
from publicsuffixlist import PublicSuffixList
import requests
# Shared Public Suffix List instance, built once at import time;
# compute_zones() calls its privatesuffix() to group hostnames into zones.
PSL = PublicSuffixList()
@dataclass
class CacheDomain:
    """One entry of the ``"cache_domains"`` array in ``cache_domains.json``.

    Instances are built by ``load_domain_metadata``, which fills the fields
    from the entry's ``"name"`` and ``"description"`` keys and from the
    hostnames found in the files listed under ``"domain_files"``.
    """

    # Value of the entry's "name" key; used in log output and record comments.
    name: str
    # Value of the entry's "description" key.
    description: str
    # Every non-empty, non-comment line from the entry's domain files.
    domains: List[str]
def load_domain_metadata(folder: str) -> List[CacheDomain]:
    """Load every cache-domain entry described by ``cache_domains.json``.

    Reads ``<folder>/cache_domains.json`` and, for each item of its
    ``"cache_domains"`` array, gathers the hostnames from all files named in
    the item's ``"domain_files"`` list (paths are resolved relative to
    ``folder``).

    Args:
        folder: Directory containing ``cache_domains.json`` and the domain
            files it references.

    Returns:
        One ``CacheDomain`` per metadata entry, in the order they appear in
        the JSON file.

    Raises:
        OSError: If the metadata file or a referenced domain file cannot be
            opened.
        json.JSONDecodeError: If the metadata file is not valid JSON.
        KeyError: If the document or an entry lacks one of the expected keys.
    """
    metadata_path = os.path.join(folder, "cache_domains.json")
    # JSON is UTF-8 by specification; do not depend on the platform's
    # locale encoding (which is not UTF-8 on every system).
    with open(metadata_path, "r", encoding="utf-8") as f:
        metadata = json.load(f)

    cache_domains = []
    for entry in metadata["cache_domains"]:
        # Read all required keys up front so a malformed entry fails before
        # any of its domain files are opened.
        name = entry["name"]
        description = entry["description"]
        domain_files = entry["domain_files"]

        domains: List[str] = []
        for domain_file in domain_files:
            domains.extend(get_domain_entries(os.path.join(folder, domain_file)))
        cache_domains.append(CacheDomain(name, description, domains))
    return cache_domains
def get_domain_entries(path: str) -> List[str]:
    """Return the hostnames listed in a domain file.

    Each line is stripped of surrounding whitespace; blank lines and lines
    whose first non-blank character is ``#`` (comments) are dropped.

    Args:
        path: Path of the text file to read, one hostname per line.

    Returns:
        The remaining lines, in file order.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Explicit encoding: the result must not vary with the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        # Filter while streaming instead of materialising every line first.
        stripped_lines = (line.strip() for line in f)
        return [
            domain
            for domain in stripped_lines
            if domain and not domain.startswith("#")
        ]
def compute_zones(domains: List[str]) -> Dict[str, List[str]]:
    """Group hostnames by the DNS zone they should be created in.

    The zone of a hostname is its private suffix as reported by
    ``PSL.privatesuffix``. Hostnames are kept in input order within each
    zone, and zones appear in order of first occurrence.

    Args:
        domains: Hostnames to group.

    Returns:
        Mapping of zone name to the hostnames belonging to that zone.
    """
    zones: Dict[str, List[str]] = {}
    for domain in domains:
        # privatesuffix() returns None when the name is itself a public
        # suffix or is malformed. A None key would later be sent to the DNS
        # API as a missing zone name and abort the whole run, so treat such
        # a name as its own zone instead.
        zone = PSL.privatesuffix(domain) or domain
        zones.setdefault(zone, []).append(domain)
    return zones
class TechnitiumDns:
    """Minimal client for the Technitium DNS Server HTTP API.

    Logs in on construction and keeps the returned session token, which is
    sent with every later call. Call ``logout`` when done.

    All requests are HTTP GETs against ``{host}/api/<endpoint>``.
    """

    session: requests.Session
    host: str
    token: str
    timeout: float

    def __init__(
        self, host: str, username: str, password: str, timeout: float = 30.0
    ):
        """Open a session and log in.

        Args:
            host: Base URL of the server, e.g. ``http://technitium:5380``.
            username: Account to log in with.
            password: Password for ``username``.
            timeout: Seconds to wait for each HTTP request before giving up.

        Raises:
            requests.RequestException: On network errors or HTTP error codes.
            RuntimeError: If the server rejects the login.
            KeyError: If the login response carries no ``"token"``.
        """
        self.session = requests.Session()
        self.host = host
        self.timeout = timeout
        # NOTE(review): the credentials travel in the URL query string and
        # may therefore end up in proxy/server access logs.
        login = self._call("login", {"user": username, "pass": password})
        self.token = login["token"]

    def _call(self, endpoint: str, params: Dict[str, str]) -> dict:
        """Issue one API request and return its decoded JSON body.

        The API reports application-level failures (bad token, unknown
        zone, ...) in the ``"status"`` field of the JSON body rather than
        through the HTTP status code, so both are checked here.

        Raises:
            requests.RequestException: On network errors, timeouts or HTTP
                error codes.
            RuntimeError: If the body's ``"status"`` is present and not
                ``"ok"``.
        """
        resp = self.session.get(
            f"{self.host}/api/{endpoint}",
            params=params,
            # Without a timeout requests waits forever on a stalled server.
            timeout=self.timeout,
        )
        resp.raise_for_status()
        payload = resp.json()
        # A body without "status" is accepted as success so that responses
        # the original code handled keep working.
        status = payload.get("status", "ok")
        if status != "ok":
            detail = payload.get("errorMessage", "no error message")
            raise RuntimeError(
                f"Technitium API call {endpoint!r} failed ({status}): {detail}"
            )
        return payload

    def _authed(self, endpoint: str, **params: str) -> dict:
        """Call ``endpoint`` with the session token added to ``params``."""
        return self._call(endpoint, {"token": self.token, **params})

    def logout(self) -> None:
        """Invalidate the session token and release the HTTP connections."""
        try:
            self._authed("logout")
        finally:
            # Close pooled connections even if the logout call itself failed.
            self.session.close()

    def zones(self) -> List[str]:
        """Return the names of all zones that exist on the server."""
        listing = self._authed("zone/list")
        return [zone["name"] for zone in listing["response"]["zones"]]

    def create_zone(self, zone: str):
        """Create ``zone`` as a forwarder zone.

        The zone is created with forwarder ``this-server`` and DNSSEC
        validation disabled.
        """
        self._authed(
            "zone/create",
            zone=zone,
            type="forwarder",
            forwarder="this-server",
            dnssecValidation="false",
        )

    def a_records(self, zone: str, domain: str) -> Set[str]:
        """Return the IP addresses of the A records the server lists for ``domain``.

        Args:
            zone: Zone that contains ``domain``.
            domain: Name whose records are requested.

        Returns:
            The ``ipAddress`` of every record of type ``A`` in the response;
            empty if there is none.
        """
        listing = self._authed("zone/getRecords", zone=zone, domain=domain)
        return {
            record["rData"]["ipAddress"]
            for record in listing["response"]["records"]
            if record["type"] == "A"
        }

    def set_a_record(self, zone: str, domain: str, target: str, comment: str):
        """Add an A record for ``domain`` pointing at ``target``.

        The request is sent with ``overwrite=true``.

        Args:
            zone: Zone that contains ``domain``.
            domain: Record name.
            target: IPv4 address the record should resolve to.
            comment: Free-text comment stored with the record.
        """
        self._authed(
            "zone/addRecord",
            zone=zone,
            domain=domain,
            type="A",
            overwrite="true",
            ipAddress=target,
            comments=comment,
        )
def comment_for_domain(cache_domain: CacheDomain) -> str:
    """Build the comment text attached to records created for ``cache_domain``.

    Returns:
        ``"lancache entry for "`` followed by the cache domain's name.
    """
    service_name = cache_domain.name
    return f"lancache entry for {service_name}"
def main() -> int:
    """Create the LanCache zones and A records on the Technitium server.

    Reads the cache-domains folder from ``sys.argv[1]`` and the target IP
    from ``sys.argv[2]``; the server URL and credentials come from the
    ``TECHNITIUM_HOST``, ``TECHNITIUM_USERNAME`` and ``TECHNITIUM_PASSWORD``
    environment variables. For every hostname, the zone is created if it does
    not exist yet and an A record is added unless one already points at the
    target.

    Returns:
        Process exit code: 0 on success, 1 if an error occurred while
        talking to the server or reading the domain files, 2 on bad usage.

    Raises:
        KeyError: If one of the environment variables is missing.
    """
    # Validate the command line before logging in, so a usage mistake does
    # not open (and then have to close) a server session.
    if len(sys.argv) < 3:
        print(
            f"usage: {sys.argv[0]} <cache-domains-folder> <lancache-ip>",
            file=sys.stderr,
        )
        return 2
    folder, target = sys.argv[1], sys.argv[2]

    host, username, password = (
        os.environ["TECHNITIUM_HOST"],
        os.environ["TECHNITIUM_USERNAME"],
        os.environ["TECHNITIUM_PASSWORD"],
    )
    dns = TechnitiumDns(host, username, password)

    exit_code = 0
    try:
        # A set gives O(1) "does this zone exist" checks.
        known_zones = set(dns.zones())
        for cache_domain in load_domain_metadata(folder):
            zones = compute_zones(cache_domain.domains)
            print(f"Adding records for {cache_domain.name} ({len(zones)} zones)")
            comment = comment_for_domain(cache_domain)
            for zone, entries in zones.items():
                if zone not in known_zones:
                    print(f"Creating zone {zone}")
                    dns.create_zone(zone)
                    known_zones.add(zone)
                for entry in entries:
                    # Skip names that already resolve to the target.
                    if target not in dns.a_records(zone, entry):
                        print(f"Creating record {entry}")
                        dns.set_a_record(zone, entry, target, comment)
    except Exception:
        # Top-level boundary: report the failure and signal it through the
        # exit code instead of exiting 0.
        traceback.print_exc()
        exit_code = 1
    finally:
        # Always release the session token, including on Ctrl-C.
        dns.logout()
    return exit_code


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment