Created
March 27, 2025 12:06
-
-
Save renatoruis/0830853f7315cf3f0112b71d2034b0c9 to your computer and use it in GitHub Desktop.
sync-dns-cloudflate-to-AD
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import subprocess | |
| import requests | |
| import json | |
| from typing import List, Dict | |
| # Config Active Directory | |
| IP_AD = "192.168.X.X" # IP do Active Directory | |
| AD_USER = "domain\\user" # Usuário do Active Directory | |
| AD_PASSWORD = "password" # Senha do Active Directory | |
| DOMAIN = "domain.com.br" # Domínio do Active Directory | |
| # Config Cloudflare | |
| CF_API_KEY = "API_KEY" # API Key do Cloudflare | |
| CF_ZONE_ID = "ZONE_ID" # ID da zona do Cloudflare | |
| CF_API_URL = f"https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/dns_records" | |
| # Slack webhook URL | |
| SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/TMABCDEFG/X1234567890/CBASTIMDEVOPS" | |
| # Funções auxiliares | |
| def normalize_hostname(hostname: str) -> str: | |
| hostname = hostname.lower().rstrip('.') | |
| if hostname.endswith(DOMAIN): | |
| hostname = hostname.replace(f".{DOMAIN}", "") | |
| return hostname if hostname else "@" | |
| def is_internal_or_special(hostname: str) -> bool: | |
| return hostname.startswith("_") or hostname.startswith("internal-") | |
| def parse_ad_record_output(output: str, hostname: str) -> Dict[str, Dict[str, List[str]]]: | |
| records = {} | |
| for line in output.splitlines(): | |
| if "A: " in line: | |
| ip = line.split("A: ")[1].split()[0].strip().rstrip('.') | |
| records.setdefault(hostname, {}).setdefault("A", []).append(ip) | |
| elif "CNAME: " in line: | |
| cname = line.split("CNAME: ")[1].split()[0].strip().rstrip('.') | |
| records.setdefault(hostname, {}).setdefault("CNAME", []).append(cname) | |
| elif "NS: " in line: | |
| ns = line.split("NS: ")[1].split()[0].strip().rstrip('.') | |
| records.setdefault(hostname, {}).setdefault("NS", []).append(ns) | |
| return records | |
| def fetch_cf_records() -> Dict[str, Dict[str, List[str]]]: | |
| headers = { | |
| "Authorization": f"Bearer {CF_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| params = {"per_page": 500} | |
| r = requests.get(CF_API_URL, headers=headers, params=params) | |
| cf_data = r.json() | |
| records = {} | |
| for rec in cf_data["result"]: | |
| if rec["type"] not in ["A", "CNAME"]: | |
| continue | |
| name = normalize_hostname(rec["name"]) | |
| if is_internal_or_special(name): | |
| continue | |
| content = rec["content"].strip().rstrip('.') | |
| records.setdefault(name, {}).setdefault(rec["type"], []).append(content) | |
| return records | |
| def fetch_ad_records(hostnames: List[str]) -> Dict[str, Dict[str, List[str]]]: | |
| all_records = {} | |
| for hostname in hostnames: | |
| if is_internal_or_special(hostname): | |
| continue | |
| raw_hostname = hostname if hostname != "@" else "@" | |
| cmd = [ | |
| "samba-tool", "dns", "query", | |
| IP_AD, DOMAIN, raw_hostname, "ALL", | |
| "-U", f"{AD_USER}%{AD_PASSWORD}" | |
| ] | |
| result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| parsed = parse_ad_record_output(result.stdout, hostname) | |
| for h, types in parsed.items(): | |
| for t, values in types.items(): | |
| all_records.setdefault(h, {}).setdefault(t, []).extend(values) | |
| return all_records | |
| def diff_records(cf: Dict[str, Dict[str, List[str]]], ad: Dict[str, Dict[str, List[str]]]): | |
| to_add = [] | |
| to_update = [] | |
| to_replace_type = [] | |
| for hostname in cf: | |
| for cf_type in cf[hostname]: | |
| cf_values = [v.strip().rstrip('.') for v in cf[hostname][cf_type]] | |
| ad_types = ad.get(hostname, {}) | |
| if cf_type in ad_types: | |
| ad_values = [v.strip().rstrip('.') for v in ad_types.get(cf_type, [])] | |
| for val in cf_values: | |
| if val not in ad_values: | |
| to_update.append((hostname, cf_type, ad_values, val)) | |
| elif ad_types: | |
| # Tipo diferente existe, precisa substituir | |
| for ad_type in ad_types: | |
| ad_values = [v.strip().rstrip('.') for v in ad_types[ad_type]] | |
| to_replace_type.append((hostname, ad_type, ad_values, cf_type, cf_values)) | |
| else: | |
| for val in cf_values: | |
| to_add.append((hostname, cf_type, val)) | |
| return to_add, to_update, to_replace_type | |
| def send_slack_summary(to_add, to_update, to_replace_type): | |
| total_add = len(to_add) | |
| total_update = len(to_update) | |
| total_replace = len(to_replace_type) | |
| total = total_add + total_update + total_replace | |
| summary = ( | |
| "*DNS Sync Summary:*\n" | |
| f"• ➕ Add: `{total_add}`\n" | |
| f"• 🔁 Update: `{total_update}`\n" | |
| f"• 🔄 Replace Type: `{total_replace}`\n" | |
| f"• 📊 Total Changes: `{total}`" | |
| ) | |
| payload = { | |
| "text": summary | |
| } | |
| try: | |
| requests.post(SLACK_WEBHOOK_URL, data=json.dumps(payload), headers={"Content-Type": "application/json"}) | |
| except Exception as e: | |
| print("Erro ao enviar mensagem pro Slack:", e) | |
| def main(): | |
| print("Coletando registros do Cloudflare...") | |
| cf_records = fetch_cf_records() | |
| print("Coletando registros do Active Directory...") | |
| ad_records = fetch_ad_records(list(cf_records.keys())) | |
| print("Comparando registros...") | |
| to_add, to_update, to_replace_type = diff_records(cf_records, ad_records) | |
| print("Executando comandos no Active Directory...") | |
| for h, t, v in to_add: | |
| target = "@" if h == "@" else h | |
| subprocess.run([ | |
| "samba-tool", "dns", "add", "-U", f"{AD_USER}%{AD_PASSWORD}", IP_AD, DOMAIN, target, t, v, "-d", "0" | |
| ]) | |
| for h, t, current_vals, correct_val in to_update: | |
| target = "@" if h == "@" else h | |
| for current_val in current_vals: | |
| subprocess.run([ | |
| "samba-tool", "dns", "update", "-U", f"{AD_USER}%{AD_PASSWORD}", IP_AD, DOMAIN, target, t, current_val, correct_val, "-d", "0" | |
| ]) | |
| for h, old_t, old_vals, new_t, new_vals in to_replace_type: | |
| target = "@" if h == "@" else h | |
| for old_val in old_vals: | |
| subprocess.run([ | |
| "samba-tool", "dns", "delete", "-U", f"{AD_USER}%{AD_PASSWORD}", IP_AD, DOMAIN, target, old_t, old_val, "-d", "0" | |
| ]) | |
| for new_val in new_vals: | |
| subprocess.run([ | |
| "samba-tool", "dns", "add", "-U", f"{AD_USER}%{AD_PASSWORD}", IP_AD, DOMAIN, target, new_t, new_val, "-d", "0" | |
| ]) | |
| send_slack_summary(to_add, to_update, to_replace_type) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment