Created
May 24, 2019 10:37
-
-
Save KMConner/e057c1889570ee4b1071fd707ce346a7 to your computer and use it in GitHub Desktop.
Determine whether the specified IP address (both v4 and v6) is allocated or assigned to Japan
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from urllib import request | |
from http.client import HTTPResponse | |
from typing import Iterable, Tuple, List | |
from ipaddress import IPv4Address, IPv6Address | |
import ipaddress | |
ALLOCATION_URL = 'https://ftp.apnic.net/stats/apnic/delegated-apnic-latest' | |
COUNTRY_CODE = 'JP' | |
class AddressRange: | |
def __init__(self, start: str, length: str): | |
address = ipaddress.ip_address(start) | |
range_length = int(length) | |
v6_address: IPv6Address | |
if address.version == 4: | |
v6_address = IPv6Address('::ffff:' + start) | |
elif address.version == 6: | |
v6_address = address | |
range_length = 2 ** (128 - int(length)) | |
else: | |
raise Exception('Invalid Version of IP Address.') | |
packed = v6_address.packed | |
self.start = int.from_bytes(packed, byteorder='big', signed=False) | |
self.end = self.start + range_length | |
def contains(self, addr: int) -> bool: | |
return self.start <= addr < self.end | |
def __str__(self): | |
return 'Address range begin: {}, end {}'.format(self.start, self.end) | |
def devide_records(lines: Iterable[str], country_code: str) -> List[AddressRange]: | |
ranges: List[AddressRange] = [] | |
l: str | |
for l in lines: | |
if l[0] == '#': | |
continue | |
parts = l.split('|') | |
if len(parts) != 7: | |
continue | |
if parts[1] != country_code: | |
continue | |
if parts[2] == 'ipv4' or parts[2] == 'ipv6': | |
ranges.append(AddressRange(parts[3], parts[4])) | |
return ranges | |
class AddressRangeCollection: | |
def __init__(self, ranges: List[AddressRange]): | |
v4_sorted = sorted(ranges, key=lambda v: v.start) | |
self.ranges: List[AddressRange] = [] | |
i: AddressRange | |
for i in v4_sorted: | |
if len(self.ranges) == 0: | |
self.ranges.append(i) | |
continue | |
last_record: AddressRange = self.ranges[-1] | |
if last_record.contains(i.start): | |
last_record.end = max(last_record.end, i.end) | |
print('Duplicate', last_record, i) | |
else: | |
self.ranges.append(i) | |
def _contains(self, addr: int, search_begin: int, search_end: int) -> bool: | |
if search_begin >= search_end: | |
return False | |
mean = (search_begin+search_end) // 2 | |
mean_range: AddressRange = self.ranges[mean] | |
if mean_range.contains(addr): | |
return True | |
if mean_range.start > addr: | |
return self._contains(addr, search_begin, mean) | |
else: | |
return self._contains(addr, mean + 1, search_end) | |
def contains(self, addr: str) -> bool: | |
address = ipaddress.ip_address(addr) | |
v6_address: IPv6Address | |
if address.version == 4: | |
v6_address = IPv6Address('::ffff:' + addr) | |
elif address.version == 6: | |
v6_address = address | |
packed = v6_address.packed | |
addr_int = int.from_bytes(packed, byteorder='big', signed=False) | |
return self._contains(addr_int, 0, len(self.ranges)) | |
def main(): | |
response: str | |
req: HTTPResponse | |
with request.urlopen(ALLOCATION_URL) as req: | |
response = req.read().decode('utf-8') | |
lines = response.splitlines() | |
records: List[AddressRange] = devide_records(lines, COUNTRY_CODE) | |
collection = AddressRangeCollection(records) | |
print(collection.contains('2001:240:1e00:f21::138')) # True | |
print(collection.contains('2001:df2:e500:ed1a::1')) # False | |
print(collection.contains('202.214.194.138')) # True | |
print(collection.contains('8.8.8.8')) # False | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment