Skip to content

Instantly share code, notes, and snippets.

@rckclmbr
Created July 29, 2017 05:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rckclmbr/3731fe456529787f1a0824bd26d65407 to your computer and use it in GitHub Desktop.
Save rckclmbr/3731fe456529787f1a0824bd26d65407 to your computer and use it in GitHub Desktop.
echo.py
import asyncio
from aiohttp import web
from cachetools import TTLCache
import json
import uuid
import pprint
# Module-level store of recently echoed requests, keyed by the retrace key
# generated in the echo handlers and read back by ``retrace``.  Holds at most
# 10 entries; ttl=300 (seconds with cachetools' default timer).
cache = TTLCache(maxsize=10, ttl=300)
async def retrace(request):
    """Replay a previously echoed request identified by its retrace key.

    Reads the ``key`` query parameter and looks it up in the module-level
    ``cache``.  On a hit, responds with the cached body, the cached headers
    (minus any Content-Type header) and the cached charset.  When the key is
    missing from the query string or absent from the cache, responds with
    the text ``(no data)``.
    """
    # ``rel_url.query`` is the same accessor family the POST handler uses;
    # the legacy ``request.GET`` alias is not available on current aiohttp.
    key = request.rel_url.query.get("key")

    # Only the cache lookup is guarded, so unrelated KeyErrors further down
    # are not silently reported as "(no data)".
    try:
        cache_item = cache[key]
    except KeyError:
        return web.Response(text="(no data)", content_type="application/json")

    # Build a fresh dict instead of deleting from the cached one, so a replay
    # never alters the stored entry.  Content-Type is dropped regardless of
    # the casing the client used, because web.Response refuses a ``charset``
    # argument alongside a Content-Type header.
    headers = {
        name: value
        for name, value in dict(cache_item['headers']).items()
        if name.lower() != "content-type"
    }
    return web.Response(text=cache_item['body'],
                        headers=headers,
                        # Entries may lack a charset; treat that as "unset".
                        charset=cache_item.get('charset'))
async def echo_get(request):
    """Echo a GET request back to the client and record it for retracing.

    Generates a fresh retrace key, stores the request under that key in the
    module-level ``cache``, and responds with a text body naming the key and
    the requested path (including query string).  The request headers are
    reflected as response headers; if a ``Cookie`` header is present, a
    ``Set-Cookie`` header is added carrying the same value scoped to
    ``echo.xr6.me``.
    """
    key = uuid.uuid4().hex
    # The template has exactly two placeholders, so only the key and the
    # path are passed; building a header dump here would be discarded.
    response_body = "\nRetrace key: {}\nPath: {}\n".format(key, request.path_qs)

    headers = dict(request.headers.items())

    # Store a real dict (not a live items() view) plus the charset, i.e. the
    # same entry shape the body-carrying handler stores and ``retrace`` reads.
    # The snapshot is taken before Set-Cookie is added, so the cached headers
    # are exactly those received.
    cache[key] = dict(
        body=None,
        headers=dict(headers),
        charset=request.charset,
    )

    if "Cookie" in headers:
        headers['Set-Cookie'] = headers['Cookie'] + '; domain=echo.xr6.me'
    return web.Response(text=response_body,
                        headers=headers)
async def echo_post(request):
    """Echo a request that carries a body and record it for retracing.

    Registered for POST, PUT and DELETE.  Reads the request body as text,
    doubles every backslash in it, and responds with a text body listing a
    fresh retrace key, the matched path (with query string, if any) and the
    escaped request body.  The same text is printed to stdout.  The request
    headers are reflected as response headers (plus a ``Set-Cookie`` derived
    from ``Cookie`` when present), and the escaped body, those headers and
    the request charset are stored in the module-level ``cache`` under the
    retrace key.
    """
    request_body = await request.text()
    # NOTE(review): backslashes are doubled before echoing *and* caching;
    # presumably a display-escaping choice — confirm it is wanted for replay.
    request_body = request_body.replace('\\', '\\\\')
    key = uuid.uuid4().hex

    # Always show the matched path; append "?<query>" only when a query
    # string exists.  (A bare conditional expression around the whole
    # concatenation would blank the path whenever the query is empty.)
    path = str(request.match_info['path'])
    qs = request.rel_url.query_string
    if qs:
        path += "?" + qs

    response_body = "\nRetrace key: {}\nPath: /{}\nBody:\n{}\n".format(
        key,
        path,
        request_body,
    )
    print(response_body)

    headers = dict(request.headers.items())
    if "Cookie" in headers:
        headers['Set-Cookie'] = headers['Cookie'] + '; domain=echo.xr6.me'
    cache[key] = dict(
        body=request_body,
        headers=headers,
        charset=request.charset,
    )
    return web.Response(text=response_body,
                        headers=headers)
async def init(loop):
    """Build the echo application and start serving it on 127.0.0.1:8080.

    Registers ``/_retrace`` for GET, then a catch-all route for GET
    (``echo_get``) and for POST, PUT and DELETE (``echo_post``), creates the
    listening server on *loop*, and prints the server address.

    Returns a ``(server, handler)`` tuple so the caller can shut both down.
    """
    app = web.Application(loop=loop)
    add_route = app.router.add_route
    catch_all = r'/{path:.*}'

    # /_retrace is registered ahead of the catch-all GET route — presumably
    # so the catch-all does not shadow it.
    add_route('GET', '/_retrace', retrace)
    add_route('GET', catch_all, echo_get)
    for method in ('POST', 'PUT', 'DELETE'):
        add_route(method, catch_all, echo_post)

    handler = app.make_handler()
    srv = await loop.create_server(handler, '127.0.0.1', 8080)
    print("Server started at http://127.0.0.1:8080")
    return srv, handler
# Script entry: start the server, run until interrupted, then shut down.
loop = asyncio.get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server; fall through to cleanup.
    pass
finally:
    # Stop accepting new connections first, then let in-flight ones finish,
    # and finally release the event loop's resources.
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(handler.finish_connections())
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment