Last active
May 11, 2024 06:21
-
-
Save Potat0000/239a86dd152f12ab4a76092b9d735ae9 to your computer and use it in GitHub Desktop.
AIWEN Tech IPv4 Geo-Info API Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
############################################################################### | |
# AIWEN Tech IPv4 Geo-Info API Script # | |
# Only Support District-Level Query # | |
############################################################################### | |
# Requires Python 3.8+ (the code uses the ":=" operator; dnspython needs 3.7+). #
# Tested on Debian 12. # | |
# # | |
# Use -h or --help option to get instructions. # | |
# Run the following command to install dependencies: # | |
# python3 -m pip install --upgrade dnspython requests # | |
############################################################################### | |
# The default DNS servers can be set in the DNS_LIST variable below. # | |
# The script will try to query CZ free API locally if CZ_DATABASE is set. # | |
# The API Key will be read from the following sources in order: # | |
# 1. --key option # | |
# 2. The KEY variables below # | |
# 3. Environment variable AIWEN_API_KEY # | |
# Note that the key will be picked from the next source only if it is empty. # | |
# The validity of the key won't be checked during pickup. # | |
############################################################################### | |
# API key for ipplus360.com; consulted when the --key option is not given.
KEY = ''

# DNS servers used whenever the caller does not supply its own list.
DNS_LIST = [
    '223.5.5.5',
    '119.29.29.29',
    '8.8.8.8',
    '114.114.114.114',
]

# Path of a local CZ database file; the online CZ API is used when the
# local lookup fails (for example because this is left empty).
CZ_DATABASE = ''
from argparse import ArgumentParser, RawTextHelpFormatter | |
from collections import namedtuple | |
from ipaddress import IPv4Address, IPv6Address, ip_network | |
from json import dumps as json_dumps | |
from os import getenv | |
from re import compile as re_pattern | |
from struct import unpack | |
from sys import stdin | |
from threading import Thread | |
from time import perf_counter as current_time | |
from urllib.parse import urlparse | |
import dns.resolver | |
import requests | |
from dns.reversename import from_address as reverse_address | |
# Timeout, in seconds, handed to every HTTP API request.
API_TIMEOUT = 5

# Time budget, in seconds and measured from the start of query(), within
# which the CZ lookups are still waited for.
MIN_CZ_TIME = 3

# Record passed between query(), parse_rawdata() and the output functions.
# Every field is optional; status_code defaults to 0, the rest to None.
data_tuple = namedtuple(
    'data_tuple',
    field_names=['details', 'status_code', 'rdns', 'cz_free', 'cz_accurate'],
    defaults=(None, 0, None, None, None),
)
def get_cz_from_db(ip, db_path='qqwry.dat'):
    """Look up an IPv4 address in a local CZ database file.

    The file is read as: an 8-byte header holding the offsets of the first
    and last index entries, 7-byte index entries (4-byte start address plus
    a 3-byte record offset), and records that begin with a 4-byte end
    address followed by the geo / ISP strings or redirects to them.  All
    integers are decoded explicitly as little-endian so the result does not
    depend on the byte order or alignment rules of the host.

    Args:
        ip: An IPv4 address accepted by ``IPv4Address``, or the ``...``
            (Ellipsis) sentinel to read the record of the last index entry.
            Callers in this file use the second element of that record as
            the database version.
        db_path: Path of the database file.

    Returns:
        ``(geo, isp)`` as stripped strings, or ``None`` when the selected
        record's address range does not contain the address.

    Raises:
        OSError: The file cannot be opened.
        ValueError: ``ip`` is not a valid IPv4 address.
        struct.error, IndexError: The file is truncated.
    """

    def get_long3():
        # 3-byte little-endian unsigned integer at the current position.
        low, high = unpack('<HB', db.read(3))
        return (high << 16) + low

    def get_string(offset):
        # NUL-terminated GBK string.  A zero offset means "continue reading
        # at the current position" rather than seeking first.
        if offset:
            db.seek(offset)
        temp = b''
        while (c := db.read(1))[0]:
            temp += c
        return temp.decode('gbk')

    if ip is ...:
        ip_num = 0xFFFFFF00  # 255.255.255.0
    else:
        ip_num = int(IPv4Address(ip))
        if ip_num >= 0xFFFFFF00:
            # Addresses from 255.255.255.0 upwards are looked up as
            # 240.0.0.0 instead -- presumably so that ordinary queries never
            # return the last record, which the Ellipsis query reads.
            ip_num = 0xF0000000  # 240.0.0.0

    with open(db_path, 'rb') as db:
        first_index, last_index = unpack('<II', db.read(8))
        index_count = (last_index - first_index) // 7 + 1

        # Binary search for the index entry whose range may hold ip_num.
        low, high = 0, index_count - 1
        while low < high - 1:
            mid = (low + high) // 2
            db.seek(first_index + mid * 7)
            start_ip = unpack('<IHB', db.read(7))[0]
            if ip_num == start_ip:
                low = mid
                break
            if ip_num > start_ip:
                low = mid
            else:
                high = mid
        if ip is ...:
            low = high  # the Ellipsis query always reads the upper entry

        db.seek(first_index + low * 7)
        start_ip, of1, of2 = unpack('<IHB', db.read(7))
        end_ip_offset = of1 + (of2 << 16)
        db.seek(end_ip_offset)
        end_ip = unpack('<I', db.read(4))[0]

        # No data for this address.
        if not start_ip <= ip_num <= end_ip:
            return None

        # Geo information: the byte after the end address selects the mode.
        db.seek(end_ip_offset + 4)
        byte = db.read(1)[0]
        isp_offset = 0
        if byte == 0x01:  # redirect mode 1: geo and ISP both live elsewhere
            geo_offset = get_long3()
            db.seek(geo_offset)
            if db.read(1)[0] == 0x02:
                geo = get_string(get_long3())
                db.seek(geo_offset + 4)
            else:
                geo = get_string(geo_offset)
        elif byte == 0x02:  # redirect mode 2: only geo lives elsewhere
            geo = get_string(get_long3())
            isp_offset = end_ip_offset + 8
        else:  # plain string mode
            geo = get_string(end_ip_offset + 4)

        # ISP information: read at isp_offset when set, otherwise directly
        # after whatever was read last.
        if isp_offset:
            db.seek(isp_offset)
        if db.read(1)[0] in [0x01, 0x02]:
            p = get_long3()
            isp = get_string(p) if p else ''
        else:
            db.seek(-1, 1)  # un-read the probe byte; it starts the string
            isp = get_string(isp_offset)
        return geo.strip(), isp.strip()
def get_rdns(ip, dns_nameservers=None):
    """Return the PTR (reverse DNS) name of ``ip``, or ``None``.

    Args:
        ip: Address to look up.
        dns_nameservers: DNS servers to ask; ``DNS_LIST`` is used when this
            is empty or ``None``.

    Returns:
        The first PTR record as a string.  ``None`` is returned when the
        lookup fails for any reason or when the answer equals ``ip`` itself.
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = 0.25
    resolver.lifetime = 1
    resolver.nameservers = dns_nameservers if dns_nameservers else DNS_LIST
    try:
        domain = str(resolver.resolve(reverse_address(ip), 'PTR')[0])
    except Exception:
        # Best effort: a missing PTR record is normal.  Exception (not
        # BaseException) keeps Ctrl-C and interpreter shutdown working.
        return None
    return None if domain == ip else domain
def resolve(address, dns_nameservers=None):
    """Turn a user-supplied target into a list of IPv4 addresses.

    Args:
        address: An IPv4 address, a domain name, or a URL (anything that
            contains ``//``) whose host part is one of those.
        dns_nameservers: DNS servers to try in order; ``DNS_LIST`` is used
            when this is empty or ``None``.

    Returns:
        A 2-tuple.  On success the first element is a list of IPv4 address
        strings; the second is the list holding the DNS server that answered,
        or ``None`` if ``address`` already was an IP address.  On failure the
        first element is an error message (``str``) and the second is
        ``None``.
    """
    if '//' in address:
        try:
            address = urlparse(address).hostname
        except ValueError:
            address = None
        if not address:
            # e.g. "http://" -- there is no host to look at.
            return '无效的地址', None

    try:
        IPv6Address(address)
    except ValueError:
        pass
    else:
        return '不支持 IPv6 地址', None

    try:
        return [str(IPv4Address(address))], None
    except ValueError:
        pass

    try:
        # strict=False so that "10.0.0.1/8" is also recognised as a range
        # instead of being sent to the DNS servers as a host name.
        ip_network(address, strict=False)
    except ValueError:
        pass
    else:
        return '不支持 IP 段查询', None

    if ':' in address or address.split('.')[-1].isdigit():
        return '无效的地址', None

    error_msg = None
    for dns_nameserver in dns_nameservers or DNS_LIST:
        ips = []
        try:
            resolver = dns.resolver.Resolver()
            resolver.timeout = 0.25
            resolver.lifetime = 1
            resolver.nameservers = [dns_nameserver]
            ips = [str(record) for record in resolver.resolve(address, 'A')]
        except dns.resolver.NXDOMAIN:
            # Keep the first definite answer; later servers may only time out.
            error_msg = error_msg or '域名不存在'
        except dns.resolver.NoAnswer:
            error_msg = error_msg or '域名无 A 记录'
        except Exception:
            # Timeout, unreachable server, ...: try the next one.
            pass
        if ips:
            return ips, resolver.nameservers
    return error_msg if error_msg else '域名解析失败', None
def get_aiwen(ip, key):
    """Query the ipplus360.com district-level API for ``ip``.

    The key and the address are passed through ``params`` so that they are
    URL-encoded instead of being pasted into the query string verbatim.

    Returns:
        ``(decoded_json, http_status_code)``, or ``(None, -1)`` when the
        request timed out.  Other request or JSON decoding errors propagate
        to the caller.
    """
    api = 'https://api.ipplus360.com/ip/geo/v1/district/'
    try:
        raw = requests.get(api, params={'key': key, 'ip': ip}, timeout=API_TIMEOUT)
        return raw.json(), raw.status_code
    except requests.exceptions.Timeout:
        return None, -1
def get_cz_free(ip, db_path=CZ_DATABASE):
    """Return CZ ``(geo, isp)`` for ``ip`` from the local file or the free API.

    The local database is tried first.  If that lookup fails in any way
    (file missing, address not covered, ...) the cz88.net open API is asked
    instead.

    Returns:
        ``(geo, isp)``, or ``None`` when neither source produced a result.
        This helper is best effort and does not raise for network or
        payload problems.
    """
    try:
        # get_cz_from_db() returns None for "no data"; unpacking that raises
        # TypeError, which deliberately lands in the fallback as well.
        geo, isp = get_cz_from_db(ip, db_path)
    except Exception:
        pass
    else:
        return geo, ('' if isp == 'CZ88.NET' else isp)

    api = 'https://cz88.net/api/cz88/ip/openIPInfo?ip={ip}'
    try:
        raw = requests.get(api.format(ip=ip), timeout=API_TIMEOUT)
        if raw.status_code != 200:
            return None
        payload = raw.json()  # decode the body once
        if payload['success'] is not True:
            return None
        info = payload['data']
        geo = info['geo'].strip()
        isp = info['isp'].strip()
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError, AttributeError):
        # Unreachable service or a payload of unexpected shape.
        return None
    if isp.endswith('CZ88.NET'):
        isp = isp[:-8]
    return geo, isp
def get_cz_accurate(ip):
    """Fetch the ``data`` object of the cz88.net "accurate" API for ``ip``.

    Returns:
        The ``data`` member of the response when the request succeeded with
        HTTP 200 and ``success`` is ``True``; otherwise ``None``.  This
        helper is best effort and does not raise for network or payload
        problems.
    """
    api = 'https://cz88.net/api/cz88/ip/accurate?ip={ip}'
    try:
        raw = requests.get(api.format(ip=ip), timeout=API_TIMEOUT)
        if raw.status_code == 200:
            payload = raw.json()  # decode the body once
            if payload['success'] is True:
                return payload.get('data', None)
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError, AttributeError):
        # Unreachable service or a payload of unexpected shape.
        pass
    return None
def get_args():
    """Parse the command line and normalise the result.

    After parsing:
      * the parser reports an error if no target was given, stdin is a
        terminal and --version was not requested;
      * ``key`` falls back to the module-level ``KEY`` and then to the
        ``AIWEN_API_KEY`` environment variable;
      * ``dns`` keeps only the entries that are valid IPv4 addresses.

    Returns:
        The ``argparse.Namespace`` with the adjusted values.
    """
    cli = ArgumentParser(
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        usage='%(prog)s [options] target [ip2/dns1] [ip3/dns2] ...',
        description='A script that calls AIWEN Tech IPv4 Geo-Information API',
    )
    cli.add_argument(
        'target',
        nargs='?',
        default='-',
        help='IP or Domain that wants to test\nUse "-" to read from stdin',
    )
    cli.add_argument(
        'dns',
        nargs='*',
        help='DNS to resolve domain\nOnly available if target is a domain',
    )

    # Plain on/off switches, listed in the order they appear in --help.
    switches = (
        (
            ('-a', '--all'),
            'Get IP info for all IP addresses resolved from the domain\nAutomatically enabled when using stdin input',
        ),
        (('--cz',), 'Use CZ paid API (limited free trials per day)'),
        (('-c', '--cord'), 'Get coordinates in text format result'),
        (('-j', '--json'), 'Get JSON format result'),
    )
    for flags, description in switches:
        cli.add_argument(*flags, help=description, action='store_true')
    cli.add_argument('--key', help='API Key for ipplus360.com')
    cli.add_argument('-v', '--version', help='Show version and exit', action='store_true')
    cli.add_argument('-h', '--help', help='Show this help message and exit', action='help')

    parsed = cli.parse_args()
    if not parsed.version and parsed.target == '-' and stdin.isatty():
        cli.error('required argument "target" is missing')

    # Key precedence: --key, then KEY, then the environment.
    parsed.key = parsed.key or KEY or getenv('AIWEN_API_KEY', '')

    if parsed.dns:
        accepted = []
        for candidate in parsed.dns:
            try:
                accepted.append(str(IPv4Address(candidate)))
            except BaseException:
                continue  # silently drop anything that is not an IPv4 address
        parsed.dns = accepted
    return parsed
def parse_target(args):
    """Work out which IP addresses to query from the parsed arguments.

    With target ``-`` the whole of stdin is read and every IPv4 address found
    in it is used.  Otherwise the target is resolved with ``resolve()``; when
    the target itself is an IP address, the extra ``dns`` arguments are
    treated as further addresses to query.

    Returns:
        ``(ips, nameservers, stdin_input)``: the de-duplicated addresses in
        first-seen order, the DNS server list reported by ``resolve()`` (or
        ``None``), and the decoded stdin text (or ``None``).

    Raises:
        RuntimeError: No address could be extracted or resolved; the message
            is the text to show to the user.
    """
    if args.target != '-':
        ips, nameservers = resolve(args.target, args.dns)
        if isinstance(ips, str):  # resolve() reports failures as a message
            raise RuntimeError(ips)
        if not nameservers:
            ips.extend(str(IPv4Address(i)) for i in args.dns)
        return list(dict.fromkeys(ips)), nameservers, None

    raw_input = stdin.buffer.read()
    try:
        stdin_input = raw_input.decode('utf-8')
    except UnicodeDecodeError:
        # Fall back to the stream's own encoding.  errors='replace' keeps
        # undecodable bytes from aborting the run: only the embedded
        # addresses matter here.
        stdin_input = raw_input.decode(stdin.encoding or 'utf-8', errors='replace')
    ipv4_pattern = re_pattern(r'(?:(?<=[^\d^.])|^)(?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4}(?:(?=[^\d^.])|$)')
    ips = ipv4_pattern.findall(stdin_input)
    if not ips:
        raise RuntimeError('无法从输入中提取出 IP 地址')
    return list(dict.fromkeys(ips)), None, stdin_input
class ThreadWithReturnValue(Thread):
    """Thread that keeps the value returned by its target.

    The value is stored in ``rtn_value`` (``None`` until the target has
    returned) and is also what ``join()`` returns.  Anything raised by the
    target is swallowed, in which case ``rtn_value`` stays ``None``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rtn_value = None

    def run(self):
        target = self._target
        if target is None:
            return
        try:
            self.rtn_value = target(*self._args, **self._kwargs)
        except BaseException:
            pass

    def join(self, *args):
        # Same positional arguments as Thread.join(), i.e. an optional timeout.
        super().join(*args)
        return self.rtn_value
def query(ip, key, enable_cz=False, dns_nameservers=None, db_path=CZ_DATABASE):
    """Collect everything known about ``ip`` into one ``data_tuple``.

    Private addresses are answered immediately with status -3 and only a
    reverse-DNS lookup.  For every other address the AIWEN request, the
    reverse-DNS lookup and the free CZ lookup run in parallel threads; the
    CZ "accurate" lookup is started only when ``enable_cz`` is set and the
    AIWEN request returned HTTP 200.

    Status codes in the result: the HTTP status of the AIWEN request, 204
    when it answered 200 with code ``NoData``, -1 on timeout and -2 on any
    other failure.
    """
    if IPv4Address(ip).is_private:
        return data_tuple({'ip': ip, 'msg': '保留IP'}, -3, get_rdns(ip, dns_nameservers))

    def spawn(func, *func_args):
        return ThreadWithReturnValue(target=func, args=func_args, daemon=True)

    aiwen_thread = spawn(get_aiwen, ip, key)
    rdns_thread = spawn(get_rdns, ip, dns_nameservers)
    cz_free_thread = spawn(get_cz_free, ip, db_path)
    cz_accurate_thread = spawn(get_cz_accurate, ip)

    started_at = current_time()
    for worker in (aiwen_thread, rdns_thread, cz_free_thread):
        worker.start()

    def collect(worker):
        # Wait for a CZ worker only while the overall time budget lasts,
        # then take whatever it has produced (None if nothing yet).
        if worker.is_alive() and (elapsed := current_time() - started_at) < MIN_CZ_TIME:
            worker.join(MIN_CZ_TIME - elapsed)
        return worker.rtn_value

    try:
        # join() yields None when get_aiwen() raised; unpacking then fails
        # and is reported as a generic request error.
        raw, status_code = aiwen_thread.join()
    except BaseException:
        raw, status_code = {'ip': ip, 'msg': '请求错误'}, -2
    if status_code == -1:
        raw = {'ip': ip, 'msg': '请求超时'}
    if status_code == 200 and raw['code'] == 'NoData':
        status_code = 204
    if status_code != 200:
        raw = {'ip': ip, **raw}

    if enable_cz and status_code == 200:
        cz_accurate_thread.start()

    rdns = rdns_thread.join()
    cz_free = collect(cz_free_thread)
    cz_accurate = collect(cz_accurate_thread)
    return data_tuple(raw, status_code, rdns, cz_free, cz_accurate)
def _parse_utc_offset(timezone):
    """Convert a ``UTC+8`` / ``UTC-3:30`` style string into hours.

    Whole-hour offsets are returned as ``int``, others as ``float``
    (``UTC+5:45`` -> ``5.75``).  An empty value or one that does not follow
    this pattern yields ``None``.
    """
    if not timezone:
        return None
    hours_text, _, minutes_text = timezone[3:].partition(':')
    try:
        minutes = int(minutes_text) if minutes_text else 0
        if not minutes:
            return int(hours_text)
        fraction = minutes / 60
        if hours_text.startswith('-'):
            return float(hours_text) - fraction
        return float(hours_text) + fraction
    except ValueError:
        return None


def parse_rawdata(data):
    """Convert the raw result of ``query()`` into display-ready fields.

    Args:
        data: A ``data_tuple`` as produced by ``query()`` (or a placeholder
            built with ``ip`` and ``msg`` in ``details``).

    Returns:
        A ``data_tuple`` whose ``details`` is a flat dict:
          * status -3: ``IP``, ``rDNS`` and ``Accuracy`` only;
          * any other status except 200: ``IP``, ``ErrorMessage`` and, when
            the free CZ lookup succeeded, ``CZ_GEO`` / ``CZ_ISP``;
          * status 200: the full field set, plus ``Cost`` describing how the
            request is billed.
    """
    if data.status_code == -3:
        return data_tuple(
            {'IP': data.details['ip'], 'rDNS': data.rdns, 'Accuracy': '保留IP'},
            data.status_code,
        )
    if data.status_code != 200:
        details = {'IP': data.details['ip'], 'ErrorMessage': data.details['msg']}
        if data.cz_free:
            details['CZ_GEO'] = data.cz_free[0]
            details['CZ_ISP'] = data.cz_free[1]
        return data_tuple(details, data.status_code, data.rdns if data.cz_free else None, data.cz_free)

    details_data = data.details['data']

    # Only show the CZ ISP when it adds something to the AIWEN one.
    cz_free = data.cz_free[1] if data.cz_free else None
    if cz_free is not None and (cz_free == '本机或本网络' or cz_free in (details_data['isp'] or '')):
        cz_free = None

    location = []
    if details_data['lng'] and details_data['lat']:
        location.append(
            {
                'Longitude': float(details_data['lng']),
                'Latitude': float(details_data['lat']),
                'Radius': float(details_data['radius']) if details_data['radius'] else None,
            }
        )
    if data.cz_accurate and data.cz_accurate.get('locations'):
        location.extend(
            {
                'Longitude': float(i['longitude']),
                'Latitude': float(i['latitude']),
                'Radius': i['radius'] / 1000,
            }
            for i in data.cz_accurate['locations']
        )

    parsed = {
        'IP': data.details['ip'],
        'rDNS': data.rdns,
        'ASN': int(details_data['asnumber']) if details_data['asnumber'] else None,
        'ISP': details_data['isp'],
        'Owner': details_data['owner'] if details_data['owner'] != details_data['isp'] else None,
        'Accuracy': details_data['accuracy'],
        'Continent': details_data['continent'],
        'Country': details_data['country'],
        'CountryCode': details_data.get('areacode'),
        'Province': details_data['prov'],
        'City': details_data['city'],
        'District': details_data['district'],
        'AdministrativeCode': details_data['adcode'],
        'CZ': cz_free,
        'NetworkType': data.cz_accurate.get('netWorkType') if data.cz_accurate else None,
        'ZipCode': details_data['zipcode'],
        'Timezone': _parse_utc_offset(details_data['timezone']),
        'Location': location,
        'CurrencyCode': details_data.get('currency_code'),
        'CurrencyName': details_data.get('currency_name'),
    }

    if details_data['accuracy'] == '保留IP':
        # Reserved address reported by the API: keep the identifying fields,
        # blank out everything else.
        return data_tuple({k: v if k in ['IP', 'rDNS', 'Accuracy'] else None for k, v in parsed.items()}, -3)

    if details_data['accuracy'] == '区县':
        parsed['Cost'] = '达到精度,正常扣费'
    elif data.details['charge']:
        parsed['Cost'] = '未达精度满 50 次,补扣费 1 次'
    else:
        parsed['Cost'] = '未达精度,不扣费'
    return data_tuple(parsed, data.status_code)
def json_output(datas, nameservers):
    """Serialise parsed results as a JSON array.

    Args:
        datas: Records with ``details`` (dict) and ``status_code``.
        nameservers: DNS servers that resolved the target, or a falsy value.

    Returns:
        A JSON string with one object per record: the non-empty detail
        fields (strings stripped), ``DNS`` when ``nameservers`` is given,
        and ``HTTPCode``.
    """

    def has_value(value):
        # A numeric zero is real data (e.g. a UTC+0 timezone); only None,
        # empty strings, empty containers and False are left out.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return True
        return bool(value)

    print_data = []
    for data in datas:
        entry = {
            k: v.strip() if isinstance(v, str) else v
            for k, v in data.details.items()
            if has_value(v)
        }
        if nameservers:
            entry['DNS'] = ', '.join(nameservers)
        entry['HTTPCode'] = data.status_code
        print_data.append(entry)
    return json_dumps(print_data, ensure_ascii=False)
def stdout_output_gen(data, nameservers, show_cord=False):
    """Render one parsed result as tab-separated ``label<TAB>value`` lines.

    Args:
        data: Record with ``details`` (dict from ``parse_rawdata()``) and
            ``status_code``.
        nameservers: DNS servers to show on a ``DNS`` line, or a falsy value.
        show_cord: Also list the coordinates in ``details['Location']``.

    Returns:
        The lines joined by newlines.  Status -3 yields the reserved-address
        form, any other status except 200 the error form, and 200 the full
        report.  ``data.details`` is left unmodified, so the function can be
        called repeatedly on the same record.
    """
    details = data.details
    output = [('IP', details['IP'])]
    if data.status_code == -3:
        if details.get('rDNS'):
            output.append(('rDNS', details['rDNS']))
        output.append(('精度', '保留IP'))
    elif data.status_code != 200:
        if details.get('CZ_GEO'):
            output.append(('地区', details['CZ_GEO']))
        if details.get('CZ_ISP'):
            output.append(('纯真', details['CZ_ISP']))
        output.append(('错误', details['ErrorMessage']))
    else:
        if nameservers:
            output.append(('DNS', ', '.join(nameservers)))
        if details['rDNS']:
            output.append(('rDNS', details['rDNS']))
        if details['ISP']:
            isp = details['ISP']
            if details['ASN']:
                isp += f" (AS{details['ASN']})"
            output.append(('ISP', isp))
        if details['Owner']:
            output.append(('归属', details['Owner']))
        if details['Accuracy']:
            output.append(('精度', details['Accuracy'] + '级'))
        if country := ' '.join(i for i in (details['Continent'], details['Country']) if i):
            # "is not None": an offset of 0 (UTC+0) is a valid timezone.
            tz_str = f"UTC{details['Timezone']:+}" if details['Timezone'] is not None else ''
            if extra := ', '.join(i for i in (details['CountryCode'], tz_str) if i):
                country += f' ({extra})'
            output.append(('国家', country))
        if location := ' '.join(i for i in (details['Province'], details['City'], details['District']) if i):
            if details['AdministrativeCode']:
                location += f" ({details['AdministrativeCode']})"
            output.append(('地区', location))
        if details['CZ']:
            cz = details['CZ']
            if details['NetworkType']:
                cz += f" ({details['NetworkType']})"
            output.append(('纯真', cz))
        elif details['NetworkType']:
            output.append(('纯真', details['NetworkType']))
        if details['ZipCode']:
            output.append(('邮编', details['ZipCode']))
        if show_cord and details['Location']:
            label = '经纬'  # only the first coordinate line carries the label
            for i in details['Location']:
                output.append((label, f"{i['Longitude']}, {i['Latitude']} ({i['Radius']}km)"))
                label = ''
        if details['CurrencyCode']:
            currency = details['CurrencyCode']
            if details['CurrencyName']:
                currency += f" ({details['CurrencyName']})"
            output.append(('货币', currency))
        output.append(('扣费', details['Cost']))
    return '\n'.join(f'{k}\t{str(v).strip()}' for k, v in output)
def stdout_output(datas, nameservers, stdin_input, show_cord):
    """Build the complete plain-text report for a list of parsed results.

    Args:
        datas: Parsed records (see ``parse_rawdata()``).
        nameservers: DNS servers that resolved the target, or a falsy value.
        stdin_input: The text the addresses were extracted from, or a falsy
            value.  When given, it is echoed first with every queried
            address highlighted, followed by a separator line.
        show_cord: Passed on to ``stdout_output_gen()``.

    Returns:
        The report sections joined by blank lines.
    """
    from re import escape as re_escape  # the file itself only imports re.compile

    print_text = []
    if stdin_input:
        BOLD_TEXT = '\033[1;4m{text}\033[0m'
        # Highlight in a single pass, longest address first and bounded by
        # non-digits, so "1.1.1.1" is never marked inside "1.1.1.10" or
        # "11.1.1.1" and text inserted for one address is never rescanned.
        ips = sorted({str(i.details['IP']) for i in datas}, key=len, reverse=True)
        if ips:
            highlight = re_pattern(r'(?<!\d)(?:' + '|'.join(re_escape(ip) for ip in ips) + r')(?!\d)')
            stdin_input = highlight.sub(lambda m: BOLD_TEXT.format(text=m.group(0)), stdin_input)
        print_text.append(stdin_input.strip())
        print_text.append('========================================')
    elif len(datas) > 1 and nameservers:
        # Several addresses from one domain: show the DNS server once on top.
        print_text.append('DNS\t' + ', '.join(nameservers))
    print_text += [stdout_output_gen(data, nameservers if len(datas) == 1 else None, show_cord) for data in datas]
    return '\n\n'.join(print_text)
if __name__ == '__main__':
    # Command-line entry point: print version information, or resolve the
    # target, query every selected address and print the report.
    args = get_args()

    if args.version:
        try:
            if not CZ_DATABASE:
                raise RuntimeError
            # The ISP field of the record read by the Ellipsis query holds
            # the version text; its last four characters are dropped.
            cz_version = get_cz_from_db(..., CZ_DATABASE)[1][:-4]
            if args.json:
                cz_version = cz_version[0:4] + cz_version[5:7] + cz_version[8:10]
        except Exception:
            cz_version = 'Online' if args.json else '调用 API 在线查询'
        if args.json:
            print(json_dumps({'IPPlus': 'Online', 'CZ': cz_version}, ensure_ascii=False))
        else:
            print(f'埃文\t调用 API 在线查询\n纯真\t{cz_version}')
        # SystemExit instead of exit(): exit() only exists when the site
        # module has been loaded.
        raise SystemExit(0)

    try:
        ips, nameservers, stdin_input = parse_target(args)
    except RuntimeError as e:
        err_msg = e.args[0]
        if args.json:
            print(json_dumps([{'ip': None, 'ErrorMessage': err_msg, 'HTTPCode': -2}], ensure_ascii=False))
        else:
            print(f'错误\t{err_msg}')
        raise SystemExit(1)

    if (not nameservers) or args.all:
        datas = [query(ip, args.key, args.cz) for ip in ips]
    else:
        # A domain without -a: query the first address only and list the
        # others with a hint.
        datas = [query(ips[0], args.key, args.cz)]
        datas += [data_tuple({'ip': ip, 'msg': '使用 -a 参数查询所有 IP 地址的信息'}) for ip in ips[1:]]
    datas = [parse_rawdata(data) for data in datas]

    if args.json:
        result = json_output(datas, nameservers)
    else:
        result = stdout_output(datas, nameservers, stdin_input, args.cord)
    print(result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
############################################################################### | |
# Telegram Bot for # | |
# AIWEN Tech IPv4 Geo-Info API Script # | |
############################################################################### | |
# Requires `ipplus360.py` to be in the same directory. # | |
# Requires Python 3.8+ due to the requirements of pyrate-limiter. # | |
# Tested on Debian 12. # | |
# # | |
# Run the following command to install dependencies: # | |
# python3 -m pip install --upgrade \ # | |
# aiohttp dnspython pyrate-limiter pyTelegramBotAPI requests # | |
############################################################################### | |
import re | |
import string | |
from html import escape | |
from ipaddress import IPv4Address as IP | |
from random import SystemRandom | |
from time import sleep | |
import telebot | |
from aiohttp import web | |
from pyrate_limiter import BucketFullException, Duration, Limiter, Rate | |
import ipplus360 # type: ignore | |
###################################################################################
# # # # # # # # # # # # # # Configuration Section Start # # # # # # # # # # # # # #
###################################################################################

# API key of the AIWEN Tech IPv4 Geo-Info API.
AIWEN_API_KEY = ''

# Telegram bot token.
BOT_TOKEN = ''

# DNS servers used when resolving domain names.
DNS_LIST = [
    '223.5.5.5',
    '119.29.29.29',
    '8.8.8.8',
    '114.114.114.114',
]

# Path to the CZ database file; leave blank to use the online API.
CZ_DATABASE = ''

# Webhook settings; leave WEBHOOK_URL blank to use infinity_polling.
WEBHOOK_URL = ''
WEBHOOK_LISTEN_HOST = None
WEBHOOK_LISTEN_PORT = None

# Whitelist of user / group IDs; leave it blank to allow all users.
# The value chosen for an ID selects its rate limit:
#   a Rate (or list of Rates) -> that limit
#   None                      -> no rate limit
#   ...                       -> DEFAULT_SETTINGS
WHITELIST_ID = {
    11111: Rate(3, Duration.SECOND),
    22222: None,
    33333: ...,
}

# Rates applied to whitelist entries whose value is `...`.
DEFAULT_SETTINGS = [
    Rate(15, Duration.MINUTE),
    Rate(60, Duration.HOUR),
    Rate(200, Duration.DAY),
]

###################################################################################
# # # # # # # # # # # # # # Configuration Section End # # # # # # # # # # # # # #
###################################################################################
bot = telebot.TeleBot(BOT_TOKEN)

# Turn the configured whitelist values into live Limiter objects.  Entries
# set to None stay None, which the handlers treat as "no rate limit".
for chat_id, setting in list(WHITELIST_ID.items()):
    if setting is None:
        continue
    WHITELIST_ID[chat_id] = Limiter(DEFAULT_SETTINGS if setting is ... else setting)

# Usage text sent for /start, /help and an /ip command without arguments.
HELP_TEXT = (
    '发送 IP 即可查询其地理信息,仅支持 IPv4 地址\n'
    '发送整段文本可从中提取 IP 进行查询\n\n'
    '也支持发送单一域名,将解析其指向的 IPv4 地址,查询对应的地理信息\n'
    '此时可在待查询域名后跟随若干由空格分隔的 IP 地址用作 DNS 服务器'
)
def parse_target(query_text):
    """Work out which IP addresses a chat message asks about.

    A single-line message of the form ``target [dns ...]``, where every
    trailing token is an IPv4 address, is resolved with
    ``ipplus360.resolve()``.  If that is not possible, every IPv4 address
    found anywhere in the text is used instead.

    Returns:
        ``(ips, nameservers, paragraph_input)``: the de-duplicated addresses
        in first-seen order, the DNS server list reported by ``resolve()``
        (or ``None``), and the original text when the addresses were
        extracted from it (otherwise ``None``).

    Raises:
        RuntimeError: Nothing could be resolved or extracted; the message is
            the text to show to the user.
    """
    ips, nameservers, paragraph_input = None, None, None
    tokens = query_text.split()
    if tokens and '\n' not in query_text:
        try:
            dns_servers = [str(IP(i)) for i in tokens[1:]]
        except ValueError:
            dns_servers = None  # a trailing token is not an IPv4 address
        if dns_servers is not None:
            ips, nameservers = ipplus360.resolve(tokens[0], dns_servers or DNS_LIST)
            if isinstance(ips, str):  # resolve() reports failures as a message
                if len(tokens) == 1:
                    raise RuntimeError(ips)
                ips = None  # several tokens: fall back to text extraction
            elif not nameservers:
                # The target itself was an IP address, so the remaining
                # tokens are further addresses to query, not DNS servers.
                ips.extend(dns_servers)
    if not ips:
        paragraph_input = query_text
        ipv4_pattern = re.compile(r'(?:(?<=[^\d^.])|^)(?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4}(?:(?=[^\d^.])|$)')
        ips = ipv4_pattern.findall(paragraph_input)
        if not ips:
            raise RuntimeError('无法从输入中提取出 IP 地址')
    # De-duplicate on both paths so that no address is queried twice.
    return list(dict.fromkeys(ips)), nameservers, paragraph_input
def gen_output_text(data, nameservers=None):
    """Format one parsed result as a Markdown code block for Telegram.

    The tab-separated lines of ``ipplus360.stdout_output_gen()`` are
    re-joined as ``label value``; the billing line (``扣费``) is left out.
    Label and value are split at the tab rather than by fixed character
    offsets, so no part of the value can be cut off.
    """
    output = []
    for line in ipplus360.stdout_output_gen(data, nameservers).splitlines():
        label, tab, value = line.partition('\t')
        if not tab:
            output.append(line)  # continuation line of a multi-line value
        elif label != '扣费':
            output.append(f'{label} ' + value.replace('\t', ' '))
    return '```IP-Info\n' + '\n'.join(output) + '```'
def edit_msg_with_button(text, msg):
    """Replace the content of ``msg`` with ``text`` (Markdown).

    A single "retry failed items" button is attached whenever the text
    contains an error marker; otherwise the message has no keyboard.
    """
    markup = None
    if '错误' in text:
        retry_button = telebot.types.InlineKeyboardButton('重试错误项', callback_data='query_all')
        markup = telebot.types.InlineKeyboardMarkup()
        markup.row_width = 1
        markup.add(retry_button)
    bot.edit_message_text(
        text.strip(),
        msg.chat.id,
        msg.message_id,
        parse_mode='Markdown',
        reply_markup=markup,
    )
@bot.message_handler()
def on_message(message):
    """Handle an incoming text message: commands, or a lookup request.

    Recognised commands are /start and /version (bare form in private chats
    only), /help, and /ip (also ``@botname``), whose argument may come from
    the replied-to message.  In private chats any other text is treated as
    the lookup input; in groups it is ignored.
    """
    me_cache = []

    def me():
        # bot.get_me() is a request to Telegram; make it at most once per
        # message and only when it is actually needed.
        if not me_cache:
            me_cache.append(bot.get_me())
        return me_cache[0]

    # Split the message into command and argument.
    replied = message.reply_to_message
    if replied and replied.from_user.id == me().id:
        return
    query_text = (message.text or '').strip()
    if not query_text:
        return
    command = query_text.split(maxsplit=1)[0]
    is_private = message.chat.type == 'private'
    if is_private and query_text == '/start':
        bot.reply_to(message, HELP_TEXT)
        return
    elif (is_private and query_text == '/version') or query_text == f'/version@{me().username}':
        try:
            if not CZ_DATABASE:
                raise RuntimeError
            cz_version = ipplus360.get_cz_from_db(..., CZ_DATABASE)[1][:-4]
        except Exception:
            cz_version = '调用 API 在线查询'
        bot.reply_to(message, f'```Version\n埃文 调用 API 在线查询\n纯真 {cz_version}```', parse_mode='Markdown')
        return
    elif command in ['/help', f'/help@{me().username}']:
        bot.reply_to(message, HELP_TEXT)
        return
    elif command in ['/ip', f'/ip@{me().username}', f'@{me().username}']:
        parts = query_text.split(maxsplit=1)
        if len(parts) == 2:
            query_text = parts[1]
        elif replied:
            # Bare command sent as a reply: use the replied-to text, minus
            # a leading command or mention of its own.
            query_text = (replied.text or '').strip()
            if query_text.startswith(('/', '@')):
                query_text = query_text.split(maxsplit=1)[-1]
            if not query_text:
                # e.g. a reply to a photo or sticker: nothing to look up.
                bot.reply_to(message, HELP_TEXT)
                return
        else:
            bot.reply_to(message, HELP_TEXT)
            return
    elif not is_private:
        return

    # Pick the limiter.  An empty whitelist admits everybody without a rate
    # limit; otherwise the sender or the chat must be listed.
    limiter = None
    if WHITELIST_ID:
        for candidate in (message.from_user.id, message.chat.id):
            if candidate in WHITELIST_ID:
                limiter = WHITELIST_ID[candidate]
                break
        else:
            bot.reply_to(message, '您没有权限使用此 Bot')
            return

    # Start the lookup.
    msg = bot.reply_to(message, '正在查询...')
    bot.send_chat_action(chat_id=message.chat.id, action='typing')

    # Parse the input.
    try:
        ips, nameservers, paragraph_input = parse_target(query_text)
    except RuntimeError as e:
        bot.edit_message_text(f'```IP-Info\n错误 {e.args[0]}```', message.chat.id, msg.message_id, parse_mode='Markdown')
        return

    # Query.
    datas = []
    for ip in ips:
        try:
            if limiter:
                limiter.try_acquire(ip)
            datas.append(ipplus360.query(ip, AIWEN_API_KEY, dns_nameservers=DNS_LIST, db_path=CZ_DATABASE))
        except BucketFullException:
            # Over the rate limit: still show what the local database knows.
            try:
                cz_free = ipplus360.get_cz_from_db(ip, CZ_DATABASE)
            except Exception:
                cz_free = None
            datas.append(ipplus360.data_tuple({'ip': ip, 'msg': '超出速率限制,请稍后再提交'}, cz_free=cz_free))
        if nameservers:  # a domain: only its first address is queried
            datas += [ipplus360.data_tuple({'ip': other, 'msg': '默认不查询所有 IP'}) for other in ips[1:]]
            break
    datas = [ipplus360.parse_rawdata(data) for data in datas]

    # Build the reply.
    output_text = []
    if paragraph_input:  # free text: echo it with the addresses marked
        paragraph_input = escape(paragraph_input)
        # Mark in one pass, longest address first and bounded by non-digits,
        # so "1.1.1.1" is never marked inside "1.1.1.10" or "11.1.1.1".
        found = sorted({str(i.details['IP']) for i in datas}, key=len, reverse=True)
        if found:
            marker = re.compile(r'(?<!\d)(?:' + '|'.join(re.escape(ip) for ip in found) + r')(?!\d)')
            paragraph_input = marker.sub(lambda m: f'`{m.group(0)}`', paragraph_input)
        output_text.append(paragraph_input.strip())
    elif len(datas) > 1 and nameservers:  # domain with several addresses: DNS first
        output_text.append('DNS: `' + ', '.join(nameservers) + '`')
    if len(datas) > 1:
        output_text += [gen_output_text(data) for data in datas]
    else:
        output_text.append(gen_output_text(datas[0], nameservers))
    output_text = '\n\n'.join(output_text)

    # Send.
    edit_msg_with_button(output_text, msg)
@bot.callback_query_handler(
    func=lambda call: bool(call.message.reply_to_message)
    and call.message.reply_to_message.from_user.id == call.from_user.id
)
def on_button_click(call):
    """Retry the failed entries of a result message.

    Only the user whose message the result replies to may trigger this.
    Every ``IP-Info`` block that ends in an error line is marked as being
    re-queried, looked up again, and replaced by the fresh result.
    """
    # Pick the limiter first, so an unlisted user changes nothing.  An empty
    # whitelist admits everybody without a rate limit.
    limiter = None
    if WHITELIST_ID:
        for candidate in (call.from_user.id, call.message.reply_to_message.chat.id):
            if candidate in WHITELIST_ID:
                limiter = WHITELIST_ID[candidate]
                break
        else:
            return

    # Rebuild the Markdown source from the rendered message, replacing each
    # error line with a progress marker.
    text = re.sub(r'(<pre>IP +.*\n(?:.+\n)*)错误 +.+(</pre>)', r'\1正在重新查询...\2', call.message.html_text)
    text = text.replace('<pre>', '```IP-Info\n').replace('</pre>', '```')
    text = text.replace('<code>', '`').replace('</code>', '`')
    bot.edit_message_text(text.strip(), call.message.chat.id, call.message.message_id, parse_mode='Markdown')
    bot.send_chat_action(chat_id=call.message.chat.id, action='typing')

    # Re-query the marked entries one by one.
    for match in re.finditer(r'```IP-Info\nIP +(.*)\n(?:.+\n)*正在重新查询...```', text):
        old_text, ip = match.group(0), match.group(1)
        try:
            if limiter:
                limiter.try_acquire(ip)
            data = ipplus360.query(ip, AIWEN_API_KEY, dns_nameservers=DNS_LIST, db_path=CZ_DATABASE)
        except BucketFullException:
            # Over the rate limit: still show what the local database knows.
            try:
                cz_free = ipplus360.get_cz_from_db(ip, CZ_DATABASE)
            except Exception:
                cz_free = None
            data = ipplus360.data_tuple({'ip': ip, 'msg': '超出速率限制,请稍后再提交'}, cz_free=cz_free)
        output = gen_output_text(ipplus360.parse_rawdata(data))
        text = text.replace(old_text, output)

    # Send.
    edit_msg_with_button(text, call.message)
# Persist and restore next-step handlers.
bot.enable_save_next_step_handlers(delay=2, filename='./step.save')
bot.load_next_step_handlers(filename='./step.save')

# Command menus: groups get /ip and /help, private chats only /help.
help_command = telebot.types.BotCommand('help', '使用帮助')
bot.set_my_commands(
    [telebot.types.BotCommand('ip', '查询 IP/域名 归属地'), help_command],
    scope=telebot.types.BotCommandScopeAllGroupChats(),
)
bot.set_my_commands(
    [telebot.types.BotCommand('help', '使用帮助')],
    scope=telebot.types.BotCommandScopeAllPrivateChats(),
)

# Drop any webhook registered earlier, then either register a fresh one and
# serve it with aiohttp, or fall back to long polling.
bot.remove_webhook()
if WEBHOOK_URL:
    sleep(0.5)
    # Random 32-character secret; requests must echo it back in the
    # X-Telegram-Bot-Api-Secret-Token header to be accepted.
    secret_alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    secret_rng = SystemRandom()
    WEBHOOK_SECRET = ''.join(secret_rng.choice(secret_alphabet) for _ in range(32))
    bot.set_webhook(url=WEBHOOK_URL, secret_token=WEBHOOK_SECRET)

    async def handle(request):
        """Accept one webhook call; reject it unless the secret matches."""
        if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != WEBHOOK_SECRET:
            return web.Response(status=403)
        payload = await request.json()
        bot.process_new_updates([telebot.types.Update.de_json(payload)])
        return web.Response()

    app = web.Application()
    app.router.add_post('/', handle)
    web.run_app(app, host=WEBHOOK_LISTEN_HOST, port=WEBHOOK_LISTEN_PORT)
else:
    bot.infinity_polling()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment