Skip to content

Instantly share code, notes, and snippets.

@benmaddison
Last active August 3, 2018 10:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save benmaddison/f4ff3f203aaa4ae1020d72961c8abb88 to your computer and use it in GitHub Desktop.
Save benmaddison/f4ff3f203aaa4ae1020d72961c8abb88 to your computer and use it in GitHub Desktop.
afrinic irr mirroring check
#!/usr/bin/env python3
import asyncio
import re
import sys
import urllib.parse
RED = "\x1b[31m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
CLEAR = "\x1b[0m"
class IRRException(Exception):
pass
async def databases():
irr_list_url = "http://www.irr.net/docs/list.html"
url = urllib.parse.urlsplit(irr_list_url)
reader, writer = await asyncio.open_connection(url.hostname, port=80)
req = f"GET {url.path} HTTP/1.0\r\nHost: {url.hostname}\r\n\r\n"
writer.write(req.encode())
await writer.drain()
databases = []
while True:
line = await reader.readline()
if line:
line = line.decode().rstrip()
match = re.match(r"^Whois Location:\s+([\w\.\-]+)$", line)
if match:
yield match.group(1)
else:
break
writer.close()
return
async def sources(hostname):
reader, writer = await connect(hostname)
try:
query = "!s-lc\n"
writer.write(query.encode())
await writer.drain()
resp = await reader.readline()
length = parse_resp_irrd(resp.decode())
if length:
data = await reader.readuntil(b"\nC\n")
for src in data.decode().rstrip("C\n").split(","):
yield src.lower()
except IRRException as e:
reader, writer = await connect(hostname)
query = "-q sources\n"
writer.write(query.encode())
await writer.drain()
while True:
line = await reader.readline()
if line:
line = line.decode()
match = re.match(r"^([\w\-]+):.+$", line)
if match:
yield match.group(1).lower()
else:
break
writer.close()
finally:
writer.close()
return
async def connect(hostname):
connect = asyncio.open_connection(hostname, port=43)
reader, writer = await asyncio.wait_for(connect, timeout=3)
return (reader, writer)
def parse_resp_irrd(resp):
resp_re = re.compile(r"^([ACDEF])(\d+)?$")
match = resp_re.match(resp)
if match:
code = match.group(1)
if code == "A":
return match.group(2)
elif code == "C":
return
elif code == "D":
raise IRRException("key not found")
elif code == "E":
raise IRRException("key not unique")
elif code == "F":
raise IRRException("other error")
raise IRRException("no status code found")
async def print_databases():
async for db in databases():
result = f"{YELLOW}[AFRINIC not found in source listing]{CLEAR}"
try:
async for source in sources(db):
match = re.match(r"^afrinic(-grs)?$", source)
if match:
result = f"{GREEN}[AFRINIC source listed]{CLEAR}"
break
except asyncio.TimeoutError:
result = f"{RED}[Connection timed out]{CLEAR}"
except Exception as e:
result = f"{RED}[{e}]{CLEAR}"
print(f"{db}: {result}")
return
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(print_databases())
return
if __name__ == "__main__":
rc = main()
sys.exit(rc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment