Created
October 24, 2021 03:30
-
-
Save Pagliacii/a87f122b193b796d4ffd98d2a7e4c8d6 to your computer and use it in GitHub Desktop.
Query IP location based on QQWry.dat file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding:utf-8 -*- | |
import mmap | |
import struct | |
import socket | |
import pathlib | |
from typing import Tuple | |
# Ref: https://github.com/out0fmemory/qqwry.dat/blob/master/qqwry_lastest.dat | |
DEFAULT_DATA_FILE = pathlib.Path(__file__).parent / "qqwry.dat"


class QQWry:
    """Reader for a QQWry (``qqwry.dat``) IPv4 location database.

    Layout as consumed by this class:

    * bytes 0-7: two little-endian uint32 values, the byte offsets of the
      first and the last index entry;
    * index entry (7 bytes): 4-byte little-endian start IP followed by a
      3-byte little-endian offset of the matching record;
    * record: 4-byte little-endian end IP followed by two text parts, each of
      which is either an inline NUL-terminated string or a flag byte
      (``0x01`` / ``0x02``) plus a 3-byte offset pointing elsewhere.

    The instance keeps the file memory-mapped; call :meth:`close` or use the
    instance as a context manager to release the mapping.
    """

    def __init__(self, data_file=DEFAULT_DATA_FILE, charset="utf-8"):
        """Map *data_file* into memory and read the index bounds.

        Args:
            data_file: Path of the database file.
            charset: Codec used to decode the text parts of a record.
        """
        self._ip_length = 4
        self._offset_length = 3
        self._index_length = 7
        self._flag_one = b"\x01"
        self._flag_two = b"\x02"
        self._terminator = b"\x00"
        with open(data_file, "rb") as f:
            # ACCESS_READ is the portable read-only mode.  mmap.MAP_SHARED is
            # a POSIX-only value for the ``flags`` argument, not for
            # ``access``, and does not exist on Windows.
            self.data = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        self.index_base, self.index_end = struct.unpack("<LL", self.data[:8])
        # index_end addresses the last entry itself, hence the +1.
        self.count = (self.index_end - self.index_base) // self._index_length + 1
        self.charset = charset

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        """Release the memory mapping of the database file."""
        self.data.close()

    def _ip_to_ulong(self, ip: str) -> int:
        """Convert a dotted-quad IPv4 string to its integer value."""
        return struct.unpack(">L", socket.inet_aton(ip))[0]

    def _ulong_to_ip(self, data: int) -> str:
        """Convert an integer IPv4 value back to dotted-quad notation."""
        return socket.inet_ntoa(struct.pack(">L", data))

    def _binary_search(self, target: int, begin: int, count: int) -> int:
        """Return the record offset of the range that contains *target*.

        Args:
            target: IPv4 address as an integer.
            begin: Byte offset of the first index entry to consider.
            count: Unused; kept so existing positional calls keep working.
                The search always spans *begin* through ``self.index_end``.

        Raises:
            LookupError: If no range in the index covers *target*.
        """
        low = 0
        high = (self.index_end - begin) // self._index_length
        # Find the last entry whose start IP is <= target.  The midpoint is
        # rounded up so that ``low = mid`` always makes progress.
        while low < high:
            mid = (low + high + 1) // 2
            start_ip, _ = self.unpack_index(begin + mid * self._index_length)
            if start_ip <= target:
                low = mid
            else:
                high = mid - 1
        start_ip, offset = self.unpack_index(begin + low * self._index_length)
        if not start_ip <= target <= self.unpack_ip(offset):
            raise LookupError(
                f"no record covers {self._ulong_to_ip(target)}"
            )
        return offset

    def _decode_string(self, start: int) -> Tuple[str, int]:
        """Decode the NUL-terminated string at *start*.

        Returns:
            The decoded text and the offset just past its terminator.  An
            offset of 0 yields ``"Unknown"``.

        Raises:
            ValueError: If no terminator is found after *start*.
        """
        if start == 0:
            return "Unknown", start + 1
        end = self.data.find(self._terminator, start)
        if end < 0:
            raise ValueError("fail to decode string")
        string = self.data[start:end].decode(self.charset, errors="replace")
        return string, end + 1

    def _decode_second_part(self, index: int) -> str:
        """Decode the second text part located at *index*.

        The part is either an inline string or a flag byte followed by a
        3-byte offset of the string; the redirect is followed when present.
        """
        if self._get_flag(index) in (self._flag_one, self._flag_two):
            index = self.unpack_offset(index + 1)
        text, _ = self._decode_string(index)
        return text

    def _get_flag(self, index: int) -> bytes:
        """Return the single byte at *index* as a ``bytes`` object."""
        return self.data[index:index + 1]

    def get_index(self, ip: str) -> int:
        """Return the byte offset of the record whose range contains *ip*."""
        index = self._binary_search(
            self._ip_to_ulong(ip),
            self.index_base,
            self.count // 2,
        )
        return index

    def unpack_index(self, start: int) -> Tuple[int, int]:
        """Read the index entry at *start* as ``(start_ip, record_offset)``."""
        end: int = start + self._index_length
        # Pad the 3-byte offset with a zero byte so it parses as a uint32.
        start_ip, offset = struct.unpack(
            "<LL",
            self.data[start:end] + self._terminator,
        )
        return start_ip, offset

    def unpack_ip(self, start: int) -> int:
        """Read a 4-byte little-endian IP value at *start*."""
        end: int = start + self._ip_length
        return struct.unpack("<L", self.data[start:end])[0]

    def unpack_offset(self, start: int) -> int:
        """Read a 3-byte little-endian offset at *start*."""
        end: int = start + self._offset_length
        return struct.unpack(
            "<L",
            self.data[start:end] + self._terminator,
        )[0]

    def get_record(self, index: int) -> Tuple[str, str]:
        """Decode the two text parts of the record body at *index*.

        *index* must point just past the record's 4-byte end IP.  A short
        "Mode ..." trace line is printed for the storage layout encountered.

        Returns:
            ``(first_part, second_part)`` as decoded strings.
        """
        # Ref: https://blog.dnomd343.top/qqwry.dat-analyse
        flags = (self._flag_one, self._flag_two)
        flag: bytes = self._get_flag(index)
        if flag not in flags:
            # Inline first part; the second part follows it directly.
            print("Mode 1")
            first_part, index = self._decode_string(index)
            return first_part, self._decode_second_part(index)
        index += 1  # drop the flag byte
        offset = self.unpack_offset(index)
        if flag == self._flag_two:
            # Only the first part is redirected; the second part sits right
            # after the 3-byte offset.
            first_part, _ = self._decode_string(offset)
            index += self._offset_length
            if self._get_flag(index) in flags:
                print("Mode 5")
            else:
                print("Mode 3 or 4")
            return first_part, self._decode_second_part(index)
        # Flag 0x01: both parts live at the redirected location.
        if self._get_flag(offset) == self._flag_two:
            return self.get_record(offset)
        print("Mode 2")
        first_part, index = self._decode_string(offset)
        return first_part, self._decode_second_part(index)

    def lookup_ip(self, ip: str) -> Tuple[str, str]:
        """Return the two text parts recorded for the IPv4 address *ip*.

        Raises:
            LookupError: If no range in the database covers *ip*.
        """
        index = self.get_index(ip)
        index += self._ip_length  # drop the end ip
        return self.get_record(index)
if __name__ == "__main__":
    # Demo: look up a handful of well-known addresses, separating the
    # results with a blank line (none after the last one).
    q = QQWry(charset="gbk")
    sample_ips = (
        "0.0.0.0",
        "12.34.56.78",
        "45.46.47.48",
        "223.5.5.5",
        "8.8.8.8",
        "114.114.114.114",
    )
    for position, address in enumerate(sample_ips):
        if position:
            print()
        print(q.lookup_ip(address))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Mode 1 | |
('IANA', '保留地址') | |
Mode 3 or 4 | |
('美国', 'ATT用户') | |
Mode 5 | |
('美国', ' CZ88.NET') | |
Mode 1 | |
('阿里巴巴anycast公共DNS', '\x02\x0f\x02') | |
Mode 3 or 4 | |
('美国', '加利福尼亚州圣克拉拉县山景市谷歌公司DNS服务器') | |
Mode 3 or 4 | |
('江苏省南京市', '南京信风网络科技有限公司GreatbitDNS服务器') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment