-
-
Save FHantke/796e6f31a78f10503829873bd9713ad4 to your computer and use it in GitHub Desktop.
pooot challenge source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, Response, send_from_directory, render_template, request, flash, redirect | |
from requests import get | |
from bs4 import BeautifulSoup | |
from flask_wtf import FlaskForm | |
from wtforms import StringField, SubmitField | |
from wtforms.fields.html5 import URLField | |
from wtforms.validators import DataRequired, url | |
from rq import Queue | |
from redis import Redis | |
from time import strftime | |
import traceback | |
import logging | |
import os | |
import socket | |
import re | |
from pygments import highlight | |
from pygments.lexers import PythonLexer, get_lexer_by_name | |
from pygments.formatters import HtmlFormatter | |
from worker import task | |
app = Flask('__main__', static_folder="/app/static", static_url_path="/static", template_folder='/app/templates' ) | |
app.config['SECRET_KEY'] = '\x1d\x07U\xcc\x04]\x9b=d&=>Z\xd6[\x08\xc5\xe2\x9a8R\x17\x06f' | |
REDIS_IP=os.environ.get("REDIS_IP") | |
redis_conn = Redis(host=REDIS_IP, port=6379, db=0) | |
q = Queue('my_queue', connection=redis_conn) | |
PROXY_URL="https://pooot.challenges.ooo" | |
class FeedbackForm(FlaskForm): | |
problem = StringField('Problem', validators=[DataRequired()]) | |
url = URLField("URL", validators=[url()]) | |
submit = SubmitField('Report') | |
class URLForm(FlaskForm): | |
url = URLField("URL", validators=[url()]) | |
submit = SubmitField('Submit') | |
@app.route('/', methods=['GET', 'POST']) | |
def index(): | |
form = URLForm() | |
if form.validate_on_submit(): | |
url = re.sub(r'http[s]*://', '', form.url.data) | |
app.logger.info(f"Redirecting to {url}") | |
return redirect(f'/{url}') | |
else: | |
app.logger.info(f"Form invalid for url: {form.url.data}") | |
return render_template("index.html", form=form) | |
@app.route('/<string:domain>/<path:path>') | |
@app.route('/<string:domain>') | |
def proxy(domain, path=''): | |
protocol = "https" | |
if request.headers.getlist("X-Forwarded-For"): | |
client_ip = request.headers.getlist("X-Forwarded-For")[0] | |
else: | |
client_ip = request.remote_addr | |
if isIP(domain): | |
protocol = "http" | |
if not client_ip.startswith("172.25.0.11"): | |
app.logger.error(f"Internal IP address {domain} from client {client_ip} not allowed." ) | |
return "Internal IP address not allowed", 400 | |
try: | |
app.logger.info(f"Fetching URL: {protocol}://{domain}/{path}") | |
response = get(f'{protocol}://{domain}/{path}', timeout=1) | |
except: | |
return "Could not reach this domain", 400 | |
content_type = response.headers['content-type'] | |
if "html" in content_type: | |
content = response.text | |
soup = BeautifulSoup(content, features="html.parser") | |
for link in soup.findAll(attrs={"src":True}): | |
if not link['src'].startswith("http"): | |
oldpath = link['src'] | |
if not oldpath.startswith("/"): | |
oldpath = f"/{oldpath}" | |
link['src'] = f"{PROXY_URL}/{domain}{oldpath}" | |
else: | |
link['src'] = re.sub(r'http[s]*://', PROXY_URL+"/", link['src'], flags=re.IGNORECASE) | |
for link in soup.findAll(href=True): | |
if not link['href'].startswith("http"): | |
oldpath = link['href'] | |
if not oldpath.startswith("/"): | |
oldpath = f"/{oldpath}" | |
link['href'] = f"{PROXY_URL}/{domain}{oldpath}" | |
head = soup.body | |
if head: | |
head.append(soup.new_tag('style', type='text/css')) | |
head.style.append(""" | |
footer { | |
display: flex; | |
justify-content: center; | |
padding: 5px; | |
color: #fff; | |
bottom: 0; | |
position: fixed; | |
} | |
""") | |
div_string = '<footer><a href="/feedback">Report a broken page</a></footer>' | |
div = BeautifulSoup(div_string, features="html.parser") | |
soup.html.insert(-1, div) | |
content = str(soup) | |
else: | |
content = response.content | |
return Response(content, mimetype=content_type) | |
@app.route('/source') | |
def source(): | |
with open(__file__, "r") as f: | |
code = f.read() | |
lexer = get_lexer_by_name("python", stripall=True) | |
formatter = HtmlFormatter(linenos=True, cssclass="source", style='friendly', full=True) | |
return highlight(code, lexer, formatter) | |
@app.route('/feedback', methods=['GET', 'POST']) | |
def feedback(): | |
form = FeedbackForm() | |
if form.validate_on_submit(): | |
if "172.25" in form.url.data: | |
flash('All internal servers are working fine!') | |
app.logger.info('Ignored URL: %s' % (form.url.data)) | |
return redirect('/') | |
flash('Feedback form submitted {}:{}'.format( | |
form.problem.data, form.url.data)) | |
url = re.sub(r'http[s]*://', '', form.url.data) | |
job = q.enqueue( | |
task, | |
url | |
) | |
app.logger.info('Reported URL: %s' % (form.url.data)) | |
return redirect('/') | |
return render_template('feedback.html', title='Feedback Form', feedform=form) | |
@app.after_request | |
def after_request(response): | |
""" Logging after every request. """ | |
# This avoids the duplication of registry in the log, | |
# since that 500 is already logged via @app.errorhandler. | |
if response.status_code != 500: | |
ts = strftime('[%Y-%b-%d %H:%M]') | |
app.logger.info('%s %s %s %s %s %s', | |
ts, | |
request.remote_addr, | |
request.method, | |
request.scheme, | |
request.full_path, | |
response.status) | |
return response | |
@app.errorhandler(404) | |
def page_not_found(e): | |
# note that we set the 404 status explicitly | |
return "404 not found", 404 | |
@app.errorhandler(Exception) | |
def exceptions(e): | |
""" Logging after every Exception. """ | |
ts = strftime('[%Y-%b-%d %H:%M]') | |
tb = traceback.format_exc() | |
app.logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s', | |
ts, | |
request.remote_addr, | |
request.method, | |
request.scheme, | |
request.full_path, | |
tb) | |
return "Internal Server Error", 500 | |
def isIP(domain): | |
if ":" in domain: | |
domain = domain.split(":")[0] | |
try: | |
if socket.gethostbyname(domain) == domain: | |
return True | |
except: | |
pass | |
return False | |
if __name__ != '__main__': | |
import logging | |
from logging.handlers import RotatingFileHandler | |
from flask.logging import default_handler | |
app.logger.removeHandler(default_handler) | |
gunicorn_logger = logging.getLogger('gunicorn.error') | |
file_handler = RotatingFileHandler('logs/flask.log', maxBytes=1024 * 1024 * 100, backupCount=200) | |
file_handler.setLevel(logging.INFO) | |
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") | |
file_handler.setFormatter(formatter) | |
app.logger.addHandler(file_handler) | |
gunicorn_logger.addHandler(file_handler) | |
if __name__ == "__main__": | |
app.run() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment