Last active
July 27, 2022 13:23
-
-
Save botlabsDev/60b9c86c13566cc69aed0c6041d71322 to your computer and use it in GitHub Desktop.
Historical IP to ASN Mapper - find the ASN of an IP to a given timestamp. Could be faster.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Twitter: @botlabsDev | |
# $ python3 ip_to_asn_history.py 8.8.8.8 --day 2019-01-01 | |
# alternative online tool: https://stat.ripe.net/widget/routing-history | |
import argparse | |
import datetime | |
import ipaddress | |
import tarfile | |
from dataclasses import dataclass | |
from pathlib import Path | |
from typing import Iterator | |
import dateparser as dateparser | |
import requests | |
CACHE_FOLDER = Path(".cache") | |
@dataclass | |
class AsnStruct: | |
asn: int | |
description: str | |
country: str | |
@dataclass | |
class NetStruct: | |
asn: int | |
subnet_str: str | |
@property | |
def subnet(self): | |
return ipaddress.ip_network(self.subnet_str) | |
@property | |
def subnet_size(self): | |
return ipaddress.ip_network(self.subnet).num_addresses | |
@property | |
def subnet_start(self): | |
return ipaddress.ip_network(self.subnet).network_address | |
@property | |
def subnet_end(self): | |
return ipaddress.ip_network(self.subnet).broadcast_address | |
def contains_subnet(self, sub) -> bool: | |
return self.subnet_start <= sub.subnet_start < sub.subnet_end <= self.subnet_end | |
def contains_ip(self, ip) -> bool: | |
return self.subnet_start <= ip <= self.subnet_end | |
def ip_to_asn_history(day, ip): | |
data_raw_table, data_used_autnums = download_archive_for_day(day, CACHE_FOLDER / str(day)) | |
networks = [_convert_to_net_struct(line) for line in data_raw_table.read_text().splitlines()] | |
tmpp_autnums = [_convert_to_asn_struct(line) for line in data_used_autnums.read_text().splitlines()] | |
autnums = {autnum.asn: autnum for autnum in tmpp_autnums} | |
networks_found = [] | |
for network in networks: | |
if network.contains_ip(ip): | |
networks_found.append(network) | |
yield {"day": day, | |
"network_asn": network.asn, | |
"network.subnet_start": network.subnet_start, | |
"network.subnet_end": network.subnet_end, | |
"autnums.asn": autnums[network.asn].asn, | |
"autnums.description": autnums[network.asn].description, | |
"autnums.country": autnums[network.asn].country} | |
def parse_args(): | |
def _dateparser_parse(value): | |
return dateparser.parse(value, settings={'TIMEZONE': 'UTC'}) | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--day", type=_dateparser_parse, default=str(datetime.date.today())) | |
parser.add_argument("ip", type=ipaddress.ip_address) | |
return parser.parse_args() | |
def download_archive_for_day(date: datetime.date, folder: Path): | |
url = f"http://thyme.apnic.net/ipv4/au/{date.year}/{date.month:02}/{date.day:02}/data.tar.bz2" | |
archive = folder / "data.tar.bz2" | |
data_raw_table = folder / "data-raw-table" | |
data_used_autnums = folder / "data-used-autnums" | |
done_file = folder / "done.txt" | |
if not done_file.exists(): | |
folder.mkdir(exist_ok=True, parents=True) | |
print("download file") | |
results = requests.get(url, stream=True) | |
with archive.open("wb") as f: | |
f.writelines(results.iter_content(chunk_size=1024)) | |
results.raise_for_status() | |
data_raw_table.write_text("\n".join(line for line in _read_file_from_archive(archive, "data-raw-table"))) | |
data_used_autnums.write_text("\n".join(line for line in _read_file_from_archive(archive, "data-used-autnums"))) | |
archive.unlink() | |
done_file.touch() | |
return data_raw_table, data_used_autnums | |
def _load_net_data(archive: Path): | |
for line in _read_file_from_archive(archive, "data-raw-table"): | |
yield _convert_to_net_struct(line) | |
def _load_used_asns_for_day(archive: Path): | |
for line in _read_file_from_archive(archive, "data-used-autnums"): | |
yield _convert_to_asn_struct(line) | |
def _read_file_from_archive(archive: Path, fileName) -> Iterator[str]: | |
with tarfile.open(str(archive), "r:bz2") as tar: | |
fd = tar.extractfile(fileName) | |
lines = (line.strip().lstrip() for line in fd.read().decode(encoding="ISO-8859-1").splitlines()) | |
yield from (line for line in lines if line != "") | |
def _convert_to_net_struct(line: str): | |
net, asn = line.split("\t") | |
return NetStruct(subnet_str=net, asn=int(asn)) | |
def _convert_to_asn_struct(line): | |
tmp = line.split() | |
asn = tmp[0] | |
name = " ".join(tmp[1:-1]) | |
name = name[:-1] if name.endswith(",") else name | |
country = tmp[-1] | |
return AsnStruct(int(asn), name, country) | |
if __name__ == "__main__": | |
opts = parse_args() | |
for data in ip_to_asn_history(opts.day.date(), opts.ip): | |
# print(data) | |
print(" ".join([f"{value}" for key, value in data.items()])) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment