Skip to content

Instantly share code, notes, and snippets.

@w181496 w181496/ Secret
Created Nov 6, 2019

What would you like to do?
import werkzeug_raw
import json
import os
import re
import six
from polyfill import HTTPGenerator
from email.encoders import encode_noop
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from flask import request, abort, current_app
# Default headers for a JSON payload. Not referenced in the visible part of
# this file.
HEADERS = {"Content-Type": "application/json"}
# Line terminator used when assembling raw HTTP messages.
CRLF = '\r\n'


def _render_http_message(start_line, headers, body):
    """Assemble a raw HTTP/1.1 message (start line, headers, body) as text.

    Args:
        start_line: The request line or status line, without a terminator.
        headers: Mapping of header names to values. It is mutated in place
            when ``body`` is a dict: ``Content-Type`` and ``Content-Length``
            are set to describe the JSON encoding of the body.
        body: A dict (serialised with ``json.dumps``), a text body, or a
            falsy value for "no body".

    Returns:
        The message as a single CRLF-joined string.
    """
    if isinstance(body, dict):
        body = json.dumps(body)
        headers['Content-Type'] = 'application/json'
        # json.dumps emits ASCII by default, so the character count equals
        # the byte count here.
        headers['Content-Length'] = len(body)
    body = body or ''
    lines = [start_line]
    lines += ['{k}: {v}'.format(k=k, v=v) for k, v in headers.items()]
    # An empty line ends the header section and the body follows it. The
    # source normalised ``body`` but never added it to the message.
    lines += ['', body]
    return CRLF.join(lines)


class MIMEApplicationHTTPRequest(MIMEApplication, object):
    """An ``application/http`` MIME part that wraps one raw HTTP request."""

    def __init__(self, method, path, headers, body):
        """Build the part from the pieces of an HTTP request.

        Args:
            method: HTTP method placed in the request line.
            path: Request target placed in the request line.
            headers: Mapping of header names to values; mutated in place
                when ``body`` is a dict (JSON content headers are added).
            body: A dict, a text body, or a falsy value for "no body".
        """
        request_line = '{method} {path} HTTP/1.1'.format(
            method=method, path=path)
        request = _render_http_message(request_line, headers, body)
        super(MIMEApplicationHTTPRequest, self).__init__(
            request, 'http', encode_noop
        )


class MIMEApplicationHTTPResponse(MIMEApplication, object):
    """An ``application/http`` MIME part that wraps one raw HTTP response."""

    def __init__(self, status, headers, body):
        """Build the part from the pieces of an HTTP response.

        Args:
            status: Status text placed after ``HTTP/1.1`` in the status line.
            headers: Mapping of header names to values; mutated in place
                when ``body`` is a dict (JSON content headers are added).
            body: A dict, a text body, or a falsy value for "no body".
        """
        response_line = 'HTTP/1.1 {status}'.format(status=status)
        response = _render_http_message(response_line, headers, body)
        super(MIMEApplicationHTTPResponse, self).__init__(
            response, 'http', encode_noop
        )
def strip_headers(bb):
    """Split one multipart part into its Content-ID and its payload.

    Args:
        bb: Raw bytes of a single part: header lines, a blank line
            (``CRLF CRLF``), then the payload.

    Returns:
        A ``(content_id, body)`` tuple. ``content_id`` is the stripped value
        of the part's ``Content-ID`` header as bytes, or ``None`` when the
        part has no such header. When the header is repeated, the last one
        wins. ``body`` is everything after the first blank line.

    Raises:
        ValueError: If the part has no blank line separating the headers
            from the payload.
    """
    head, separator, body = bb.partition(b"\r\n\r\n")
    if not separator:
        raise ValueError(
            "multipart part has no blank line between headers and body")

    content_id = None
    for line in head.replace(b"\r", b"").split(b"\n"):
        # Split on the first colon only: a Content-ID value may itself
        # contain colons, which made the two-way unpack of split(b":") fail.
        name, colon, value = line.partition(b":")
        # Compare the whole header name so that unrelated headers that merely
        # start with "content-id" are not mistaken for it.
        if colon and name.rstrip().lower() == b"content-id":
            content_id = value.strip()
    return content_id, body
def unquote(s):
    """Remove one leading and one trailing double quote from a bytes value.

    The two ends are handled independently: a quote is dropped from either
    end if it is present there, whether or not the other end has one.
    """
    quote = b'"'
    start = 1 if s.startswith(quote) else 0
    trimmed = s[start:]
    if trimmed.endswith(quote):
        trimmed = trimmed[:-1]
    return trimmed
def parse_multi(content_type, multi):
    """Split a multipart body into ``(content_id, payload)`` pairs.

    Args:
        content_type: The multipart Content-Type header value; the boundary
            is taken from it with ``get_boundary``.
        multi: The raw multipart body as bytes.

    Returns:
        A list with one ``strip_headers`` result per part. The text before
        the first delimiter and after the last one is discarded.

    Aborts the request with HTTP 400 when no boundary can be extracted or
    when the boundary is not ASCII.
    """
    boundary_raw = get_boundary(content_type)
    if not boundary_raw:
        # NOTE(review): the body of this guard was missing in the source.
        # Rejecting the request is assumed, since ``abort`` is imported and
        # a missing boundary is a client error; confirm the status code.
        abort(400)
    try:
        boundary = b"--" + unquote(boundary_raw.encode("ascii"))
    except UnicodeEncodeError:
        # The Content-Type pattern admits non-ASCII boundary characters;
        # treat them as a bad request instead of an unhandled exception.
        abort(400)
    payloads = multi.split(boundary)[1:-1]
    return [strip_headers(payload) for payload in payloads]
def prepare_batch_response(responses, boundary):
    """Serialise sub-responses into one multipart batch response.

    Args:
        responses: Non-empty sequence of ``(status, headers, body)`` tuples,
            each passed to ``MIMEApplicationHTTPResponse``.
        boundary: Boundary string for the enclosing multipart message.

    Returns:
        A ``(headers, body)`` tuple: the multipart message's own headers as
        a dict, and the serialised multipart body as text.

    Raises:
        ValueError: If ``responses`` is empty.
    """
    if not responses:
        raise ValueError("Provide at least one response")

    batch = MIMEMultipart(boundary=boundary)
    for status, headers, sub_body in responses:
        subrequest = MIMEApplicationHTTPResponse(status, headers, sub_body)
        # NOTE(review): the source built each part but never attached it,
        # so the batch would always have been empty.
        batch.attach(subrequest)

    buf = six.StringIO()
    # HTTPGenerator comes from the project's ``polyfill`` module. It is
    # assumed to follow the email.generator.Generator interface and to emit
    # CRLF line endings -- TODO confirm.
    generator = HTTPGenerator(buf, False, 0)
    # NOTE(review): the source never wrote the message into ``buf``; without
    # this call ``payload`` is empty and the split below cannot succeed.
    generator.flatten(batch)
    payload = buf.getvalue()

    # Strip off redundant header text
    _, body = payload.split('\r\n\r\n', 1)
    # Read the headers only after flattening, as the source does.
    return dict(batch._headers), body
# Pattern for a ``multipart/<subtype>`` Content-Type value that carries a
# ``boundary=`` parameter, optionally surrounded by other ``name=value``
# parameters separated by ``;``, ``,`` or whitespace. The boundary value is
# captured by exactly one of three groups: double-quoted, single-quoted or
# bare token.
# NOTE(review): in the quoted *parameter* alternatives, ``\"`` and ``\'`` are
# regex escapes for a plain quote character, not for a backslash-quote pair,
# so a quoted parameter value may contain unescaped quotes. The pattern is
# left unchanged here; tighten it deliberately if that is not intended.
content_type_regexes = 'multipart/[a-z]+(?:-[a-z]+)*(?:[;,\\s]\\s*[a-z]+(?:-[a-z]+)*=(?:"(?:[^"]|\\")+"|\'(?:[^\']|\\\')+\'|[^"\':;,\\s]+))*[;,\\s]*boundary=(?:"([^"]+)"|\'([^\']+)\'|([^"\':;,\\s]+))(?:[;,\\s]\\s*[a-z]+(?:-[a-z]+)*=(?:"(?:[^"]|\\")+"|\'(?:[^\']|\\\')+\'|[^"\':;,\\s]+))*[;,]?'
# Compiled once at import time and shared by the helpers below.
_CONTENT_TYPE_RE = re.compile(content_type_regexes, re.I)


def validate_content_type(ct):
    """Return True when ``ct`` is a multipart Content-Type with a boundary.

    The whole string must match ``content_type_regexes`` (case-insensitive).
    """
    return _CONTENT_TYPE_RE.fullmatch(ct) is not None


def get_boundary(ct):
    """Extract the boundary parameter from a multipart Content-Type value.

    Returns:
        The boundary without surrounding quotes, or ``None`` when ``ct`` does
        not match ``content_type_regexes``.
    """
    m = _CONTENT_TYPE_RE.fullmatch(ct)
    if not m:
        # NOTE(review): the body of this guard was missing in the source,
        # which would have fallen through to ``m.groups()`` on ``None``.
        return None
    # Only one of the three capture groups is set, depending on quoting.
    return next((v for v in m.groups() if v), None)
def batch():
    """Execute multiple requests, submitted as a batch.

    The request body is a ``multipart/mixed`` message whose parts each hold
    one raw HTTP request. Every part is dispatched through the current Flask
    application and the results are returned as one multipart response.

    Returns:
        A ``(body, 200, headers)`` tuple, where ``headers`` carries a
        Content-Type naming the freshly generated response boundary.

    Aborts with HTTP 400 when the Content-Type is not a valid
    ``multipart/mixed`` value or when the batch contains no parts.
    """
    # .get(): a request without a Content-Type header should be rejected
    # below, not fail with KeyError.
    content_type = request.environ.get("CONTENT_TYPE", "")
    # A text/plain request may supply the real content type in ``?ct=``.
    if content_type == 'text/plain' and request.args.get('ct'):
        content_type = request.args.get('ct')
    if not (content_type.startswith("multipart/mixed")
            and validate_content_type(content_type)):
        # NOTE(review): the body of this guard was missing in the source;
        # rejecting with 400 is assumed.
        abort(400)

    app = current_app
    # NOTE(review): the right-hand side of ``data =`` was missing in the
    # source. The raw body is assumed, because parse_multi splits it with a
    # bytes boundary.
    data = request.get_data()
    multi = parse_multi(content_type, data)
    boundary = 'batch_' + os.urandom(8).hex()

    # ensure we only issue requests against the same host
    # as the original request
    same_host = {
        "SERVER_NAME": request.environ["SERVER_NAME"],
        "SERVER_PORT": request.environ["SERVER_PORT"],
        "HTTP_HOST": request.environ["HTTP_HOST"],
    }

    responses = []
    for content_id, payload in multi:
        environ = werkzeug_raw.environ(payload)
        environ.update(same_host)
        # if we don't have content-length
        environ.update({"wsgi.input_terminated": 1})
        with app.request_context(environ):
            # NOTE(review): the ``try:`` line was missing in the source; it
            # is placed so the visible ``except`` covers preprocessing and
            # dispatch.
            try:
                rv = app.preprocess_request()
                if rv is None:
                    rv = app.dispatch_request()
            except Exception as e:
                rv = app.handle_user_exception(e)
            response = app.make_response(rv)
            response = app.process_response(response)
        if content_id:
            # strip_headers yields bytes; hand the header value over as text
            # rather than relying on how the header container treats bytes.
            if isinstance(content_id, bytes):
                content_id = content_id.decode("latin-1")
            response.headers.extend({"Content-ID": content_id})
        # NOTE(review): the source never added anything to ``responses``.
        # The tuple shape follows what prepare_batch_response unpacks; the
        # body is passed as text because it is joined into a text message.
        responses.append((
            response.status,
            response.headers,
            response.get_data(as_text=True),
        ))

    body = None
    if responses:
        headers, body = prepare_batch_response(responses, boundary)
    if body is None:
        # NOTE(review): the body of this guard was missing in the source.
        # It is read here as the "empty batch" case and rejected with 400;
        # confirm the intended status code.
        abort(400)

    # set content-type
    # Case-insensitive search, consistent with the case-insensitive
    # validation above; otherwise "Boundary=" would get a second boundary
    # parameter appended.
    marker = re.search('boundary=', content_type, re.I)
    if marker:
        new_ct = content_type[:marker.start()] + 'boundary=' + boundary
    else:
        new_ct = content_type + '; boundary=' + boundary
    headers["Content-Type"] = new_ct
    return body, 200, headers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.