Skip to content

Instantly share code, notes, and snippets.

@renatoruis
Created March 27, 2025 12:06
Show Gist options
  • Select an option

  • Save renatoruis/0830853f7315cf3f0112b71d2034b0c9 to your computer and use it in GitHub Desktop.

Select an option

Save renatoruis/0830853f7315cf3f0112b71d2034b0c9 to your computer and use it in GitHub Desktop.
sync-dns-cloudflare-to-AD
import json
import os
import subprocess
from typing import Dict, List, Optional

import requests
# Active Directory settings.
# Every value below can be overridden through an environment variable so that
# real credentials do not have to be written into this file; the fallbacks are
# the original placeholders.
IP_AD = os.environ.get("AD_IP", "192.168.X.X")  # IP address of the Active Directory DNS server
AD_USER = os.environ.get("AD_USER", "domain\\user")  # Active Directory user
AD_PASSWORD = os.environ.get("AD_PASSWORD", "password")  # Active Directory password
DOMAIN = os.environ.get("AD_DOMAIN", "domain.com.br")  # Active Directory DNS zone (domain)
# Cloudflare settings
CF_API_KEY = os.environ.get("CF_API_KEY", "API_KEY")  # Cloudflare API token (sent as a Bearer token)
CF_ZONE_ID = os.environ.get("CF_ZONE_ID", "ZONE_ID")  # Cloudflare zone ID
CF_API_URL = f"https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/dns_records"
# Slack webhook URL
SLACK_WEBHOOK_URL = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/TMABCDEFG/X1234567890/CBASTIMDEVOPS",
)
# Helper functions
def normalize_hostname(hostname: str, domain: Optional[str] = None) -> str:
    """Return *hostname* relative to the zone, using "@" for the zone apex.

    The name is lower-cased and any trailing dot is removed. If it equals the
    zone name, "@" is returned; if it ends with ".<zone>", that suffix is
    removed. Names outside the zone are returned unchanged (lower-cased).

    Args:
        hostname: Fully qualified or already-relative record name.
        domain: Zone name to strip. Defaults to the module-level ``DOMAIN``.

    Returns:
        The zone-relative name, or "@" when nothing is left.
    """
    zone = (DOMAIN if domain is None else domain).lower().rstrip(".")
    name = hostname.lower().rstrip(".")
    # The apex record is named exactly like the zone; there is no ".<zone>"
    # suffix to strip in that case, so it needs its own check.
    if name == zone:
        return "@"
    suffix = f".{zone}"
    if name.endswith(suffix):
        # Remove only the trailing suffix, never earlier occurrences.
        name = name[: -len(suffix)]
    return name if name else "@"
def is_internal_or_special(hostname: str) -> bool:
    """Tell whether *hostname* must be left out of the sync.

    A name is excluded when it begins with an underscore or with the
    "internal-" prefix.
    """
    excluded_prefixes = ("_", "internal-")
    return hostname.startswith(excluded_prefixes)
def parse_ad_record_output(output: str, hostname: str) -> Dict[str, Dict[str, List[str]]]:
    """Extract A, CNAME and NS values from ``samba-tool dns query`` output.

    Each record line is expected to look like ``<TYPE>: <value> (...)``. The
    record type is matched against the start of the (stripped) line, so lines
    such as ``AAAA: ...`` or ``SOA: ...`` are not mistaken for A records.

    Args:
        output: Text captured from the query command.
        hostname: Name the query was issued for; used as the key of the result.

    Returns:
        ``{hostname: {record_type: [values...]}}`` with trailing dots removed
        from the values, or an empty dict when nothing relevant was found.
    """
    wanted_types = ("A", "CNAME", "NS")
    records: Dict[str, Dict[str, List[str]]] = {}
    # NOTE(review): samba-tool is expected to print a "Name=<node>, Records=..."
    # header before each node's records, with an empty name for the queried
    # node itself and a non-empty name for its children - confirm against the
    # samba-tool version in use. Records are accepted until a header says
    # otherwise, so output without headers is still parsed.
    in_queried_node = True
    for line in output.splitlines():
        entry = line.strip()
        if entry.startswith("Name="):
            node_name = entry[len("Name="):].split(",", 1)[0].strip()
            in_queried_node = node_name == ""
            continue
        if not in_queried_node:
            # Record belongs to a child node, not to `hostname`.
            continue
        record_type, separator, rest = entry.partition(": ")
        if not separator or record_type not in wanted_types:
            continue
        fields = rest.split()
        if not fields:
            # Type prefix without a value; nothing to record.
            continue
        value = fields[0].rstrip(".")
        records.setdefault(hostname, {}).setdefault(record_type, []).append(value)
    return records
def fetch_cf_records() -> Dict[str, Dict[str, List[str]]]:
    """Download the zone's A and CNAME records from the Cloudflare API.

    All result pages are fetched. Record names are normalized with
    ``normalize_hostname`` and names rejected by ``is_internal_or_special``
    are skipped.

    Returns:
        ``{hostname: {record_type: [contents...]}}`` with trailing dots
        removed from the contents.

    Raises:
        requests.RequestException: On network failure, timeout or an HTTP
            error status.
        RuntimeError: When the API answers with ``"success": false``.
    """
    headers = {
        "Authorization": f"Bearer {CF_API_KEY}",
        "Content-Type": "application/json"
    }
    records: Dict[str, Dict[str, List[str]]] = {}
    page = 1
    while True:
        params = {"per_page": 500, "page": page}
        # A timeout keeps the sync from hanging forever on a stalled connection.
        response = requests.get(CF_API_URL, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        cf_data = response.json()
        if cf_data.get("success") is False:
            raise RuntimeError(f"Cloudflare API error: {cf_data.get('errors')}")
        for rec in cf_data.get("result") or []:
            if rec["type"] not in ("A", "CNAME"):
                continue
            name = normalize_hostname(rec["name"])
            if is_internal_or_special(name):
                continue
            content = rec["content"].strip().rstrip('.')
            records.setdefault(name, {}).setdefault(rec["type"], []).append(content)
        # Without following the pagination info, records beyond the first
        # page would be silently missing from the comparison.
        total_pages = (cf_data.get("result_info") or {}).get("total_pages", 1)
        if page >= total_pages:
            break
        page += 1
    return records
def fetch_ad_records(hostnames: List[str]) -> Dict[str, Dict[str, List[str]]]:
    """Query the Active Directory DNS server for each of *hostnames*.

    Runs ``samba-tool dns query ... ALL`` once per name (names rejected by
    ``is_internal_or_special`` are skipped), parses the captured stdout with
    ``parse_ad_record_output`` and merges everything into a single mapping.
    The command's exit status and stderr are not inspected.

    Returns:
        ``{hostname: {record_type: [values...]}}`` for the names that
        produced at least one parsed record.
    """
    merged = {}
    for name in hostnames:
        if is_internal_or_special(name):
            continue
        query_cmd = [
            "samba-tool", "dns", "query",
            IP_AD, DOMAIN, name, "ALL",
            "-U", f"{AD_USER}%{AD_PASSWORD}",
        ]
        completed = subprocess.run(
            query_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        found = parse_ad_record_output(completed.stdout, name)
        for host, by_type in found.items():
            host_entry = merged.setdefault(host, {})
            for record_type, values in by_type.items():
                host_entry.setdefault(record_type, []).extend(values)
    return merged
def _unique_clean(values: List[str]) -> List[str]:
    """Strip whitespace and trailing dots, dropping duplicates but keeping order."""
    return list(dict.fromkeys(v.strip().rstrip('.') for v in values))


def diff_records(cf: Dict[str, Dict[str, List[str]]], ad: Dict[str, Dict[str, List[str]]]):
    """Work out which changes bring the AD records in line with Cloudflare.

    Args:
        cf: Cloudflare records as ``{hostname: {type: [values]}}``.
        ad: Active Directory records in the same shape.

    Returns:
        A tuple ``(to_add, to_update, to_replace_type)``:

        * ``to_add``: ``(hostname, type, value)`` for values to create.
        * ``to_update``: ``(hostname, type, [old_value], new_value)``; each
          entry pairs one AD value that Cloudflare no longer has with one
          Cloudflare value that AD lacks. AD values that already match
          Cloudflare never appear here.
        * ``to_replace_type``: ``(hostname, old_type, old_values, new_type,
          new_values)``; the old values are to be deleted and the new ones
          added. When several AD types have to go, only the first entry
          carries the new values (the others have an empty list) so each new
          value is added once.

    AD values of the same type that Cloudflare does not have and that could
    not be paired with a missing value are left untouched.
    """
    to_add = []
    to_update = []
    to_replace_type = []
    for hostname, cf_types in cf.items():
        ad_types = ad.get(hostname, {})
        for cf_type, cf_raw in cf_types.items():
            cf_values = _unique_clean(cf_raw)
            if cf_type in ad_types:
                ad_values = _unique_clean(ad_types[cf_type])
                missing = [v for v in cf_values if v not in ad_values]
                stale = [v for v in ad_values if v not in cf_values]
                # Rewrite stale values one-to-one; anything still missing
                # afterwards has no stale counterpart and must be added.
                for old_value, new_value in zip(stale, missing):
                    to_update.append((hostname, cf_type, [old_value], new_value))
                for new_value in missing[len(stale):]:
                    to_add.append((hostname, cf_type, new_value))
                continue
            # A different type exists in AD and has to be replaced. The NS
            # records of the zone apex are kept: removing them would break
            # the zone itself.
            # NOTE(review): a Cloudflare CNAME at the apex still schedules the
            # apex A records for replacement - confirm that is intended.
            replaceable = [t for t in ad_types if not (hostname == "@" and t == "NS")]
            if not replaceable:
                for new_value in cf_values:
                    to_add.append((hostname, cf_type, new_value))
                continue
            for index, ad_type in enumerate(replaceable):
                old_values = _unique_clean(ad_types[ad_type])
                new_values = cf_values if index == 0 else []
                to_replace_type.append((hostname, ad_type, old_values, cf_type, new_values))
    return to_add, to_update, to_replace_type
def send_slack_summary(to_add, to_update, to_replace_type):
    """Post the number of planned changes to the Slack webhook.

    The notification is best-effort: any failure to deliver it (network
    error, timeout or an HTTP error status returned by Slack) is printed and
    otherwise ignored.

    Args:
        to_add: Entries scheduled for creation.
        to_update: Entries scheduled for an in-place value update.
        to_replace_type: Entries scheduled for a record-type replacement.
    """
    total_add = len(to_add)
    total_update = len(to_update)
    total_replace = len(to_replace_type)
    total = total_add + total_update + total_replace
    summary = (
        "*DNS Sync Summary:*\n"
        f"• ➕ Add: `{total_add}`\n"
        f"• 🔁 Update: `{total_update}`\n"
        f"• 🔄 Replace Type: `{total_replace}`\n"
        f"• 📊 Total Changes: `{total}`"
    )
    payload = {
        "text": summary
    }
    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            timeout=10,  # do not let a stalled webhook block the script forever
        )
        # Without this an error response from Slack would pass unnoticed.
        response.raise_for_status()
    except requests.RequestException as e:
        print("Erro ao enviar mensagem pro Slack:", e)
def _run_samba_dns(action: str, *record_args: str) -> bool:
    """Run ``samba-tool dns <action>`` against the AD server and report success.

    The command output is not captured, so samba-tool's own messages still
    reach the terminal. A non-zero exit status is printed (without the
    credentials) instead of being silently ignored.

    NOTE: the password is passed on the command line, as in every other
    samba-tool call in this file, and is therefore visible in the process list
    while the command runs.
    """
    cmd = [
        "samba-tool", "dns", action, "-U", f"{AD_USER}%{AD_PASSWORD}",
        IP_AD, DOMAIN, *record_args, "-d", "0",
    ]
    result = subprocess.run(cmd)
    if result.returncode != 0:
        print(
            f"Falha ao executar samba-tool dns {action} para "
            f"{' '.join(record_args)} (código de saída {result.returncode})"
        )
        return False
    return True


def main():
    """Synchronize Cloudflare A/CNAME records into the Active Directory zone.

    Steps: fetch the Cloudflare records, query AD for the same names, compute
    the differences, apply them with samba-tool and send a Slack summary of
    the planned changes.
    """
    print("Coletando registros do Cloudflare...")
    cf_records = fetch_cf_records()
    print("Coletando registros do Active Directory...")
    ad_records = fetch_ad_records(list(cf_records.keys()))
    print("Comparando registros...")
    to_add, to_update, to_replace_type = diff_records(cf_records, ad_records)
    print("Executando comandos no Active Directory...")
    failures = 0
    for h, t, v in to_add:
        if not _run_samba_dns("add", h, t, v):
            failures += 1
    for h, t, current_vals, correct_val in to_update:
        for current_val in current_vals:
            if not _run_samba_dns("update", h, t, current_val, correct_val):
                failures += 1
    for h, old_t, old_vals, new_t, new_vals in to_replace_type:
        # Delete the records of the old type first, then create the new ones.
        for old_val in old_vals:
            if not _run_samba_dns("delete", h, old_t, old_val):
                failures += 1
        for new_val in new_vals:
            if not _run_samba_dns("add", h, new_t, new_val):
                failures += 1
    if failures:
        print(f"{failures} comando(s) samba-tool falharam.")
    send_slack_summary(to_add, to_update, to_replace_type)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment