Skip to content

Instantly share code, notes, and snippets.

@Potat0000
Last active May 11, 2024 06:21
Show Gist options
  • Save Potat0000/239a86dd152f12ab4a76092b9d735ae9 to your computer and use it in GitHub Desktop.
Save Potat0000/239a86dd152f12ab4a76092b9d735ae9 to your computer and use it in GitHub Desktop.
AIWEN Tech IPv4 Geo-Info API Script
#!/usr/bin/env python3
###############################################################################
# AIWEN Tech IPv4 Geo-Info API Script #
# Only Support District-Level Query #
###############################################################################
# Requires Python 3.8+ (assignment expressions are used); dnspython needs 3.7+. #
# Tested on Debian 12. #
# #
# Use -h or --help option to get instructions. #
# Run the following command to install dependencies: #
# python3 -m pip install --upgrade dnspython requests #
###############################################################################
# The default DNS servers can be set in the DNS_LIST variable below. #
# The script will try to query CZ free API locally if CZ_DATABASE is set. #
# The API Key will be read from the following sources in order: #
# 1. --key option #
# 2. The KEY variables below #
# 3. Environment variable AIWEN_API_KEY #
# Note that the key will be picked from the next source only if it is empty. #
# The validity of the key won't be checked during pickup. #
###############################################################################
# API key for ipplus360.com; get_args() uses it when --key is not given.
KEY = ''
# Default DNS servers for forward (A) and reverse (PTR) lookups.
DNS_LIST = ['223.5.5.5', '119.29.29.29', '8.8.8.8', '114.114.114.114']
# Path of a local CZ database file; get_cz_free() falls back to the online API when it cannot be read.
CZ_DATABASE = ''
from argparse import ArgumentParser, RawTextHelpFormatter
from collections import namedtuple
from ipaddress import IPv4Address, IPv6Address, ip_network
from json import dumps as json_dumps
from os import getenv
from re import compile as re_pattern
from struct import unpack
from sys import stdin
from threading import Thread
from time import perf_counter as current_time
from urllib.parse import urlparse
import dns.resolver
import requests
from dns.reversename import from_address as reverse_address
# Timeout (seconds) passed to every requests.get() call in this script.
API_TIMEOUT = 5
# Time budget (seconds), counted from the start of query(), for waiting on the CZ worker threads.
MIN_CZ_TIME = 3
# Record passed between the query, parse and output stages; every field has a default.
data_tuple = namedtuple(
'data_tuple', ['details', 'status_code', 'rdns', 'cz_free', 'cz_accurate'], defaults=(None, 0, None, None, None)
)
def get_cz_from_db(ip, db_path='qqwry.dat'):
    """Look up an IPv4 address in a local CZ database file.

    All multi-byte integers are read as little-endian with explicit ``<``
    struct formats so the parser does not depend on the host byte order.

    Args:
        ip: An IPv4 address accepted by ``IPv4Address``, or ``...`` (Ellipsis)
            to read the last index record, which callers use to obtain the
            database version string.
        db_path: Path of the database file.

    Returns:
        A ``(geo, isp)`` tuple of stripped strings, or ``None`` when the
        selected record does not cover the address.

    Raises:
        OSError: The file cannot be opened.
        ipaddress.AddressValueError: ``ip`` is not a valid IPv4 address.
        struct.error, IndexError, UnicodeDecodeError: The file is truncated
            or malformed.
    """
    version_probe = 0xFFFFFF00  # int(IPv4Address('255.255.255.0'))
    fallback_ip = 0xF0000000  # int(IPv4Address('240.0.0.0'))

    def read_uint24():
        # Read a 3-byte little-endian unsigned integer at the current position.
        low, high = unpack('<HB', db.read(3))
        return (high << 16) + low

    def read_cstring(offset):
        # Read a NUL-terminated GBK string; offset 0 means "continue from the current position".
        if offset:
            db.seek(offset)
        collected = b''
        while (char := db.read(1))[0]:
            collected += char
        return collected.decode('gbk')

    want_version = ip is ...
    if want_version:
        ip_num = version_probe
    else:
        ip_num = int(IPv4Address(ip))
        if ip_num >= version_probe:
            # Addresses inside the version record's range are remapped to 240.0.0.0.
            ip_num = fallback_ip

    with open(db_path, 'rb') as db:
        # Header: offsets of the first and last 7-byte index entries.
        first_index, last_index = unpack('<II', db.read(8))
        index_count = (last_index - first_index) // 7 + 1

        # Binary search for the index entry whose start address is <= ip_num.
        lo, hi = 0, index_count - 1
        while lo < hi - 1:
            mid = (lo + hi) // 2
            db.seek(first_index + mid * 7)
            start_ip = unpack('<IHB', db.read(7))[0]
            if ip_num == start_ip:
                lo = mid
                break
            if ip_num > start_ip:
                lo = mid
            else:
                hi = mid
        if want_version:
            lo = hi  # the version string is stored in the last index record

        db.seek(first_index + lo * 7)
        start_ip, offset_low, offset_high = unpack('<IHB', db.read(7))
        end_ip_offset = offset_low + (offset_high << 16)
        db.seek(end_ip_offset)
        end_ip = unpack('<I', db.read(4))[0]

        # No data for this address.
        if not start_ip <= ip_num <= end_ip:
            return None

        # Read the GEO string; the byte after the end address selects the storage mode.
        db.seek(end_ip_offset + 4)
        mode = db.read(1)[0]
        isp_offset = 0
        if mode == 0x01:  # redirect mode 1: the whole record lives elsewhere
            geo_offset = read_uint24()
            db.seek(geo_offset)
            if db.read(1)[0] == 0x02:
                geo = read_cstring(read_uint24())
                db.seek(geo_offset + 4)
            else:
                geo = read_cstring(geo_offset)
        elif mode == 0x02:  # redirect mode 2: only the GEO string lives elsewhere
            geo = read_cstring(read_uint24())
            isp_offset = end_ip_offset + 8
        else:  # plain string mode
            geo = read_cstring(end_ip_offset + 4)

        # Read the ISP string, which may itself be a redirect.
        if isp_offset:
            db.seek(isp_offset)
        if db.read(1)[0] in (0x01, 0x02):
            pointer = read_uint24()
            isp = read_cstring(pointer) if pointer else ''
        else:
            db.seek(-1, 1)  # the byte just read is the first character of the string
            isp = read_cstring(isp_offset)
        return geo.strip(), isp.strip()
def get_rdns(ip, dns_nameservers=None):
    """Return the PTR host name of ``ip``, or ``None`` when the lookup fails.

    Args:
        ip: IPv4 address string to reverse-resolve.
        dns_nameservers: DNS servers to query; ``DNS_LIST`` is used when empty.

    Returns:
        The first PTR record as a string, or ``None`` on any lookup error or
        when the answer equals the address itself.
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = 0.25
    resolver.lifetime = 1
    resolver.nameservers = dns_nameservers or DNS_LIST
    try:
        domain = str(resolver.resolve(reverse_address(ip), 'PTR')[0])
    except Exception:
        # Best effort: a missing PTR record is not an error for callers.
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        return None
    return None if domain == ip else domain
def resolve(address, dns_nameservers=None):
    """Turn a user-supplied target into a list of IPv4 addresses.

    Args:
        address: An IPv4 address, a domain name, or a URL containing ``//``.
        dns_nameservers: DNS servers to try in order; ``DNS_LIST`` when empty.

    Returns:
        ``(ips, nameservers)``. On success ``ips`` is a list of address
        strings; ``nameservers`` is the server list that answered, or ``None``
        when ``address`` already was an IPv4 address. On failure ``ips`` is an
        error message string and ``nameservers`` is ``None``.
    """
    if '//' in address:
        address = urlparse(address).hostname
        if address is None:
            # e.g. 'http://' has no host part; the checks below need a string.
            return '无效的地址', None

    try:
        IPv6Address(address)
    except ValueError:
        pass
    else:
        return '不支持 IPv6 地址', None

    try:
        return [str(IPv4Address(address))], None
    except ValueError:
        pass

    try:
        ip_network(address)
    except ValueError:
        pass
    else:
        return '不支持 IP 段查询', None

    # Anything with a colon or a numeric last label cannot be a domain name.
    if ':' in address or address.split('.')[-1].isdigit():
        return '无效的地址', None

    error_msg = None
    for dns_nameserver in dns_nameservers or DNS_LIST:
        ips = []
        try:
            resolver = dns.resolver.Resolver()
            resolver.timeout = 0.25
            resolver.lifetime = 1
            resolver.nameservers = [dns_nameserver]
            for record in resolver.resolve(address, 'A'):
                ips.append(str(record))
        except dns.resolver.NXDOMAIN:
            if not ips and not error_msg:
                error_msg = '域名不存在'
        except dns.resolver.NoAnswer:
            if not ips and not error_msg:
                error_msg = '域名无 A 记录'
        except Exception:
            # Timeouts and other resolver failures: try the next server.
            pass
        if ips:
            return ips, resolver.nameservers
    return error_msg if error_msg else '域名解析失败', None
def get_aiwen(ip, key):
    """Query the AIWEN district-level API for ``ip``.

    Returns:
        ``(json_body, http_status)``, or ``(None, -1)`` when the request
        times out. Other request or JSON errors propagate to the caller.
    """
    url = 'https://api.ipplus360.com/ip/geo/v1/district/?key={key}&ip={ip}'.format(key=key, ip=ip)
    try:
        response = requests.get(url, timeout=API_TIMEOUT)
    except requests.exceptions.Timeout:
        return None, -1
    return response.json(), response.status_code
def get_cz_free(ip, db_path=CZ_DATABASE):
    """Return CZ ``(geo, isp)`` for ``ip``, preferring the local database.

    The local database is tried first; when it cannot be read or has no
    record for the address, the free online CZ API is queried instead.

    Args:
        ip: IPv4 address string.
        db_path: Path of the local CZ database file.

    Returns:
        ``(geo, isp)`` strings, or ``None`` when neither source yields data.
    """
    try:
        geo, isp = get_cz_from_db(ip, db_path)
    except Exception:
        # Missing/corrupt database or no record (None cannot be unpacked):
        # fall through to the online API.
        pass
    else:
        return geo, ('' if isp == 'CZ88.NET' else isp)

    api = 'https://cz88.net/api/cz88/ip/openIPInfo?ip={ip}'
    try:
        response = requests.get(api.format(ip=ip), timeout=API_TIMEOUT)
        if response.status_code != 200:
            return None
        payload = response.json()  # parsed once and reused
        if payload['success'] is not True:
            return None
        record = payload['data']
        geo = record['geo'].strip()
        isp = record['isp'].strip()
    except (requests.exceptions.RequestException, KeyError, ValueError):
        # Network failure, unexpected schema, or a body that is not JSON.
        return None
    if isp.endswith('CZ88.NET'):
        isp = isp[:-8]
    return geo, isp
def get_cz_accurate(ip):
    """Query the CZ "accurate" API for ``ip``.

    Returns:
        The ``data`` member of a successful response, or ``None`` when the
        request times out, the status is not 200, or ``success`` is not true.
    """
    url = 'https://cz88.net/api/cz88/ip/accurate?ip={ip}'.format(ip=ip)
    try:
        response = requests.get(url, timeout=API_TIMEOUT)
    except requests.exceptions.Timeout:
        return None
    if response.status_code != 200:
        return None
    payload = response.json()
    if payload['success'] is not True:
        return None
    return payload.get('data', None)
def get_args():
    """Parse the command line and fill in the API key.

    The key is taken from ``--key``, then the module-level ``KEY``, then the
    ``AIWEN_API_KEY`` environment variable. Entries of the ``dns`` positional
    that are not valid IPv4 addresses are dropped.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = ArgumentParser(
        description='A script that calls AIWEN Tech IPv4 Geo-Information API',
        usage='%(prog)s [options] target [ip2/dns1] [ip3/dns2] ...',
        add_help=False,
        formatter_class=RawTextHelpFormatter,
    )
    parser.add_argument(
        'target',
        help='IP or Domain that wants to test\nUse "-" to read from stdin',
        nargs='?',
        default='-',
    )
    parser.add_argument(
        'dns',
        help='DNS to resolve domain\nOnly available if target is a domain',
        nargs='*',
    )
    parser.add_argument(
        '-a',
        '--all',
        help='Get IP info for all IP addresses resolved from the domain\nAutomatically enabled when using stdin input',
        action='store_true',
    )
    parser.add_argument('--cz', help='Use CZ paid API (limited free trials per day)', action='store_true')
    parser.add_argument('-c', '--cord', help='Get coordinates in text format result', action='store_true')
    parser.add_argument('-j', '--json', help='Get JSON format result', action='store_true')
    parser.add_argument('--key', help='API Key for ipplus360.com')
    parser.add_argument('-v', '--version', help='Show version and exit', action='store_true')
    parser.add_argument('-h', '--help', help='Show this help message and exit', action='help')
    args = parser.parse_args()

    # Reading from stdin only makes sense when something is piped in.
    if not args.version and args.target == '-' and stdin.isatty():
        parser.error('required argument "target" is missing')

    if not args.key:
        args.key = KEY or getenv('AIWEN_API_KEY', '')

    if args.dns:
        # A distinct local name keeps the imported ``dns`` package visible.
        valid_servers = []
        for candidate in args.dns:
            try:
                valid_servers.append(str(IPv4Address(candidate)))
            except ValueError:
                continue
        args.dns = valid_servers
    return args
def parse_target(args):
    """Work out which IPv4 addresses to query from the parsed arguments.

    Returns:
        ``(ips, nameservers, stdin_input)``: the de-duplicated address list,
        the DNS servers that resolved a domain target (else ``None``), and the
        decoded stdin text when the target was ``-`` (else ``None``).

    Raises:
        RuntimeError: No address could be extracted or resolved; the message
            is the user-facing error text.
    """
    if args.target != '-':
        stdin_input = None
        ips, nameservers = resolve(args.target, args.dns)
        if isinstance(ips, str):
            # resolve() reports failures as a message string.
            raise RuntimeError(ips)
        if not nameservers:
            # Target was a literal IP: the extra positionals are more IPs to query.
            ips.extend(str(IPv4Address(extra)) for extra in args.dns)
    else:
        nameservers = None
        raw_bytes = stdin.buffer.read()
        try:
            stdin_input = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            stdin_input = raw_bytes.decode(encoding=stdin.encoding)
        ipv4_pattern = re_pattern(r'(?:(?<=[^\d^.])|^)(?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4}(?:(?=[^\d^.])|$)')
        ips = ipv4_pattern.findall(stdin_input)
        if not ips:
            raise RuntimeError('无法从输入中提取出 IP 地址')
    # De-duplicate while keeping first-seen order.
    unique_ips = list(dict.fromkeys(ips))
    return unique_ips, nameservers, stdin_input
class ThreadWithReturnValue(Thread):
    """Thread that remembers the return value of its target.

    The value is stored in ``rtn_value`` and also returned by ``join()``.
    It stays ``None`` when there is no target, the target raised, or the
    thread has not finished yet.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rtn_value = None

    def run(self):
        if self._target is None:
            return
        try:
            self.rtn_value = self._target(*self._args, **self._kwargs)
        except BaseException:
            # Deliberate catch-all: a failed worker simply yields None.
            pass

    def join(self, timeout=None):
        """Wait like ``Thread.join`` and return the target's result (or ``None``)."""
        super().join(timeout)
        return self.rtn_value
def query(ip, key, enable_cz=False, dns_nameservers=None, db_path=CZ_DATABASE):
    """Collect all available information about one IPv4 address.

    The AIWEN API, reverse DNS and the CZ free lookup run in parallel daemon
    threads. The CZ "accurate" API is started only when ``enable_cz`` is set
    and the AIWEN call succeeded.

    Args:
        ip: IPv4 address string.
        key: AIWEN API key.
        enable_cz: Also query the CZ accurate API.
        dns_nameservers: DNS servers for the reverse lookup.
        db_path: Path of the local CZ database.

    Returns:
        A ``data_tuple``. ``status_code`` is the HTTP status, or 204 for a
        "NoData" answer, -1 for a timeout, -2 for a failed request, and -3
        for a private address.
    """
    if IPv4Address(ip).is_private:
        return data_tuple({'ip': ip, 'msg': '保留IP'}, -3, get_rdns(ip, dns_nameservers))

    aiwen_thread = ThreadWithReturnValue(target=get_aiwen, args=(ip, key), daemon=True)
    rdns_thread = ThreadWithReturnValue(target=get_rdns, args=(ip, dns_nameservers), daemon=True)
    cz_free_thread = ThreadWithReturnValue(target=get_cz_free, args=(ip, db_path), daemon=True)
    cz_accurate_thread = ThreadWithReturnValue(target=get_cz_accurate, args=(ip,), daemon=True)

    time_start = current_time()
    aiwen_thread.start()
    rdns_thread.start()
    cz_free_thread.start()

    try:
        raw, status_code = aiwen_thread.join()
    except TypeError:
        # join() returned None because the worker raised; None cannot be unpacked.
        # Only this case is caught, so Ctrl-C during the wait still interrupts.
        raw, status_code = {'ip': ip, 'msg': '请求错误'}, -2
    if status_code == -1:
        raw = {'ip': ip, 'msg': '请求超时'}
    if status_code == 200 and raw['code'] == 'NoData':
        status_code = 204
    if status_code != 200:
        # Error payloads always carry the queried address.
        raw = {'ip': ip, **raw}

    if enable_cz and status_code == 200:
        cz_accurate_thread.start()

    rdns = rdns_thread.join()

    # The CZ workers are waited for only until MIN_CZ_TIME seconds after time_start.
    if cz_free_thread.is_alive() and (delta := current_time() - time_start) < MIN_CZ_TIME:
        cz_free_thread.join(MIN_CZ_TIME - delta)
    cz_free = cz_free_thread.rtn_value
    if cz_accurate_thread.is_alive() and (delta := current_time() - time_start) < MIN_CZ_TIME:
        cz_accurate_thread.join(MIN_CZ_TIME - delta)
    cz_accurate = cz_accurate_thread.rtn_value

    return data_tuple(raw, status_code, rdns, cz_free, cz_accurate)
def _parse_utc_offset(tz_text):
    """Convert a ``UTC+8`` / ``UTC-3:30`` style string to an hour offset.

    Returns an ``int`` for whole hours, a ``float`` when minutes are non-zero,
    and ``None`` for an empty value. Assumes the first three characters are a
    ``UTC`` prefix, as the original slicing did.
    """
    if not tz_text:
        return None
    hours_text, _, minutes_text = tz_text[3:].partition(':')
    hours = int(hours_text)
    minutes = int(minutes_text) if minutes_text else 0
    if not minutes:
        return hours
    # The sign belongs to the whole offset, so '-3:30' is -3.5 rather than -2.5.
    sign = -1 if hours_text.strip().startswith('-') else 1
    return hours + sign * minutes / 60


def parse_rawdata(data):
    """Normalise a raw ``query()`` result into the structure used for output.

    Args:
        data: A ``data_tuple`` as returned by ``query()`` or built by callers
            for placeholder/error entries.

    Returns:
        A ``data_tuple`` whose ``details`` is a flat dict with display keys.
        Error results carry ``IP``/``ErrorMessage`` (plus CZ fields when
        available); private addresses carry ``IP``/``rDNS``/``Accuracy``.
    """
    if data.status_code == -3:
        return data_tuple({'IP': data.details['ip'], 'rDNS': data.rdns, 'Accuracy': '保留IP'}, data.status_code)
    if data.status_code != 200:
        details = {'IP': data.details['ip'], 'ErrorMessage': data.details['msg']}
        if data.cz_free:
            details['CZ_GEO'] = data.cz_free[0]
            details['CZ_ISP'] = data.cz_free[1]
        return data_tuple(details, data.status_code, data.rdns if data.cz_free else None, data.cz_free)

    details_data = data.details['data']

    # The CZ ISP is shown only when it adds something to the AIWEN ISP.
    cz_isp = data.cz_free[1] if data.cz_free else None
    if cz_isp is not None and (cz_isp == '本机或本网络' or cz_isp in details_data['isp']):
        cz_isp = None

    location = []
    if details_data['lng'] and details_data['lat']:
        location.append(
            {
                'Longitude': float(details_data['lng']),
                'Latitude': float(details_data['lat']),
                'Radius': float(details_data['radius']) if details_data['radius'] else None,
            }
        )
    if data.cz_accurate and data.cz_accurate['locations']:
        location += [
            {
                'Longitude': float(point['longitude']),
                'Latitude': float(point['latitude']),
                'Radius': point['radius'] / 1000,
            }
            for point in data.cz_accurate['locations']
        ]

    parsed = {
        'IP': data.details['ip'],
        'rDNS': data.rdns,
        'ASN': int(details_data['asnumber']) if details_data['asnumber'] else None,
        'ISP': details_data['isp'],
        'Owner': details_data['owner'] if details_data['owner'] != details_data['isp'] else None,
        'Accuracy': details_data['accuracy'],
        'Continent': details_data['continent'],
        'Country': details_data['country'],
        'CountryCode': details_data.get('areacode'),
        'Province': details_data['prov'],
        'City': details_data['city'],
        'District': details_data['district'],
        'AdministrativeCode': details_data['adcode'],
        'CZ': cz_isp,
        'NetworkType': data.cz_accurate.get('netWorkType') if data.cz_accurate else None,
        'ZipCode': details_data['zipcode'],
        'Timezone': _parse_utc_offset(details_data['timezone']),
        'Location': location,
        'CurrencyCode': details_data.get('currency_code'),
        'CurrencyName': details_data.get('currency_name'),
    }

    if details_data['accuracy'] == '保留IP':
        # Reserved address reported by the API: keep only the identifying fields.
        return data_tuple({k: v if k in ['IP', 'rDNS', 'Accuracy'] else None for k, v in parsed.items()}, -3)

    if details_data['accuracy'] == '区县':
        parsed['Cost'] = '达到精度,正常扣费'
    elif data.details['charge']:
        parsed['Cost'] = '未达精度满 50 次,补扣费 1 次'
    else:
        parsed['Cost'] = '未达精度,不扣费'
    return data_tuple(parsed, data.status_code)
def json_output(datas, nameservers):
    """Serialise parsed results as a JSON array string.

    Empty values (``None``, ``''``, ``[]``) are omitted from each entry, but
    numeric zero is kept so that a ``Timezone`` of UTC+0 is not lost.

    Args:
        datas: Iterable of parsed results exposing ``details`` and ``status_code``.
        nameservers: DNS servers that resolved the target, or a falsy value.

    Returns:
        The JSON text (non-ASCII characters are not escaped).
    """

    def is_present(value):
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return True  # 0 is data, not "missing"
        return bool(value)

    entries = []
    for data in datas:
        entry = {
            key: value.strip() if isinstance(value, str) else value
            for key, value in data.details.items()
            if is_present(value)
        }
        if nameservers:
            entry['DNS'] = ', '.join(nameservers)
        entry['HTTPCode'] = data.status_code
        entries.append(entry)
    return json_dumps(entries, ensure_ascii=False)
def stdout_output_gen(data, nameservers, show_cord=False):
    """Render one parsed result as tab-separated ``label<TAB>value`` lines.

    ``data.details`` is never modified, so the function can be called
    repeatedly on the same result. A timezone offset of 0 is shown as
    ``UTC+0``.

    Args:
        data: Parsed result exposing ``details`` and ``status_code``.
        nameservers: DNS servers to show for a successful result, or falsy.
        show_cord: Include coordinate lines for a successful result.

    Returns:
        The lines joined with ``\\n``.
    """
    details = data.details
    output = [('IP', details['IP'])]

    if data.status_code == -3:
        # Private / reserved address.
        if details.get('rDNS'):
            output.append(('rDNS', details['rDNS']))
        output.append(('精度', '保留IP'))
    elif data.status_code != 200:
        # Error entry, optionally enriched with CZ data.
        if details.get('CZ_GEO'):
            output.append(('地区', details['CZ_GEO']))
        if details.get('CZ_ISP'):
            output.append(('纯真', details['CZ_ISP']))
        output.append(('错误', details['ErrorMessage']))
    else:
        if nameservers:
            output.append(('DNS', ', '.join(nameservers)))
        if details['rDNS']:
            output.append(('rDNS', details['rDNS']))

        if details['ISP']:
            isp = details['ISP']
            if details['ASN']:
                isp += f" (AS{details['ASN']})"
            output.append(('ISP', isp))
        if details['Owner']:
            output.append(('归属', details['Owner']))
        if details['Accuracy']:
            output.append(('精度', details['Accuracy'] + '级'))

        if country := ' '.join(i for i in (details['Continent'], details['Country']) if i):
            timezone = details['Timezone']
            # Compare with None: an offset of 0 is a valid timezone.
            tz_str = f'UTC{timezone:+}' if timezone is not None else ''
            if extra := ', '.join(i for i in (details['CountryCode'], tz_str) if i):
                country += f' ({extra})'
            output.append(('国家', country))

        if location := ' '.join(i for i in (details['Province'], details['City'], details['District']) if i):
            if details['AdministrativeCode']:
                location += f" ({details['AdministrativeCode']})"
            output.append(('地区', location))

        cz, network_type = details['CZ'], details['NetworkType']
        if cz:
            output.append(('纯真', f'{cz} ({network_type})' if network_type else cz))
        elif network_type:
            output.append(('纯真', network_type))

        if details['ZipCode']:
            output.append(('邮编', details['ZipCode']))

        if show_cord and details['Location']:
            label = '经纬'
            for point in details['Location']:
                output.append((label, f"{point['Longitude']}, {point['Latitude']} ({point['Radius']}km)"))
                label = ''  # only the first coordinate line carries the label

        if details['CurrencyCode']:
            currency = details['CurrencyCode']
            if details['CurrencyName']:
                currency += f" ({details['CurrencyName']})"
            output.append(('货币', currency))

        output.append(('扣费', details['Cost']))
    return '\n'.join(f'{k}\t{v.strip()}' for k, v in output)
def stdout_output(datas, nameservers, stdin_input, show_cord):
    """Build the complete plain-text report for all results.

    When the addresses came from stdin, the original text is echoed first
    with every queried address highlighted. Highlighting is done in a single
    pass with longer addresses tried first, so an address that is a prefix of
    another (``1.1.1.1`` vs ``1.1.1.11``) cannot split the longer one or wrap
    escape codes inside each other.

    Args:
        datas: Parsed results exposing ``details`` and ``status_code``.
        nameservers: DNS servers that resolved the target, or falsy.
        stdin_input: The text read from stdin, or falsy.
        show_cord: Forwarded to ``stdout_output_gen``.

    Returns:
        The report sections joined by blank lines.
    """
    sections = []
    if stdin_input:
        from re import escape as re_escape

        bold_text = '\033[1;4m{text}\033[0m'
        addresses = sorted({item.details['IP'] for item in datas}, key=len, reverse=True)
        if addresses:
            matcher = re_pattern('|'.join(re_escape(address) for address in addresses))
            stdin_input = matcher.sub(lambda found: bold_text.format(text=found.group(0)), stdin_input)
        sections.append(stdin_input.strip())
        sections.append('========================================')
    elif len(datas) > 1 and nameservers:
        # Several addresses for one domain: print the DNS line once, up front.
        sections.append('DNS\t' + ', '.join(nameservers))

    per_entry_dns = nameservers if len(datas) == 1 else None
    sections += [stdout_output_gen(data, per_entry_dns, show_cord) for data in datas]
    return '\n\n'.join(sections)
if __name__ == '__main__':
    # Command-line entry point: parse arguments, query, print.
    args = get_args()

    if args.version:
        # Report where each data source comes from; the CZ version is read
        # from the local database when one is configured.
        try:
            if not CZ_DATABASE:
                raise RuntimeError
            cz_version = get_cz_from_db(..., CZ_DATABASE)[1][:-4]
            if args.json:
                cz_version = cz_version[0:4] + cz_version[5:7] + cz_version[8:10]
        except Exception:
            cz_version = 'Online' if args.json else '调用 API 在线查询'
        if args.json:
            print(json_dumps({'IPPlus': 'Online', 'CZ': cz_version}, ensure_ascii=False))
        else:
            print(f'埃文\t调用 API 在线查询\n纯真\t{cz_version}')
        # SystemExit is raised directly; exit() is a `site` helper that may be absent.
        raise SystemExit(0)

    try:
        ips, nameservers, stdin_input = parse_target(args)
    except RuntimeError as e:
        err_msg = e.args[0]
        if args.json:
            print(json_dumps([{'ip': None, 'ErrorMessage': err_msg, 'HTTPCode': -2}], ensure_ascii=False))
        else:
            print(f'错误\t{err_msg}')
        raise SystemExit(1)

    if (not nameservers) or args.all:
        datas = [query(ip, args.key, args.cz) for ip in ips]
    else:
        # A domain resolved to several addresses: query only the first unless -a is given.
        datas = [query(ips[0], args.key, args.cz)]
        datas += [data_tuple({'ip': ip, 'msg': '使用 -a 参数查询所有 IP 地址的信息'}) for ip in ips[1:]]
    datas = [parse_rawdata(data) for data in datas]

    if args.json:
        result = json_output(datas, nameservers)
    else:
        result = stdout_output(datas, nameservers, stdin_input, args.cord)
    print(result)
#!/usr/bin/env python3
###############################################################################
# Telegram Bot for #
# AIWEN Tech IPv4 Geo-Info API Script #
###############################################################################
# Requires `ipplus360.py` to be in the same directory. #
# Requires Python 3.8+ due to the requirements of pyrate-limiter. #
# Tested on Debian 12. #
# #
# Run the following command to install dependencies: #
# python3 -m pip install --upgrade \ #
# aiohttp dnspython pyrate-limiter pyTelegramBotAPI requests #
###############################################################################
import re
import string
from html import escape
from ipaddress import IPv4Address as IP
from random import SystemRandom
from time import sleep
import telebot
from aiohttp import web
from pyrate_limiter import BucketFullException, Duration, Limiter, Rate
import ipplus360 # type: ignore
###################################################################################
# # # # # # # # # # # # # # Configuration Section Start # # # # # # # # # # # # # #
###################################################################################
# AIWEN_API_KEY: your API key for the AIWEN Tech IPv4 Geo-Info API
AIWEN_API_KEY = ''
# BOT_TOKEN: your Telegram bot token
BOT_TOKEN = ''
# DNS_LIST: DNS servers used when resolving domain names
DNS_LIST = ['223.5.5.5', '119.29.29.29', '8.8.8.8', '114.114.114.114']
# CZ_DATABASE: path to the CZ database file; leave blank to use the online API
CZ_DATABASE = ''
# Webhook settings: leave WEBHOOK_URL blank to use infinity_polling
WEBHOOK_URL = ''
WEBHOOK_LISTEN_HOST = None
WEBHOOK_LISTEN_PORT = None
# WHITELIST_ID:
# Maps a user or group ID to its rate limit. Leave it empty to allow all users.
# Use a Rate (or a list of Rates) for a custom limit
# Use `None` to disable the rate limit for that ID
# Use `...` to apply DEFAULT_SETTINGS
WHITELIST_ID = {
11111: Rate(3, Duration.SECOND),
22222: None,
33333: ...,
}
# Rates applied to whitelist entries whose value is `...`.
DEFAULT_SETTINGS = [
Rate(15, Duration.MINUTE),
Rate(60, Duration.HOUR),
Rate(200, Duration.DAY),
]
###################################################################################
# # # # # # # # # # # # # # Configuration Section End # # # # # # # # # # # # # #
###################################################################################
# Create the bot, then replace every whitelist rate specification with a
# ready-to-use Limiter (entries set to None stay None, meaning "no limit").
bot = telebot.TeleBot(BOT_TOKEN)
for k, v in WHITELIST_ID.items():
    if v is None:
        continue
    WHITELIST_ID[k] = Limiter(DEFAULT_SETTINGS if v is ... else v)
# Usage text sent in reply to /start, /help, and a bare /ip command.
HELP_TEXT = (
'发送 IP 即可查询其地理信息,仅支持 IPv4 地址\n'
'发送整段文本可从中提取 IP 进行查询\n\n'
'也支持发送单一域名,将解析其指向的 IPv4 地址,查询对应的地理信息\n'
'此时可在待查询域名后跟随若干由空格分隔的 IP 地址用作 DNS 服务器'
)
def parse_target(query_text):
    """Extract the IPv4 addresses to query from a chat message.

    A single-line message of the form ``target [ip ...]`` is first treated as
    an address or domain followed by extra addresses / DNS servers. If that
    does not apply or fails, every IPv4 address found in the text is used.

    Args:
        query_text: The message text.

    Returns:
        ``(ips, nameservers, paragraph_input)``: the de-duplicated addresses,
        the DNS servers that resolved a domain (else ``None``), and the
        original text when addresses were extracted from it (else ``None``).

    Raises:
        RuntimeError: Nothing could be resolved or extracted; the message is
            the user-facing error text.
    """

    def is_ipv4(token):
        try:
            IP(token)
        except ValueError:
            return False
        return True

    ips, nameservers, paragraph_input = None, None, None
    tokens = query_text.split()
    # Blank input has no tokens and goes straight to the extraction path below.
    if '\n' not in query_text and tokens and all(is_ipv4(token) for token in tokens[1:]):
        extras = [str(IP(token)) for token in tokens[1:]]
        ips, nameservers = ipplus360.resolve(tokens[0], extras or DNS_LIST)
        if isinstance(ips, str):
            # resolve() reports failures as a message string.
            if len(tokens) == 1:
                raise RuntimeError(ips)
            ips = None
        elif not nameservers:
            # The first token was a literal IP, so the rest are more IPs to query.
            ips.extend(extras)

    if not ips:
        paragraph_input = query_text
        ipv4_pattern = re.compile(r'(?:(?<=[^\d^.])|^)(?:(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)\.?\b){4}(?:(?=[^\d^.])|$)')
        ips = ipv4_pattern.findall(paragraph_input)
        if not ips:
            raise RuntimeError('无法从输入中提取出 IP 地址')

    # De-duplicate on every path while keeping first-seen order.
    return list(dict.fromkeys(ips)), nameservers, paragraph_input
def gen_output_text(data, nameservers=None):
    """Format one parsed result as a Markdown code block for Telegram.

    Each ``label<TAB>value`` line from ``ipplus360.stdout_output_gen`` is
    split on the tab instead of being sliced at fixed offsets, so no
    characters of the value are lost. The billing line (``扣费``) is left out.

    Args:
        data: Parsed result accepted by ``ipplus360.stdout_output_gen``.
        nameservers: DNS servers to show, or ``None``.

    Returns:
        The fenced ``IP-Info`` code block text.
    """
    rendered = []
    for line in ipplus360.stdout_output_gen(data, nameservers).splitlines():
        label, _, value = line.partition('\t')
        if label.startswith('扣费'):
            continue
        rendered.append(f'{label} {value}'.replace('\t', ' '))
    return '```IP-Info\n' + '\n'.join(rendered) + '```'
def edit_msg_with_button(text, msg):
    """Replace the content of ``msg`` with ``text`` (Markdown).

    A single "retry failed entries" inline button is attached whenever the
    text contains an error marker.
    """
    markup = None
    if '错误' in text:
        retry_button = telebot.types.InlineKeyboardButton('重试错误项', callback_data='query_all')
        markup = telebot.types.InlineKeyboardMarkup()
        markup.row_width = 1
        markup.add(retry_button)
    bot.edit_message_text(text.strip(), msg.chat.id, msg.message_id, parse_mode='Markdown', reply_markup=markup)
@bot.message_handler()
def on_message(message):
    """Handle an incoming message: commands, whitelist check, query, reply.

    In private chats any text is treated as a query. In other chats only
    ``/ip`` (or a mention of the bot) triggers one, optionally taking the
    text from the replied-to message.
    """
    me = bot.get_me()  # fetched once instead of once per comparison

    # Split the command from its content.
    if message.reply_to_message and message.reply_to_message.from_user.id == me.id:
        return
    query_text = (message.text or '').strip()
    if not query_text:
        return
    command = query_text.split(maxsplit=1)[0]
    is_private = message.chat.type == 'private'

    if is_private and query_text == '/start':
        bot.reply_to(message, HELP_TEXT)
        return
    elif (is_private and query_text == '/version') or query_text == f'/version@{me.username}':
        try:
            if not CZ_DATABASE:
                raise RuntimeError
            cz_version = ipplus360.get_cz_from_db(..., CZ_DATABASE)[1][:-4]
        except Exception:
            cz_version = '调用 API 在线查询'
        bot.reply_to(message, f'```Version\n埃文 调用 API 在线查询\n纯真 {cz_version}```', parse_mode='Markdown')
        return
    elif command in ['/help', f'/help@{me.username}']:
        bot.reply_to(message, HELP_TEXT)
        return
    elif command in ['/ip', f'/ip@{me.username}', f'@{me.username}']:
        parts = query_text.split(maxsplit=1)
        if len(parts) > 1:
            query_text = parts[1]
        elif message.reply_to_message:
            # Bare command sent as a reply: query the replied-to text instead.
            query_text = (message.reply_to_message.text or '').strip()
            if query_text.startswith('/') or query_text.startswith('@'):
                # Drop a leading command/mention from the replied-to text.
                parts = query_text.split(maxsplit=1)
                query_text = parts[0] if len(parts) == 1 else parts[1]
            if not query_text:
                bot.reply_to(message, HELP_TEXT)
                return
        else:
            bot.reply_to(message, HELP_TEXT)
            return
    elif not is_private:
        return

    # Pick the limiter; None means "no rate limit", also when there is no whitelist.
    limiter = None
    if WHITELIST_ID:
        try:
            limiter = WHITELIST_ID[message.from_user.id]
        except KeyError:
            try:
                limiter = WHITELIST_ID[message.chat.id]
            except KeyError:
                bot.reply_to(message, '您没有权限使用此 Bot')
                return

    # Start the query.
    msg = bot.reply_to(message, '正在查询...')
    bot.send_chat_action(chat_id=message.chat.id, action='typing')

    # Parse the input.
    try:
        ips, nameservers, paragraph_input = parse_target(query_text)
    except RuntimeError as e:
        bot.edit_message_text(f'```IP-Info\n错误 {e.args[0]}```', message.chat.id, msg.message_id, parse_mode='Markdown')
        return

    # Query.
    datas = []
    for ip in ips:
        try:
            if limiter:
                limiter.try_acquire(ip)
            datas.append(ipplus360.query(ip, AIWEN_API_KEY, dns_nameservers=DNS_LIST, db_path=CZ_DATABASE))
        except BucketFullException:
            # Over the rate limit: still offer the local CZ data when available.
            try:
                cz_free = ipplus360.get_cz_from_db(ip, CZ_DATABASE)
            except Exception:
                cz_free = None
            datas.append(ipplus360.data_tuple({'ip': ip, 'msg': '超出速率限制,请稍后再提交'}, cz_free=cz_free))
        if nameservers:  # for a domain only the first address is queried by default
            datas += [ipplus360.data_tuple({'ip': ip, 'msg': '默认不查询所有 IP'}) for ip in ips[1:]]
            break
    datas = [ipplus360.parse_rawdata(data) for data in datas]

    # Build the output.
    output_text = []
    if paragraph_input:  # free-text input: echo it with the addresses marked
        # NOTE(review): the text is HTML-escaped but later sent with parse_mode='Markdown' - confirm this is intended.
        paragraph_input = escape(paragraph_input)
        for ip in [i.details['IP'] for i in datas]:
            paragraph_input = paragraph_input.replace(ip, f'`{ip}`')
        output_text.append(paragraph_input.strip())
    elif len(datas) > 1 and nameservers:  # domain with several addresses: DNS line first
        output_text.append('DNS: `' + ', '.join(nameservers) + '`')
    if len(datas) > 1:
        output_text += [gen_output_text(data) for data in datas]
    else:
        output_text.append(gen_output_text(datas[0], nameservers))
    output_text = '\n\n'.join(output_text)

    # Send.
    edit_msg_with_button(output_text, msg)
@bot.callback_query_handler(func=lambda call: call.message.reply_to_message.from_user.id == call.from_user.id)
def on_button_click(call):
    """Re-run the query for every entry of the message that shows an error.

    Only the user whose message triggered the report may press the button
    (enforced by the handler filter above).
    """
    # Pick the limiter first; None means "no rate limit", also when there is no whitelist.
    limiter = None
    if WHITELIST_ID:
        if call.from_user.id in WHITELIST_ID:
            limiter = WHITELIST_ID[call.from_user.id]
        elif call.message.reply_to_message.chat.id in WHITELIST_ID:
            limiter = WHITELIST_ID[call.message.reply_to_message.chat.id]
        else:
            # Neither the user nor the chat is whitelisted: leave the message as it is.
            return

    # Convert the message back to Markdown and mark failed entries as "retrying".
    text = re.sub(r'(<pre>IP +.*\n(?:.+\n)*)错误 +.+(</pre>)', r'\1正在重新查询...\2', call.message.html_text)
    text = text.replace('<pre>', '```IP-Info\n').replace('</pre>', '```')
    text = text.replace('<code>', '`').replace('</code>', '`')
    bot.edit_message_text(text.strip(), call.message.chat.id, call.message.message_id, parse_mode='Markdown')
    bot.send_chat_action(chat_id=call.message.chat.id, action='typing')

    # Re-query each failed entry and splice the new block into the text.
    for match in re.finditer(r'```IP-Info\nIP +(.*)\n(?:.+\n)*正在重新查询...```', text):
        old_text, ip = match.group(0), match.group(1)
        try:
            if limiter:
                limiter.try_acquire(ip)
            data = ipplus360.query(ip, AIWEN_API_KEY, dns_nameservers=DNS_LIST, db_path=CZ_DATABASE)
        except BucketFullException:
            try:
                cz_free = ipplus360.get_cz_from_db(ip, CZ_DATABASE)
            except Exception:
                cz_free = None
            data = ipplus360.data_tuple({'ip': ip, 'msg': '超出速率限制,请稍后再提交'}, cz_free=cz_free)
        output = gen_output_text(ipplus360.parse_rawdata(data))
        text = text.replace(old_text, output)

    # Send.
    edit_msg_with_button(text, call.message)
# Persist next-step handlers to disk and restore any saved ones.
bot.enable_save_next_step_handlers(delay=2, filename='./step.save')
bot.load_next_step_handlers(filename='./step.save')

# Command menus: groups get /ip and /help, private chats only /help.
bot.set_my_commands(
    [
        telebot.types.BotCommand('ip', '查询 IP/域名 归属地'),
        telebot.types.BotCommand('help', '使用帮助'),
    ],
    scope=telebot.types.BotCommandScopeAllGroupChats(),
)
bot.set_my_commands(
    [telebot.types.BotCommand('help', '使用帮助')],
    scope=telebot.types.BotCommandScopeAllPrivateChats(),
)

# Remove any previously registered webhook before choosing the update mode.
bot.remove_webhook()
if not WEBHOOK_URL:
    bot.infinity_polling()
else:
    sleep(0.5)
    # Random 32-character token; requests without it are rejected below.
    WEBHOOK_SECRET = ''.join(
        SystemRandom().choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(32)
    )
    bot.set_webhook(url=WEBHOOK_URL, secret_token=WEBHOOK_SECRET)

    async def handle(request):
        """Accept one webhook POST and hand the update to the bot."""
        if request.headers.get('X-Telegram-Bot-Api-Secret-Token') != WEBHOOK_SECRET:
            return web.Response(status=403)
        request_body_dict = await request.json()
        update = telebot.types.Update.de_json(request_body_dict)
        bot.process_new_updates([update])
        return web.Response()

    app = web.Application()
    app.router.add_post('/', handle)
    web.run_app(app, host=WEBHOOK_LISTEN_HOST, port=WEBHOOK_LISTEN_PORT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment