Skip to content

Instantly share code, notes, and snippets.

@KMConner
Created May 24, 2019 10:37
Show Gist options
  • Save KMConner/e057c1889570ee4b1071fd707ce346a7 to your computer and use it in GitHub Desktop.
Save KMConner/e057c1889570ee4b1071fd707ce346a7 to your computer and use it in GitHub Desktop.
Determine whether the specified IP address (both v4 and v6) is allocated or assigned to Japan
import copy
import ipaddress
from http.client import HTTPResponse
from ipaddress import IPv4Address, IPv6Address
from typing import Iterable, Tuple, List
from urllib import request
# URL of the delegation statistics file that main() downloads and parses.
ALLOCATION_URL = 'https://ftp.apnic.net/stats/apnic/delegated-apnic-latest'
# Country code that a record's second '|'-separated field must equal to be kept.
COUNTRY_CODE = 'JP'
class AddressRange:
    """Half-open interval ``[start, end)`` of addresses held as plain integers.

    Every address is expressed in IPv6 integer space.  IPv4 input is first
    rewritten to its IPv4-mapped form (``::ffff:a.b.c.d``) so that both
    address families can be compared on a single number line.
    """

    def __init__(self, start: str, length: str):
        """Create the range beginning at ``start``.

        :param start: textual IPv4 or IPv6 address of the first address.
        :param length: for IPv4, the number of addresses in the range;
            for IPv6, a prefix length (the range spans ``2 ** (128 - length)``
            addresses).
        :raises ValueError: if ``start`` is not a valid address or ``length``
            is not an integer literal.
        """
        first = ipaddress.ip_address(start)
        size = int(length)
        if first.version == 6:
            mapped: IPv6Address = first
            # For IPv6 the second argument is a prefix length, not a count.
            size = 2 ** (128 - size)
        elif first.version == 4:
            mapped = IPv6Address('::ffff:' + start)
        else:
            raise Exception('Invalid Version of IP Address.')
        self.start = int(mapped)
        self.end = self.start + size

    def contains(self, addr: int) -> bool:
        """Return True when ``addr`` lies in ``[start, end)``."""
        return addr >= self.start and addr < self.end

    def __str__(self):
        """Return a readable description showing both integer bounds."""
        return f'Address range begin: {self.start}, end {self.end}'
def devide_records(lines: Iterable[str], country_code: str) -> 'List[AddressRange]':
    """Collect the IPv4/IPv6 ranges that belong to ``country_code``.

    Each line is split on ``'|'``.  A line is kept only if it has exactly
    seven fields, its second field equals ``country_code`` and its third
    field is ``'ipv4'`` or ``'ipv6'``; the fourth and fifth fields are then
    passed to ``AddressRange`` as start address and length.  Blank lines,
    lines starting with ``'#'`` and every other line are skipped.

    :param lines: lines of a delegation statistics file.
    :param country_code: value the second field must match, e.g. ``'JP'``.
    :return: one ``AddressRange`` per matching line, in input order.
    """
    ranges: List[AddressRange] = []
    for line in lines:
        # startswith() is safe on an empty string, whereas indexing line[0]
        # would raise IndexError for a blank line.
        if not line or line.startswith('#'):
            continue
        parts = line.split('|')
        if len(parts) != 7:
            continue
        if parts[1] != country_code:
            continue
        if parts[2] in ('ipv4', 'ipv6'):
            ranges.append(AddressRange(parts[3], parts[4]))
    return ranges
class AddressRangeCollection:
    """Set of address ranges that answers "is this address covered?".

    ``self.ranges`` holds the ranges sorted by ``start`` with overlapping
    ranges merged, which lets lookups use binary search.  Ranges that merely
    touch (one ends exactly where the next starts) are kept as separate
    entries.
    """

    def __init__(self, ranges: 'List[AddressRange]'):
        """Build the collection from ``ranges``.

        The input list and the objects inside it are never modified: the
        collection stores shallow copies, so widening a range during a merge
        cannot leak back into the caller's data.

        :param ranges: ranges in any order; overlaps are allowed.
        """
        self.ranges: List[AddressRange] = []
        for current in sorted(ranges, key=lambda r: r.start):
            if self.ranges and self.ranges[-1].contains(current.start):
                # Overlap with the previous (already copied) range: widen it.
                last = self.ranges[-1]
                last.end = max(last.end, current.end)
                print('Duplicate', last, current)
            else:
                self.ranges.append(copy.copy(current))

    def _contains(self, addr: int, search_begin: int, search_end: int) -> bool:
        """Binary-search ``self.ranges[search_begin:search_end]`` for ``addr``.

        :param addr: address as an integer in IPv6 space.
        :param search_begin: first index to consider (inclusive).
        :param search_end: last index to consider (exclusive).
        :return: True if one of the ranges in the window contains ``addr``.
        """
        low, high = search_begin, search_end
        while low < high:
            middle = (low + high) // 2
            candidate = self.ranges[middle]
            if candidate.contains(addr):
                return True
            if candidate.start > addr:
                high = middle
            else:
                low = middle + 1
        return False

    def contains(self, addr: str) -> bool:
        """Return True if the textual address ``addr`` is inside any range.

        IPv4 addresses are converted to their IPv4-mapped IPv6 form
        (``::ffff:a.b.c.d``) before the lookup.

        :param addr: IPv4 or IPv6 address as text.
        :raises ValueError: if ``addr`` is not a valid IP address.
        """
        address = ipaddress.ip_address(addr)
        if address.version == 4:
            address = IPv6Address('::ffff:' + addr)
        return self._contains(int(address), 0, len(self.ranges))
def main():
    """Fetch the delegation list and print membership results for sample addresses.

    Downloads ``ALLOCATION_URL``, keeps the records matching ``COUNTRY_CODE``
    and prints one boolean per sample address.
    """
    http_response: HTTPResponse
    with request.urlopen(ALLOCATION_URL) as http_response:
        body = http_response.read().decode('utf-8')
    matching = devide_records(body.splitlines(), COUNTRY_CODE)
    collection = AddressRangeCollection(matching)
    # Trailing comments are the results the original author expected.
    samples = (
        '2001:240:1e00:f21::138',  # True
        '2001:df2:e500:ed1a::1',  # False
        '202.214.194.138',  # True
        '8.8.8.8',  # False
    )
    for sample in samples:
        print(collection.contains(sample))


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment