Created
July 1, 2019 11:42
-
-
Save CERT-W/a340d29f0df7b0caacd0280da6d9c5b5 to your computer and use it in GitHub Desktop.
YesWeHack - LeHack challenge 2019
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import pycurl | |
import os | |
import logging | |
from flask import ( | |
Flask, | |
request, | |
render_template, | |
render_template_string, | |
abort, | |
send_from_directory, | |
) | |
from pygments import highlight | |
from pygments.lexers import HtmlLexer | |
from pygments.formatters import HtmlFormatter | |
import pymemcache | |
from hmac import HMAC | |
from io import BytesIO | |
from hashlib import md5 | |
###### | |
# | |
# To read the flag a shell can be usefull ;) | |
# | |
###### | |
MAX_DOWNLOAD = 1024 * 100 | |
APP_SECRET = b"s3cr3t_h4shm4c_c0d3" | |
FULLSCREEN_HEADER = """ | |
<html> | |
<head> | |
<title>{{title}}</title> | |
<style> | |
body {padding: 15px;margin: 0} | |
{{css}} | |
</style> | |
</head> | |
<body> | |
""" | |
FULLSCREEN_FOOTER = "</body></html>" | |
class Utf8Cache(pymemcache.client.base.Client): | |
""" | |
Exactly like memcache but with utf8 values | |
""" | |
def get(self, key, default=None): | |
value = super().get(key, default) | |
if value is None: | |
return None | |
return value.decode("utf8") | |
def set(self, key, value): | |
return super().set(key, value.encode("utf8")) | |
app = Flask(__name__) | |
html_formater = HtmlFormatter(style="monokai", linenos="table") | |
html_lexer = HtmlLexer() | |
cache = Utf8Cache(("memcached", 11211)) | |
# cache = Utf8Cache(("127.0.0.1", 11211)) | |
@app.route("/robots.txt") | |
def static_from_root(): | |
return send_from_directory(app.static_folder, request.path[1:]) | |
def hash_url(s): | |
return HMAC(APP_SECRET, s.encode("utf8"), md5).hexdigest() | |
def highlight_file(code): | |
code = highlight(code, html_lexer, html_formater) | |
return code.replace("{", "{").replace("}", "}") | |
def progress_func(download_total, downloaded, upload_total, uploaded): | |
""" | |
Stop after downloading MAX_DOWNLOAD bytes. | |
""" | |
if downloaded > MAX_DOWNLOAD: | |
return True | |
return False | |
def fetch(url): | |
""" | |
Fetch an url with a timeout of 3 and a max_dowload of MAX_DOWNLOAD | |
return the html content | |
""" | |
with BytesIO() as buff: | |
curl = pycurl.Curl() | |
curl.setopt(curl.URL, url) | |
curl.setopt(curl.XFERINFOFUNCTION, progress_func) | |
curl.setopt(pycurl.TIMEOUT, 3) | |
curl.setopt(curl.NOPROGRESS, False) | |
curl.setopt(curl.WRITEDATA, buff) | |
try: | |
curl.perform() | |
html = buff.getvalue() | |
except pycurl.error as e: | |
# 42 == file is too big | |
# return the first MAX_DOWNLOAD bytes | |
if e.args[0] == 42: | |
html = buff.getvalue()[:MAX_DOWNLOAD] | |
else: | |
html = e.args[1] | |
except Exception as e: | |
print(e) | |
finally: | |
curl.close() | |
return html | |
@app.route("/full-bs/cache/<secret>", methods=["DELETE"]) | |
def delete(secret): | |
""" | |
If you want to keep your history secret, you can use this function | |
""" | |
cache_key = hash_url(secret) | |
cache.delete(cache_key) | |
return "OK" | |
@app.route("/full-bs/cache/<secret>") | |
def fullscreen(secret): | |
cache_key = hash_url(secret) | |
page_html = cache.get(cache_key) | |
if page_html is None: | |
return abort(404) | |
html = FULLSCREEN_HEADER + page_html + FULLSCREEN_FOOTER | |
css = html_formater.get_style_defs("body") | |
title = secret | |
return render_template_string(html, title=title, css=css) | |
def fetch_and_store(url, refresh): | |
""" | |
fetch and store url into cache | |
return secret | |
""" | |
secret = hash_url(url) | |
cache_key = hash_url(secret) | |
if not refresh and cache.get(cache_key) is not None: | |
return secret | |
raw_html = fetch(url) | |
if not raw_html: | |
raw_html = ":: Empty response ::" | |
html = highlight_file(raw_html) | |
cache.set(cache_key, html) | |
return secret | |
@app.route("/full-bs", methods=["GET", "POST"]) | |
def viewer(secret=None): | |
if request.method == "GET": | |
return render_template("viewer.htm") | |
refresh = request.form.get("refresh", False) | |
url = request.form.get("url", "") | |
app.logger.info("[%s] Accessing %s", request.headers.get("X-Forwarded-For"), url) | |
secret = fetch_and_store(url, refresh) | |
return render_template("viewer.htm", url=url, secret=secret) | |
@app.route("/") | |
def index(): | |
return render_template("index.htm") | |
if __name__ == "__main__": | |
app.logger.setLevel(logging.INFO) | |
app.run(debug=False, host="0.0.0.0", port=5000, threaded=True) | |
else: | |
gunicorn_logger = logging.getLogger("gunicorn.error") | |
app.logger.handlers = gunicorn_logger.handlers | |
app.logger.setLevel(gunicorn_logger.level) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment