Last active
August 3, 2018 10:52
-
-
Save benmaddison/f4ff3f203aaa4ae1020d72961c8abb88 to your computer and use it in GitHub Desktop.
afrinic irr mirroring check
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import re | |
import sys | |
import urllib.parse | |
RED = "\x1b[31m" | |
GREEN = "\x1b[32m" | |
YELLOW = "\x1b[33m" | |
CLEAR = "\x1b[0m" | |
class IRRException(Exception): | |
pass | |
async def databases(): | |
irr_list_url = "http://www.irr.net/docs/list.html" | |
url = urllib.parse.urlsplit(irr_list_url) | |
reader, writer = await asyncio.open_connection(url.hostname, port=80) | |
req = f"GET {url.path} HTTP/1.0\r\nHost: {url.hostname}\r\n\r\n" | |
writer.write(req.encode()) | |
await writer.drain() | |
databases = [] | |
while True: | |
line = await reader.readline() | |
if line: | |
line = line.decode().rstrip() | |
match = re.match(r"^Whois Location:\s+([\w\.\-]+)$", line) | |
if match: | |
yield match.group(1) | |
else: | |
break | |
writer.close() | |
return | |
async def sources(hostname): | |
reader, writer = await connect(hostname) | |
try: | |
query = "!s-lc\n" | |
writer.write(query.encode()) | |
await writer.drain() | |
resp = await reader.readline() | |
length = parse_resp_irrd(resp.decode()) | |
if length: | |
data = await reader.readuntil(b"\nC\n") | |
for src in data.decode().rstrip("C\n").split(","): | |
yield src.lower() | |
except IRRException as e: | |
reader, writer = await connect(hostname) | |
query = "-q sources\n" | |
writer.write(query.encode()) | |
await writer.drain() | |
while True: | |
line = await reader.readline() | |
if line: | |
line = line.decode() | |
match = re.match(r"^([\w\-]+):.+$", line) | |
if match: | |
yield match.group(1).lower() | |
else: | |
break | |
writer.close() | |
finally: | |
writer.close() | |
return | |
async def connect(hostname): | |
connect = asyncio.open_connection(hostname, port=43) | |
reader, writer = await asyncio.wait_for(connect, timeout=3) | |
return (reader, writer) | |
def parse_resp_irrd(resp): | |
resp_re = re.compile(r"^([ACDEF])(\d+)?$") | |
match = resp_re.match(resp) | |
if match: | |
code = match.group(1) | |
if code == "A": | |
return match.group(2) | |
elif code == "C": | |
return | |
elif code == "D": | |
raise IRRException("key not found") | |
elif code == "E": | |
raise IRRException("key not unique") | |
elif code == "F": | |
raise IRRException("other error") | |
raise IRRException("no status code found") | |
async def print_databases(): | |
async for db in databases(): | |
result = f"{YELLOW}[AFRINIC not found in source listing]{CLEAR}" | |
try: | |
async for source in sources(db): | |
match = re.match(r"^afrinic(-grs)?$", source) | |
if match: | |
result = f"{GREEN}[AFRINIC source listed]{CLEAR}" | |
break | |
except asyncio.TimeoutError: | |
result = f"{RED}[Connection timed out]{CLEAR}" | |
except Exception as e: | |
result = f"{RED}[{e}]{CLEAR}" | |
print(f"{db}: {result}") | |
return | |
def main(): | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(print_databases()) | |
return | |
if __name__ == "__main__": | |
rc = main() | |
sys.exit(rc) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment