Skip to content

Instantly share code, notes, and snippets.

@sorz
Created Aug 14, 2020
Embed
What would you like to do?
Lastest update on 2019-10-31. Mocked service may have changed since then. Compatibility issues expected.
#!/usr/bin/env python3
import asyncio
import logging
from aiohttp import web
from aiodnsresolver import Resolver, TYPES, DnsRecordDoesNotExist
from des import DesKey
TENCENT_KEYS = {
'000': b'DES-KEY1',
'111': b'DES-KEY2',
'222': b'DES-KEY3',
}
async def resolve(request: web.Request) -> (str, [str], int):
host = request.query.get('host') or request.query.get('dn')
if host is None:
raise web.HTTPBadRequest(text='missing query')
if not '.' in host:
logging.info('not a domain: %s', host)
raise web.HTTPBadRequest(text='not a domain')
try:
ips, ttl = await resolver.query(host)
except DnsRecordDoesNotExist:
logging.info('NXDOMAIN: %s', host)
raise web.HTTPNotFound(text='NXDOMAIN')
return host, ips, ttl
async def handle_aliyun(request: web.Request) -> web.Response:
host, ips, ttl = await resolve(request)
return web.json_response(dict(host=host, ips=ips, ttl=ttl))
async def handle_tencent(request: web.Request) -> web.Response:
key_id = request.query.get('id')
ttl_enabled = request.query.get('ttl') == '1'
if key_id is None:
_, ips, ttl = await resolve(request)
else:
key = TENCENT_KEYS.get(key_id)
if key is None:
logging.info('missing key: %s', key_id)
raise web.HTTPNotFound(text='Missing keys')
cipher = DesKey(key)
host = request.query.get('host') or request.query.get('dn')
host = bytes.fromhex(host)
host = cipher.decrypt(host, padding=True).decode('utf8')
ips, ttl = await resolver.query(host)
if ttl_enabled:
resp = ';'.join(f'{ip},{ttl}' for ip in ips)
else:
resp = ';'.join(ips)
if key_id is not None:
resp = cipher.encrypt(resp.encode('utf8'), padding=True).hex()
return web.Response(text=resp)
async def handle_zhihu(request: web.Request) -> web.Response:
host = request.query.get('host')
if ',' not in host:
host, ips, ttl = await resolve(request)
resp = dict(host=host, ips=ips, ttl=ttl, origin_ttl=ttl)
else:
resp = []
for host in host.split(','):
ips, ttl = await resolver.query(host)
resp.append(dict(host=host, ips=ips, ttl=ttl, origin_ttl=ttl))
return web.json_response(dict(dns=resp))
class HostChooser:
def __init__(self):
self._host_handler = {}
async def do_route(self, request: web.Request):
handler = self._host_handler.get(request.host)
if handler is not None:
return await handler(request)
logging.info('Unknown host: %s', request.host)
raise web.HTTPNotFound()
def add(self, host: str, handler):
self._host_handler[host] = handler
class DNSResolver:
_resolve = None
_loop = None
async def query(self, host: str) -> ([str], int):
if self._resolve is None:
self._resolve, _ = Resolver()
if self._loop is None:
self._loop = asyncio.get_event_loop()
ips = await self._resolve(host, TYPES.A)
ttl = int(max(0, min(ip.expires_at for ip in ips) - self._loop.time()))
ips = [str(ip) for ip in ips]
return ips, ttl
app = web.Application()
resolver = DNSResolver()
hosts = HostChooser()
app.add_routes([web.get('/{path:.*}', hosts.do_route)])
hosts.add('203.107.1.33', handle_aliyun)
hosts.add('203.107.1.34', handle_aliyun)
hosts.add('203.107.1.65', handle_aliyun)
hosts.add('203.107.1.66', handle_aliyun)
hosts.add('119.29.29.29', handle_tencent)
hosts.add('118.89.204.198', handle_zhihu)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.getLogger('aiodnsresolver').setLevel(logging.WARN)
web.run_app(app, host='127.0.0.80', port=8011)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment