-
-
Save w181496/5a8b3398ae380419999e429bee64171f to your computer and use it in GitHub Desktop.
gctf_ct.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import werkzeug_raw | |
import json | |
import os | |
import re | |
import six | |
from polyfill import HTTPGenerator | |
from email.encoders import encode_noop | |
from email.mime.application import MIMEApplication | |
from email.mime.multipart import MIMEMultipart | |
from flask import request, abort, current_app | |
# Header mapping that declares a JSON payload.
HEADERS = {"Content-Type": "application/json"}
# Line terminator used to join the lines of a serialized HTTP message.
CRLF = '\r\n'
class MIMEApplicationHTTPRequest(MIMEApplication, object):
    """MIME application part whose payload is one serialized HTTP request."""

    def __init__(self, method, path, headers, body):
        """Serialize the request and store it as this part's payload.

        Args:
            method: HTTP verb placed in the request line.
            path: Request target placed in the request line.
            headers: Mapping of header names to values. It is updated in
                place with JSON content headers when ``body`` is a dict.
            body: Request body. A dict is encoded as JSON; otherwise a
                falsy value is sent as an empty body.
        """
        if isinstance(body, dict):
            body = json.dumps(body)
            headers['Content-Type'] = 'application/json'
            headers['Content-Length'] = len(body)

        start_line = '{method} {path} HTTP/1.1'.format(method=method, path=path)
        header_lines = [
            '{k}: {v}'.format(k=name, v=value)
            for name, value in headers.items()
        ]
        # The empty element produces the blank line between headers and body.
        message = CRLF.join([start_line] + header_lines + ['', body or ''])

        super(MIMEApplicationHTTPRequest, self).__init__(
            message, 'http', encode_noop
        )
class MIMEApplicationHTTPResponse(MIMEApplication, object):
    """MIME application part whose payload is one serialized HTTP response."""

    def __init__(self, status, headers, body):
        """Serialize the response and store it as this part's payload.

        Args:
            status: Status text placed after ``HTTP/1.1`` in the status line.
            headers: Mapping of header names to values. It is updated in
                place with JSON content headers when ``body`` is a dict or
                a list.
            body: Response body. A dict or list is encoded as JSON;
                otherwise a falsy value is sent as an empty body.
        """
        # A parsed JSON body can be an array as well as an object. Encoding
        # lists here keeps the join below from failing on a non-string body.
        if isinstance(body, (dict, list)):
            body = json.dumps(body)
            headers['Content-Type'] = 'application/json'
            headers['Content-Length'] = len(body)
        body = body or ''

        response_line = 'HTTP/1.1 {status}'
        lines = [response_line.format(status=status)]
        lines += ['{k}: {v}'.format(k=k, v=v) for k, v in headers.items()]
        # The empty element produces the blank line between headers and body.
        lines.append('')
        lines.append(body)
        response = CRLF.join(lines)

        super(MIMEApplicationHTTPResponse, self).__init__(
            response, 'http', encode_noop
        )
def strip_headers(bb):
    """Split one multipart section into its Content-ID and its body.

    Args:
        bb: Raw bytes of a single part: header lines, a blank line
            (``\\r\\n\\r\\n``), then the body.

    Returns:
        A ``(content_id, body)`` tuple. ``content_id`` is the stripped
        value of the last ``Content-ID`` header (matched
        case-insensitively), or ``None`` when the part has no such header.

    Raises:
        ValueError: If ``bb`` contains no blank line separating headers
            from the body.
    """
    headers, body = bb.split(b'\r\n\r\n', 1)
    content_id = None
    for line in headers.replace(b"\r", b"").split(b"\n"):
        # Partition on the first colon only, so a value that itself contains
        # ":" is kept intact and a line without a colon is simply skipped.
        name, sep, value = line.partition(b":")
        if sep and name.strip().lower() == b"content-id":
            content_id = value.strip()
    return content_id, body
def unquote(s):
    """Remove one leading and one trailing double quote from bytes ``s``.

    Each end is handled independently, so an unbalanced quote is still
    removed.
    """
    quote = b'"'
    if s.startswith(quote):
        s = s[1:]
    if s.endswith(quote):
        s = s[:-1]
    return s
def parse_multi(content_type, multi):
    """Break a multipart body into ``(content_id, body)`` pairs.

    Args:
        content_type: Content-Type header value carrying the boundary.
        multi: Raw multipart body as bytes.

    Returns:
        A list with one ``strip_headers`` result per part.

    Aborts the request with status 500 when no boundary can be extracted.
    """
    raw_boundary = get_boundary(content_type)
    if not raw_boundary:
        abort(500)

    delimiter = b"--" + unquote(raw_boundary.encode("ascii"))
    sections = multi.split(delimiter)
    # Drop what precedes the first delimiter and what follows the last one.
    return list(map(strip_headers, sections[1:-1]))
def prepare_batch_response(responses, boundary):
    """Assemble sub-responses into a single multipart payload.

    Args:
        responses: Sequence of ``(status, headers, body)`` tuples.
        boundary: Multipart boundary string for the generated message.

    Returns:
        A ``(headers, body)`` tuple: the multipart message's headers as a
        dict and the serialized message without its leading header block.

    Raises:
        ValueError: If ``responses`` is empty.
    """
    if len(responses) == 0:
        raise ValueError("Provide at least one response")

    envelope = MIMEMultipart(boundary=boundary)
    for status, part_headers, part_body in responses:
        envelope.attach(
            MIMEApplicationHTTPResponse(status, part_headers, part_body)
        )

    sink = six.StringIO()
    HTTPGenerator(sink, False, 0).flatten(envelope)

    # Strip off redundant header text
    _, serialized = sink.getvalue().split('\r\n\r\n', 1)
    return dict(envelope._headers), serialized
# Pattern for a ``multipart/*`` Content-Type value that carries a ``boundary``
# parameter. The three capture groups hold the boundary when it is
# double-quoted, single-quoted, or unquoted, respectively.
content_type_regexes = 'multipart/[a-z]+(?:-[a-z]+)*(?:[;,\\s]\\s*[a-z]+(?:-[a-z]+)*=(?:"(?:[^"]|\\")+"|\'(?:[^\']|\\\')+\'|[^"\':;,\\s]+))*[;,\\s]*boundary=(?:"([^"]+)"|\'([^\']+)\'|([^"\':;,\\s]+))(?:[;,\\s]\\s*[a-z]+(?:-[a-z]+)*=(?:"(?:[^"]|\\")+"|\'(?:[^\']|\\\')+\'|[^"\':;,\\s]+))*[;,]?'


def validate_content_type(ct):
    """Return True when ``ct`` fully matches the multipart pattern.

    Matching is case-insensitive and must consume the whole string.
    """
    pattern = re.compile(content_type_regexes, re.I)
    return pattern.fullmatch(ct) is not None
def get_boundary(ct):
    """Extract the boundary parameter from a multipart Content-Type value.

    Returns:
        The first non-empty capture group of the pattern (the boundary
        without surrounding quotes), or ``None`` when ``ct`` does not
        fully match.
    """
    match = re.compile(content_type_regexes, re.I).fullmatch(ct)
    if match is None:
        return None
    # Only one of the quoted/unquoted alternatives can have captured text.
    return next((group for group in match.groups() if group), None)
def batch():
    """
    Execute multiple requests, submitted as a batch.

    The body of the current request is read as a ``multipart/mixed``
    document. Each part is turned into a WSGI environ, dispatched through
    the current Flask application, and its response is collected. The
    collected responses are returned as a single multipart body.

    Returns:
        A ``(body, 200, headers)`` tuple for Flask to turn into a response.

    Aborts with 400 when the content type is missing, is not
    ``multipart/mixed``, or does not match the expected pattern.
    """
    app = current_app
    data = request.stream.read()

    # A missing Content-Type is treated as empty so that it is rejected with
    # 400 below rather than surfacing as a KeyError.
    content_type = request.environ.get("CONTENT_TYPE") or ""
    # A ``ct`` query argument may supply the content type, but only when the
    # request itself was sent as text/plain.
    override = request.args.get('ct')
    if content_type == 'text/plain' and override:
        content_type = override
    if not content_type.startswith("multipart/mixed") or not validate_content_type(content_type):
        abort(400)

    multi = parse_multi(content_type, data)
    boundary = 'batch_' + os.urandom(8).hex()

    responses = []
    for content_id, payload in multi:
        environ = werkzeug_raw.environ(payload)
        environ.update({
            # ensure we only issue requests against the same host
            # as the original request
            "SERVER_NAME": request.environ["SERVER_NAME"],
            "SERVER_PORT": request.environ["SERVER_PORT"],
            "HTTP_HOST": request.environ["HTTP_HOST"],
            # if we don't have content-length
            "wsgi.input_terminated": 1,
        })

        with app.request_context(environ):
            try:
                rv = app.preprocess_request()
                if rv is None:
                    rv = app.dispatch_request()
            except Exception as e:
                # Route failures through the app's own error handling so a
                # failing sub-request still yields a response entry.
                rv = app.handle_user_exception(e)
            response = app.make_response(rv)
            response = app.process_response(response)

        if content_id:
            response.headers.extend({"Content-ID": content_id})
        responses.append((
            response.status,
            response.headers,
            response.json
        ))

    headers, body = prepare_batch_response(responses, boundary)
    if body is None:
        abort(500)

    # set content-type
    # NOTE(review): everything before ``boundary=`` is echoed from the
    # request-supplied content type into the response header; confirm that
    # this is intended.
    if 'boundary=' in content_type:
        new_ct = content_type[:content_type.index('boundary=')] + 'boundary=' + boundary
    else:
        new_ct = content_type + '; boundary=' + boundary
    headers["Content-Type"] = new_ct
    return body, 200, headers
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment