Skip to content

Instantly share code, notes, and snippets.

@Pagliacii
Created October 24, 2021 03:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Pagliacii/a87f122b193b796d4ffd98d2a7e4c8d6 to your computer and use it in GitHub Desktop.
Save Pagliacii/a87f122b193b796d4ffd98d2a7e4c8d6 to your computer and use it in GitHub Desktop.
Query IP location based on QQWry.dat file
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import mmap
import struct
import socket
import pathlib
from typing import Tuple
# Ref: https://github.com/out0fmemory/qqwry.dat/blob/master/qqwry_lastest.dat
DEFAULT_DATA_FILE = pathlib.Path(__file__).parent / "qqwry.dat"
class QQWry:
def __init__(self, data_file=DEFAULT_DATA_FILE, charset="utf-8"):
self._ip_length = 4
self._offset_length = 3
self._index_length = 7
self._flag_one = b"\x01"
self._flag_two = b"\x02"
self._terminator = b"\x00"
with open(data_file, "rb") as f:
self.data = mmap.mmap(f.fileno(), 0, access=mmap.MAP_SHARED)
self.index_base, self.index_end = struct.unpack("<LL", self.data[:8])
self.count = (self.index_end - self.index_base) // self._index_length + 1
self.charset = charset
def _ip_to_ulong(self, ip: str) -> int:
return struct.unpack(">L", socket.inet_aton(ip))[0]
def _ulong_to_ip(self, data: int) -> str:
return socket.inet_ntoa(struct.pack(">L", data))
def _binary_search(self, target: int, begin: int, count: int) -> int:
mid = begin + count * 7
start_ip, offset = self.unpack_index(mid)
end_ip: int = self.unpack_ip(offset)
if start_ip <= target <= end_ip:
return offset
if start_ip > target:
return self._binary_search(target, begin, count // 2)
return self._binary_search(target, mid, (self.index_end - mid) // 7)
def _decode_string(self, start: int) -> Tuple[str, int]:
if start == 0:
return "Unknown", start + 1
end = self.data.find(self._terminator, start)
if end < 0:
raise Exception("fail to decode string")
string = self.data[start:end].decode(self.charset, errors="replace")
return string, end + 1
def _get_flag(self, index: int) -> bytes:
return self.data[index:index + 1]
def get_index(self, ip: str) -> int:
index = self._binary_search(
self._ip_to_ulong(ip),
self.index_base,
self.count // 2,
)
return index
def unpack_index(self, start: int) -> Tuple[int, ...]:
end: int = start + self._index_length
return struct.unpack(
"<LL",
self.data[start:end] + self._terminator,
)
def unpack_ip(self, start: int) -> int:
end: int = start + self._ip_length
return struct.unpack("<L", self.data[start:end])[0]
def unpack_offset(self, start: int) -> int:
end: int = start + self._offset_length
return struct.unpack(
"<L",
self.data[start:end] + self._terminator,
)[0]
def get_record(self, index: int) -> Tuple[str, str]:
# Ref: https://blog.dnomd343.top/qqwry.dat-analyse
flag: bytes = self._get_flag(index)
if flag not in (self._flag_one, self._flag_two):
print("Mode 1")
first_part, index = self._decode_string(index)
second_part, _ = self._decode_string(index)
return first_part, second_part
index += 1 # drop the flag byte
if flag == self._flag_two:
offset = self.unpack_offset(index)
first_part, _ = self._decode_string(offset)
index += self._offset_length
if self._get_flag(index) in (self._flag_one, self._flag_two):
print("Mode 5")
offset = self.unpack_offset(index + 1)
second_part, _ = self._decode_string(offset)
else:
print("Mode 3 or 4")
second_part, _ = self._decode_string(index)
return first_part, second_part
offset = self.unpack_offset(index)
if self._get_flag(offset) == self._flag_two:
return self.get_record(offset)
print("Mode 2")
first_part, index = self._decode_string(offset)
second_part, _ = self._decode_string(index)
return first_part, second_part
def lookup_ip(self, ip: str) -> Tuple[str, str]:
index = self.get_index(ip)
# print(f"{index=}")
index += self._ip_length # drop the end ip
return self.get_record(index)
if __name__ == "__main__":
q = QQWry(charset="gbk")
print(q.lookup_ip("0.0.0.0"))
print()
print(q.lookup_ip("12.34.56.78"))
print()
print(q.lookup_ip("45.46.47.48"))
print()
print(q.lookup_ip("223.5.5.5"))
print()
print(q.lookup_ip("8.8.8.8"))
print()
print(q.lookup_ip("114.114.114.114"))
Mode 1
('IANA', '保留地址')
Mode 3 or 4
('美国', 'ATT用户')
Mode 5
('美国', ' CZ88.NET')
Mode 1
('阿里巴巴anycast公共DNS', '\x02\x0f\x02')
Mode 3 or 4
('美国', '加利福尼亚州圣克拉拉县山景市谷歌公司DNS服务器')
Mode 3 or 4
('江苏省南京市', '南京信风网络科技有限公司GreatbitDNS服务器')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment