Skip to content

Instantly share code, notes, and snippets.

@botlabsDev
Last active July 27, 2022 13:23
Show Gist options
  • Save botlabsDev/60b9c86c13566cc69aed0c6041d71322 to your computer and use it in GitHub Desktop.
Save botlabsDev/60b9c86c13566cc69aed0c6041d71322 to your computer and use it in GitHub Desktop.
Historical IP-to-ASN mapper: find the ASN of an IP address on a given day. Could be faster.
# Twitter: @botlabsDev
# $ python3 ip_to_asn_history.py 8.8.8.8 --day 2019-01-01
# alternative online tool: https://stat.ripe.net/widget/routing-history
import argparse
import datetime
import ipaddress
import tarfile
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
import dateparser as dateparser
import requests
# Root directory for downloaded data; ip_to_asn_history() uses one sub-folder per day beneath it.
CACHE_FOLDER = Path(".cache")
@dataclass
class AsnStruct:
    """Record describing one autonomous system.

    Instances are built by ``_convert_to_asn_struct`` from a single line of
    the ``data-used-autnums`` file.
    """

    # Autonomous system number (first token of the line).
    asn: int
    # Free-text name: the tokens between the number and the last token.
    description: str
    # Last whitespace-separated token of the line (presumably a country code).
    country: str
@dataclass
class NetStruct:
    """A network prefix together with the ASN recorded for it.

    Instances are built by ``_convert_to_net_struct`` from one line of the
    ``data-raw-table`` file. The prefix is kept as text and parsed on demand
    with ``ipaddress.ip_network``, so every property access may raise
    ``ValueError`` if ``subnet_str`` is not a valid network.
    """

    # ASN taken from the second column of the line.
    asn: int
    # Prefix in textual form, e.g. "10.0.0.0/8".
    subnet_str: str

    @property
    def subnet(self):
        """Return ``subnet_str`` parsed into an ``ipaddress`` network object."""
        return ipaddress.ip_network(self.subnet_str)

    @property
    def subnet_size(self):
        """Return the number of addresses in the subnet."""
        return self.subnet.num_addresses

    @property
    def subnet_start(self):
        """Return the first (network) address of the subnet."""
        return self.subnet.network_address

    @property
    def subnet_end(self):
        """Return the last (broadcast) address of the subnet."""
        return self.subnet.broadcast_address

    def contains_subnet(self, sub) -> bool:
        """Return True if ``sub`` lies completely inside this subnet.

        ``sub`` only needs ``subnet_start`` and ``subnet_end`` attributes.
        A single-address subnet (start == end) is handled correctly; the
        previous strict ``start < end`` comparison rejected it.
        """
        own = self.subnet  # parse our own prefix only once
        return (own.network_address <= sub.subnet_start
                and sub.subnet_end <= own.broadcast_address)

    def contains_ip(self, ip) -> bool:
        """Return True if address ``ip`` falls within this subnet (inclusive).

        Raises:
            TypeError: if ``ip`` and the subnet use different IP versions
                (comparison semantics of the ``ipaddress`` module).
        """
        own = self.subnet  # parse once instead of once per bound
        return own.network_address <= ip <= own.broadcast_address
def ip_to_asn_history(day, ip):
    """Yield one record for every network in the day's table that contains ``ip``.

    The data files for ``day`` are fetched (or reused from the cache folder)
    via ``download_archive_for_day``. Networks are parsed and tested one at a
    time, so matches are yielded without first building a list of every
    network in the table.

    Args:
        day: date whose data should be searched; also names the cache sub-folder.
        ip: address object compared against each network's start/end address.

    Yields:
        dict with the day, the network's ASN and address range, and the
        ASN number, description and country from the autnum table.

    Raises:
        KeyError: if a matching network's ASN has no entry in the autnum table.
    """
    data_raw_table, data_used_autnums = download_archive_for_day(day, CACHE_FOLDER / str(day))

    # Index the autnum entries by ASN for constant-time lookup below.
    autnums = {}
    for line in data_used_autnums.read_text().splitlines():
        autnum = _convert_to_asn_struct(line)
        autnums[autnum.asn] = autnum

    for line in data_raw_table.read_text().splitlines():
        network = _convert_to_net_struct(line)
        if not network.contains_ip(ip):
            continue
        autnum = autnums[network.asn]
        yield {"day": day,
               "network_asn": network.asn,
               "network.subnet_start": network.subnet_start,
               "network.subnet_end": network.subnet_end,
               "autnums.asn": autnum.asn,
               "autnums.description": autnum.description,
               "autnums.country": autnum.country}
def parse_args():
    """Parse the command line: a positional ``ip`` and an optional ``--day``.

    ``--day`` accepts any text ``dateparser`` understands and defaults to
    today's date; the string default is converted by argparse through the
    same parser. ``ip`` is converted with ``ipaddress.ip_address``.

    Returns:
        argparse.Namespace with ``day`` (a datetime) and ``ip`` (an address object).
    """

    def _dateparser_parse(value):
        parsed = dateparser.parse(value, settings={'TIMEZONE': 'UTC'})
        if parsed is None:
            # dateparser signals "could not parse" with None; without this
            # check argparse would accept it and the caller would fail later
            # on ``opts.day.date()``.
            raise argparse.ArgumentTypeError(f"could not parse date: {value!r}")
        return parsed

    parser = argparse.ArgumentParser()
    parser.add_argument("--day", type=_dateparser_parse, default=str(datetime.date.today()))
    parser.add_argument("ip", type=ipaddress.ip_address)
    return parser.parse_args()
def download_archive_for_day(date: datetime.date, folder: Path):
    """Make sure the two data files for ``date`` exist in ``folder`` and return their paths.

    If ``folder`` has no ``done.txt`` marker, the day's ``data.tar.bz2`` is
    downloaded, ``data-raw-table`` and ``data-used-autnums`` are extracted to
    plain text files, the archive is deleted and the marker is created.
    With the marker present nothing is downloaded.

    Args:
        date: day whose archive is requested; year/month/day build the URL.
        folder: cache directory for this day (created if missing).

    Returns:
        Tuple ``(data_raw_table, data_used_autnums)`` of ``Path`` objects.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    url = f"http://thyme.apnic.net/ipv4/au/{date.year}/{date.month:02}/{date.day:02}/data.tar.bz2"
    archive = folder / "data.tar.bz2"
    data_raw_table = folder / "data-raw-table"
    data_used_autnums = folder / "data-used-autnums"
    done_file = folder / "done.txt"

    if not done_file.exists():
        folder.mkdir(exist_ok=True, parents=True)
        print("download file")
        # The context manager releases the connection; the timeout keeps a
        # stalled server from hanging the script forever.
        with requests.get(url, stream=True, timeout=60) as results:
            # Check the status BEFORE writing, so an error page is never
            # stored on disk as if it were the archive.
            results.raise_for_status()
            with archive.open("wb") as f:
                f.writelines(results.iter_content(chunk_size=1024))
        data_raw_table.write_text("\n".join(_read_file_from_archive(archive, "data-raw-table")))
        data_used_autnums.write_text("\n".join(_read_file_from_archive(archive, "data-used-autnums")))
        archive.unlink()
        # Marker is written last so an interrupted run is repeated next time.
        done_file.touch()
    return data_raw_table, data_used_autnums
def _load_net_data(archive: Path):
    """Lazily yield a NetStruct for each line of ``data-raw-table`` inside ``archive``."""
    raw_lines = _read_file_from_archive(archive, "data-raw-table")
    yield from map(_convert_to_net_struct, raw_lines)
def _load_used_asns_for_day(archive: Path):
    """Lazily yield an AsnStruct for each line of ``data-used-autnums`` inside ``archive``."""
    raw_lines = _read_file_from_archive(archive, "data-used-autnums")
    yield from map(_convert_to_asn_struct, raw_lines)
def _read_file_from_archive(archive: Path, fileName) -> Iterator[str]:
    """Yield the non-empty, whitespace-stripped lines of one member of a bz2 tar archive.

    The member is read and decoded completely, and both it and the archive
    are closed before the first line is yielded, so no file handle stays
    open while the caller consumes the generator.

    Args:
        archive: path of the ``.tar.bz2`` file.
        fileName: name of the member to read.

    Yields:
        Each line of the member (decoded as ISO-8859-1) with surrounding
        whitespace removed; lines that are empty after stripping are skipped.

    Raises:
        KeyError: if the archive has no member called ``fileName``.
        ValueError: if the member exists but is not a regular file.
    """
    with tarfile.open(str(archive), "r:bz2") as tar:
        fd = tar.extractfile(fileName)
        if fd is None:
            # extractfile() returns None for members that are not regular files.
            raise ValueError(f"{fileName!r} in {archive} is not a regular file")
        with fd:
            text = fd.read().decode(encoding="ISO-8859-1")

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line:
            yield line
def _convert_to_net_struct(line: str):
    """Build a NetStruct from one ``<prefix><TAB><asn>`` line.

    Raises ValueError when the line does not consist of exactly two
    tab-separated fields or the second field is not an integer.
    """
    prefix, asn_text = line.split("\t")
    return NetStruct(asn=int(asn_text), subnet_str=prefix)
def _convert_to_asn_struct(line):
    """Build an AsnStruct from one whitespace-separated autnum line.

    The first token is the AS number, the last token the country, and
    everything in between (re-joined with single spaces, minus one trailing
    comma) is the description.
    """
    tokens = line.split()
    number = int(tokens[0])
    country = tokens[-1]
    description = " ".join(tokens[1:-1])
    if description.endswith(","):
        # Drop the comma that separates the name from the country token.
        description = description[:-1]
    return AsnStruct(number, description, country)
if __name__ == "__main__":
    # Script entry point: print every match as one space-separated line of values.
    opts = parse_args()
    for data in ip_to_asn_history(opts.day.date(), opts.ip):
        print(" ".join(f"{value}" for value in data.values()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment