Skip to content

Instantly share code, notes, and snippets.

@FrankSpierings
Last active February 9, 2021 13:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FrankSpierings/af2be328e5d7c734b8f1277399f59dc6 to your computer and use it in GitHub Desktop.
Save FrankSpierings/af2be328e5d7c734b8f1277399f59dc6 to your computer and use it in GitHub Desktop.
Burp extension that minifies a request's headers and parameters and sends the result to a new Repeater tab.
from burp import IParameter
from burp import IBurpExtender
from burp import IContextMenuFactory
from burp import IContextMenuInvocation
from javax.swing import JMenuItem
import java.util.ArrayList as ArrayList
from threading import Thread
from Queue import Queue
from traceback import format_exc
import time
# Name passed to callbacks.setExtensionName() and echoed in the
# registration log line.
EXTENSION_NAME = "Request minify"
# Number of background worker threads started when the extension registers.
MAX_THREADS = 2
# Seconds a worker sleeps after finding the job queue empty before it
# checks the queue again.
WORKER_SLEEP = 1
class Job(object):
    """A unit of work handed to a background worker.

    Attributes:
        nr: Caller-chosen job number.
        target: Callable the worker invokes as ``target(*args)``.
        args: Positional arguments for ``target``; an empty tuple when the
            caller supplies none.
        callback: Optional callable invoked with the result of ``target``.
    """

    def __init__(self, nr, target, args=None, callback=None):
        self.nr = nr
        self.target = target
        # Workers unpack ``args`` with ``*``; keeping the ``None`` default
        # here would make that unpacking raise TypeError for argument-less
        # jobs, so normalise it to an empty tuple.
        self.args = () if args is None else args
        self.callback = callback
class BurpExtender(IBurpExtender, IContextMenuFactory):
    """Burp extension adding a "Request minify" context-menu action.

    Selecting the action queues a job that replays the request with one
    header (and later one parameter) removed at a time, keeps only the
    items whose removal changes the response, and sends the reduced
    request to Repeater.
    """

    def log(self, message):
        """Write ``message`` (converted with ``str``) to the extension output."""
        self.callbacks.printOutput(str(message))

    # Registers the extension in Burp
    def registerExtenderCallbacks(self, callbacks):
        """Store the Burp callbacks, start the workers and register the menu.

        Args:
            callbacks: The callbacks object Burp passes to the extension.
        """
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()

        # Job state is created before the menu factory is registered so a
        # menu action can never observe a missing queue.
        self._job_number = 0
        self._job_queue = Queue(maxsize=0)
        self._threads = []

        callbacks.setExtensionName(EXTENSION_NAME)
        callbacks.registerContextMenuFactory(self)
        self.log("[+] Extension registered: {0}".format(EXTENSION_NAME))

        # Start the background threads that process jobs from the queue.
        # They are deliberately NOT joined: the workers loop forever, so a
        # join would keep this method from ever returning.  They are marked
        # as daemon threads so they do not block interpreter shutdown.
        for thread_id in range(MAX_THREADS):
            thread = Thread(target=self.worker,
                            args=(thread_id, self._job_queue))
            thread.daemon = True
            thread.start()
            self._threads.append(thread)

    # Registers the menu item in the context menu
    def createMenuItems(self, ctxMenuInvocation):
        """Return the "Request minify" menu item for request editors.

        Args:
            ctxMenuInvocation: The invocation describing where the context
                menu was opened.

        Returns:
            An ``ArrayList`` holding one ``JMenuItem`` when the menu was
            opened in a message editor request, otherwise ``None``.
        """
        context = ctxMenuInvocation.getInvocationContext()
        if context != IContextMenuInvocation.CONTEXT_MESSAGE_EDITOR_REQUEST:
            return None
        # Remembered so action_minify can fetch the selected message later.
        self._ctxMenuInvocation = ctxMenuInvocation
        menu_items = ArrayList()
        menu_items.add(JMenuItem("Request minify",
                                 actionPerformed=self.action_minify))
        return menu_items

    # Worker which is used in the threadpool.
    def worker(self, threadid, job_queue, result_queue=None):
        """Process jobs from ``job_queue`` until the thread is terminated.

        Args:
            threadid: Identifier used only in the log messages.
            job_queue: Queue of ``Job`` objects to execute.
            result_queue: Unused; kept for interface compatibility.
        """
        self.log("Worker '{0}' started".format(threadid))
        try:
            while True:
                if job_queue.empty():
                    time.sleep(WORKER_SLEEP)
                    continue
                job = job_queue.get()
                # Errors are handled per job so that one failing job is
                # reported without taking the whole worker down.
                try:
                    result = job.target(*(job.args or ()))
                    if job.callback:
                        job.callback(result)
                except Exception:
                    self.callbacks.printError(format_exc())
        finally:
            self.log("Worker '{0}' stopped".format(threadid))

    # The action handler, which places minify jobs in the job_queue
    def action_minify(self, event):
        """Queue a minify job for the most recently selected message.

        Args:
            event: The Swing action event (unused).
        """
        selected = self._ctxMenuInvocation.getSelectedMessages()[-1]
        request = Request(selected, self.callbacks)
        self._job_number += 1
        # The job carries the same running number that names the Repeater
        # tab, instead of a constant.
        job = Job(self._job_number, self.job_minify,
                  args=(request, self._job_number))
        self._job_queue.put(job)

    def _variant_attributes(self, first, second):
        """Return the variant attributes Burp reports for two responses."""
        analysis = self.helpers.analyzeResponseVariations((first, second))
        return frozenset(analysis.getVariantAttributes())

    def _required_items(self, request, attribute, reference, base_variation):
        """Reduce ``request.<attribute>`` to the items the response needs.

        Each item is left out in turn and the request is re-sent; an item
        is kept when the variant attributes against ``reference`` differ
        from ``base_variation``.  The reduced list is assigned back to the
        request and returned.
        """
        candidates = getattr(request, attribute) or []
        required = []
        for index, candidate in enumerate(candidates):
            # Work on a copy so the candidate list itself stays intact.
            trial = list(candidates)
            trial.pop(index)
            setattr(request, attribute, trial)
            response = request.execute().getResponse()
            variation = self._variant_attributes(reference, response)
            # Compare the attribute sets, not just their sizes: a different
            # set of the same size also means the response changed.
            if variation != base_variation:
                required.append(candidate)
        setattr(request, attribute, required)
        return required

    # The minify job which is executed in a separate thread.
    def job_minify(self, request, jobnr):
        """Strip headers and parameters that do not influence the response.

        Args:
            request: The ``Request`` wrapper to minimise (modified in place).
            jobnr: Number used to name the resulting Repeater tab.
        """
        # Check the baseline: send the unmodified request twice to learn
        # which response attributes vary between identical requests.
        response1 = request.execute().getResponse()
        response2 = request.execute().getResponse()
        base_variation = self._variant_attributes(response1, response2)

        # Minimize the headers first, then the parameters of the request
        # that is left.
        self._required_items(request, 'headers', response1, base_variation)
        self._required_items(request, 'parameters', response1,
                             base_variation)

        # Place the minimized request in a new Repeater window
        request.to_repeater('Mini-{0:03d}'.format(jobnr))
class Request(object):
    """Editable wrapper around the request of a Burp HTTP message.

    The raw request is kept in ``_raw_request`` and re-analysed through the
    Burp helpers on every property access, so the header, cookie and
    parameter views always reflect the latest edit.
    """

    def __init__(self, httpRequestResponse, callbacks):
        """Remember the message and callbacks and copy the raw request.

        Args:
            httpRequestResponse: The Burp message whose request is wrapped.
            callbacks: The Burp callbacks object; its helpers are cached.
        """
        self.callbacks = callbacks
        self.helpers = callbacks.getHelpers()
        self._httpRequestResponse = httpRequestResponse
        self._raw_request = self._httpRequestResponse.getRequest()

    @property
    def httpservice(self):
        """The HTTP service of the wrapped message."""
        return self._httpRequestResponse.getHttpService()

    @property
    def host(self):
        """Host of the HTTP service."""
        return self.httpservice.getHost()

    @property
    def port(self):
        """Port of the HTTP service."""
        return self.httpservice.getPort()

    @property
    def ssl(self):
        """``True`` when the service protocol is ``https`` (any case)."""
        return self.httpservice.getProtocol().lower() == 'https'

    def to_repeater(self, name):
        """Send the current raw request to Repeater under the tab ``name``."""
        self.callbacks.sendToRepeater(
            self.host, self.port, self.ssl, self._raw_request, name)

    @property
    def _analyzer(self):
        """A fresh analysis of the current raw request."""
        return self.helpers.analyzeRequest(self._raw_request)

    @property
    def headers(self):
        """Headers as a list of ``(name, value)`` tuples.

        The first line reported by the analyzer (the request line) is
        skipped.  A list is used rather than a dict because header names
        may repeat.  The result is always a list, empty when there are no
        header lines, so callers can take its length or iterate it safely.
        """
        parsed = []
        for line in self._analyzer.getHeaders()[1:]:
            # Split on the first colon only; values may contain colons
            # themselves (e.g. "Host: example.com:8080").
            name, _, value = line.partition(':')
            parsed.append((name, value.lstrip()))
        return parsed

    @headers.setter
    def headers(self, value):
        """Rebuild the raw request from ``(name, value)`` pairs.

        The current request line and body are kept.
        """
        new_headers = [self._analyzer.getHeaders()[0]]
        new_headers.extend('{0}: {1}'.format(k, v) for k, v in value)
        self._raw_request = self.helpers.buildHttpMessage(
            new_headers, self.body)

    @property
    def cookies(self):
        """Cookie parameters as a list of ``(name, value)`` tuples."""
        return [(p.getName(), p.getValue())
                for p in self._analyzer.getParameters()
                if p.getType() == IParameter.PARAM_COOKIE]

    @cookies.setter
    def cookies(self, value):
        """Replace all cookie parameters with the given pairs."""
        request = self._raw_request
        # Delete current cookies
        for param in self._analyzer.getParameters():
            if param.getType() == IParameter.PARAM_COOKIE:
                request = self.helpers.removeParameter(request, param)
        # Rebuild new cookie parameters
        for pair in value:
            cookie = self.helpers.buildParameter(
                pair[0], pair[1], IParameter.PARAM_COOKIE)
            request = self.helpers.addParameter(request, cookie)
        self._raw_request = request

    @property
    def body(self):
        """The raw request from the analyzer's body offset onward."""
        raw = self._raw_request.tostring()
        return raw[self._analyzer.getBodyOffset():]

    def execute(self):
        """Send the current raw request and return Burp's message object."""
        return self.callbacks.makeHttpRequest(
            self.httpservice, self._raw_request)

    @property
    def parameters(self):
        """All analysed parameters, each wrapped in ``Request.Parameter``."""
        return [self.Parameter(p) for p in self._analyzer.getParameters()]

    @parameters.setter
    def parameters(self, parameters):
        """Replace every parameter with the given ``Parameter`` wrappers."""
        request = self._raw_request
        # Delete current parameters
        # NOTE(review): assumes removeParameter/addParameter accept every
        # parameter type the analyzer reports (JSON, XML, ...) - confirm
        # against the Burp helper documentation.
        for iparam in self._analyzer.getParameters():
            request = self.helpers.removeParameter(request, iparam)
        # Add the given parameters
        for param in parameters:
            request = self.helpers.addParameter(request, param._iparameter)
        self._raw_request = request

    class Parameter(object):
        """Read-only view of a Burp parameter object with a readable type."""

        def __init__(self, iparameter):
            self._iparameter = iparameter

        @property
        def name(self):
            """Name reported by the wrapped parameter."""
            return self._iparameter.getName()

        @property
        def value(self):
            """Value reported by the wrapped parameter."""
            return self._iparameter.getValue()

        @property
        def type(self):
            """Readable type name, or ``"unknown"`` for unlisted types."""
            # Built on access so the class body itself does not depend on
            # IParameter being importable.
            type_names = {
                IParameter.PARAM_BODY: "body",
                IParameter.PARAM_COOKIE: "cookie",
                IParameter.PARAM_JSON: "json",
                IParameter.PARAM_MULTIPART_ATTR: "multipart",
                IParameter.PARAM_URL: "url",
                IParameter.PARAM_XML: "xml",
                IParameter.PARAM_XML_ATTR: "xml-attribute",
            }
            return type_names.get(self._iparameter.getType(), "unknown")

        def __str__(self):
            return "{0}: {1}={2}".format(self.type, self.name, self.value)

        def __repr__(self):
            return self.__str__()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment