Last active
February 9, 2021 13:17
-
-
Save FrankSpierings/af2be328e5d7c734b8f1277399f59dc6 to your computer and use it in GitHub Desktop.
Burp extension that minifies a request's headers and parameters and sends the result to a new Repeater tab.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from burp import IParameter | |
from burp import IBurpExtender | |
from burp import IContextMenuFactory | |
from burp import IContextMenuInvocation | |
from javax.swing import JMenuItem | |
import java.util.ArrayList as ArrayList | |
from threading import Thread | |
from Queue import Queue | |
from traceback import format_exc | |
import time | |
# Name passed to Burp when the extension registers; also printed in the
# registration log line.
EXTENSION_NAME = "Request minify"
# Number of background worker threads started when the extension registers.
MAX_THREADS = 2
# Seconds an idle worker waits before checking the job queue again.
WORKER_SLEEP = 1
class Job(object):
    """A unit of work for the background workers.

    Attributes:
        nr: Caller-chosen job number.
        target: Callable that performs the work.
        args: Positional arguments for ``target``; always unpackable.
        callback: Optional callable that receives the result of ``target``.
    """

    def __init__(self, nr, target, args=None, callback=None):
        self.nr = nr
        self.target = target
        # Consumers run ``target(*args)``; unpacking ``None`` raises
        # TypeError, so a job created without arguments gets an empty tuple.
        self.args = args if args is not None else ()
        self.callback = callback
class BurpExtender(IBurpExtender, IContextMenuFactory):
    """Burp extension that adds a "Request minify" context-menu entry.

    Choosing the entry queues a job that replays the selected request while
    leaving out one header or parameter at a time, keeps only the entries
    whose removal changes the response, and sends the reduced request to
    Repeater.
    """

    def log(self, message):
        """Print ``message`` (converted with ``str``) to the extension output."""
        message = str(message)
        self.callbacks.printOutput(message)

    # Registers the extension in Burp
    def registerExtenderCallbacks(self, callbacks):
        """Store the Burp callbacks, start the workers and register the menu.

        Args:
            callbacks: The callbacks object handed over by Burp.
        """
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        callbacks.setExtensionName(EXTENSION_NAME)

        # Create the job state before the menu factory is registered so a
        # menu click can never observe a missing queue or counter.
        self._job_number = 0
        self._job_queue = Queue(maxsize=0)
        self._threads = []
        for i in range(MAX_THREADS):
            thread = Thread(target=self.worker, args=(i, self._job_queue))
            # The workers loop forever: mark them as daemon threads and do
            # NOT join them, otherwise this method would never return.
            thread.daemon = True
            thread.start()
            self._threads.append(thread)

        callbacks.registerContextMenuFactory(self)
        self.log("[+] Extension registered: {0}".format(EXTENSION_NAME))

    # Registers the menu item in the context menu
    def createMenuItems(self, ctxMenuInvocation):
        """Return the "Request minify" menu item for request editors.

        Args:
            ctxMenuInvocation: The invocation describing where the context
                menu was opened.

        Returns:
            An ``ArrayList`` holding one ``JMenuItem`` when the menu was
            opened in a message editor request, otherwise ``None``.
        """
        ctx = ctxMenuInvocation.getInvocationContext()
        if ctx != IContextMenuInvocation.CONTEXT_MESSAGE_EDITOR_REQUEST:
            return None
        # Remembered so the action handler can fetch the selected message.
        self._ctxMenuInvocation = ctxMenuInvocation
        menuItems = ArrayList()
        menuItems.add(JMenuItem("Request minify",
                                actionPerformed=self.action_minify))
        return menuItems

    # Worker which is used in the threadpool.
    def worker(self, threadid, job_queue, result_queue=None):
        """Run jobs taken from ``job_queue`` until the thread ends.

        Args:
            threadid: Identifier used in the start/stop log lines.
            job_queue: Queue of ``Job`` objects to execute.
            result_queue: Unused; kept for interface compatibility.
        """
        # Local import: the file-level import only brings in ``Queue``.
        from Queue import Empty

        self.log("Worker '{0}' started".format(threadid))
        try:
            while True:
                try:
                    # Waiting with a timeout replaces the empty()/get() pair,
                    # which could race with the other workers.
                    job = job_queue.get(True, WORKER_SLEEP)
                except Empty:
                    continue
                try:
                    result = job.target(*(job.args or ()))
                    if job.callback:
                        job.callback(result)
                except Exception:
                    # Report the failure but keep the worker alive: one bad
                    # job must not take a thread out of the pool for good.
                    self.callbacks.printError(format_exc())
        finally:
            self.log("Worker '{0}' stopped".format(threadid))

    # The action handler, which places minify jobs in the job_queue
    def action_minify(self, event):
        """Queue a minify job for the last selected message.

        Args:
            event: The Swing action event (unused).
        """
        messages = self._ctxMenuInvocation.getSelectedMessages()
        if not messages:
            # Nothing selected, so there is nothing to minify.
            return
        request = Request(messages[-1], self.callbacks)
        self._job_number += 1
        # The job carries its own number instead of a hard-coded 1.
        job = Job(self._job_number, self.job_minify,
                  args=(request, self._job_number))
        self._job_queue.put(job)

    # The minify job which is executed in a separate thread.
    def job_minify(self, request, jobnr):
        """Reduce ``request`` to the headers and parameters that matter.

        Args:
            request: The ``Request`` wrapper to minimise (modified in place).
            jobnr: Number used in the name of the resulting Repeater tab.
        """
        # Check the baseline: replaying the unchanged request twice shows
        # which response attributes vary without any change to the request.
        response1 = request.execute().getResponse()
        response2 = request.execute().getResponse()
        base_variation = self.helpers.analyzeResponseVariations(
            (response1, response2)).getVariantAttributes()

        # Minimize the headers first, then the parameters of the request
        # that is left once the unneeded headers are gone.
        request.headers = self._required_items(
            request, 'headers', response1, base_variation)
        request.parameters = self._required_items(
            request, 'parameters', response1, base_variation)

        # Place the minimized request in a new Repeater window
        request.to_repeater('Mini-{0:03d}'.format(jobnr))

    def _required_items(self, request, attribute, reference, base_variation):
        """Return the entries of ``request.<attribute>`` that are required.

        Each entry is left out on its own (all other entries stay in place)
        and the request is replayed; the entry is kept when that changes the
        response.
        """
        base_items = getattr(request, attribute) or []
        required = []
        for index, item in enumerate(base_items):
            # Build a fresh list without this entry; base_items stays intact.
            setattr(request, attribute,
                    base_items[:index] + base_items[index + 1:])
            if self._adds_variation(request, reference, base_variation):
                required.append(item)
        return required

    def _adds_variation(self, request, reference, base_variation):
        """Replay ``request`` and report whether its response varies from
        ``reference`` in an attribute that is not in ``base_variation``."""
        response = request.execute().getResponse()
        variation = self.helpers.analyzeResponseVariations(
            (reference, response)).getVariantAttributes()
        # Compare the attributes themselves, not just how many there are:
        # the same count can hide one attribute being swapped for another.
        return bool(set(variation) - set(base_variation))
class Request(object):
    """Editable wrapper around the request of a Burp HTTP message.

    The raw request bytes are kept in ``_raw_request``; headers, cookies and
    parameters are read from and written back to those bytes through the
    Burp helpers.
    """

    def __init__(self, httpRequestResponse, callbacks):
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        self._httpRequestResponse = httpRequestResponse
        self._raw_request = httpRequestResponse.getRequest()

    @property
    def host(self):
        """Host reported by the message's HTTP service."""
        return self.httpservice.getHost()

    @property
    def port(self):
        """Port reported by the message's HTTP service."""
        return self.httpservice.getPort()

    @property
    def ssl(self):
        """True when the service protocol is 'https' (case-insensitive)."""
        return self.httpservice.getProtocol().lower() == 'https'

    @property
    def httpservice(self):
        """The HTTP service of the wrapped message."""
        return self._httpRequestResponse.getHttpService()

    def to_repeater(self, name):
        """Send the current raw request to Repeater under ``name``."""
        target_host = self.host
        target_port = self.port
        use_https = self.ssl
        self.callbacks.sendToRepeater(
            target_host, target_port, use_https, self._raw_request, name)

    @property
    def _analyzer(self):
        """Fresh analysis of the current raw request."""
        return self.helpers.analyzeRequest(self._raw_request)

    @property
    def headers(self):
        """Headers as a list of (name, value) pairs, or None if there are none.

        The first analysed line is skipped. A list of pairs is used because
        the same header name may occur more than once.
        """
        lines = self._analyzer.getHeaders()
        if len(lines) == 0:
            return None
        pairs = []
        for line in lines[1:]:
            # Split on the first colon only; the value may contain colons.
            name, _, remainder = line.partition(':')
            pairs.append((name, remainder.lstrip()))
        return pairs

    @headers.setter
    def headers(self, value):
        """Rebuild the request from its first line, ``value`` and the body."""
        first_line = self._analyzer.getHeaders()[0]
        lines = [first_line]
        lines.extend('{0}: {1}'.format(k, v) for k, v in value)
        self._raw_request = self.helpers.buildHttpMessage(lines, self.body)

    @property
    def cookies(self):
        """Cookie parameters as a list of (name, value) pairs."""
        pairs = []
        for param in self._analyzer.getParameters():
            if param.getType() == IParameter.PARAM_COOKIE:
                pairs.append((param.getName(), param.getValue()))
        return pairs

    @cookies.setter
    def cookies(self, value):
        """Replace all cookie parameters with the (name, value) pairs given."""
        updated = self._raw_request
        # Delete current cookies
        current = [p for p in self._analyzer.getParameters()
                   if p.getType() == IParameter.PARAM_COOKIE]
        for old_cookie in current:
            updated = self.helpers.removeParameter(updated, old_cookie)
        # Rebuild new cookie parameters
        replacements = []
        for pair in value:
            replacements.append(self.helpers.buildParameter(
                pair[0], pair[1], IParameter.PARAM_COOKIE))
        for new_cookie in replacements:
            updated = self.helpers.addParameter(updated, new_cookie)
        self._raw_request = updated

    @property
    def body(self):
        """Everything in the raw request from the body offset onwards."""
        text = self._raw_request.tostring()
        offset = self._analyzer.getBodyOffset()
        return text[offset:]

    def execute(self):
        """Issue the current raw request and return Burp's result object."""
        service = self._httpRequestResponse.getHttpService()
        return self.callbacks.makeHttpRequest(service, self._raw_request)

    @property
    def parameters(self):
        """All analysed parameters, each wrapped in a ``Parameter``."""
        wrapped = []
        for iparam in self._analyzer.getParameters():
            wrapped.append(self.Parameter(iparam))
        return wrapped

    @parameters.setter
    def parameters(self, parameters):
        """Replace every parameter of the request with ``parameters``.

        NOTE(review): whether Burp's removeParameter/addParameter handle
        every parameter type (e.g. JSON or XML) is not visible here; confirm.
        """
        updated = self._raw_request
        # Delete current parameters
        for existing in self._analyzer.getParameters():
            updated = self.helpers.removeParameter(updated, existing)
        # Add the given parameters
        for wrapper in parameters:
            updated = self.helpers.addParameter(updated, wrapper._iparameter)
        self._raw_request = updated

    class Parameter(object):
        """Read-only view on a Burp parameter object."""

        def __init__(self, iparameter):
            self._iparameter = iparameter

        @property
        def name(self):
            """Name of the wrapped parameter."""
            return self._iparameter.getName()

        @property
        def value(self):
            """Value of the wrapped parameter."""
            return self._iparameter.getValue()

        @property
        def type(self):
            """Short label for the parameter type, or "unknown"."""
            code = self._iparameter.getType()
            labels = (
                (IParameter.PARAM_BODY, "body"),
                (IParameter.PARAM_COOKIE, "cookie"),
                (IParameter.PARAM_JSON, "json"),
                (IParameter.PARAM_MULTIPART_ATTR, "multipart"),
                (IParameter.PARAM_URL, "url"),
                (IParameter.PARAM_XML, "xml"),
                (IParameter.PARAM_XML_ATTR, "xml-attribute"),
            )
            # Checked in the listed order; the first matching code wins.
            for known_code, label in labels:
                if code == known_code:
                    return label
            return "unknown"

        def __str__(self):
            return "{0}: {1}={2}".format(self.type, self.name, self.value)

        def __repr__(self):
            return self.__str__()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment