Skip to content

Instantly share code, notes, and snippets.

@FrankSpierings
Last active November 10, 2020 08:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FrankSpierings/b4348c8c49086750f9faea94fe4a28dc to your computer and use it in GitHub Desktop.
Save FrankSpierings/b4348c8c49086750f9faea94fe4a28dc to your computer and use it in GitHub Desktop.
Burp NTFS Alternative Data Stream Scanner
# coding=utf-8
from burp import IBurpExtender
from burp import IScannerCheck
from burp import IScanIssue
from burp import IScannerInsertionPoint
from array import array
class BurpExtender(IBurpExtender, IScannerCheck):
#
# implement IBurpExtender
#
def registerExtenderCallbacks(self, callbacks):
# keep a reference to our callbacks object
self._callbacks = callbacks
# obtain an extension helpers object
self._helpers = callbacks.getHelpers()
# set our extension name
callbacks.setExtensionName("NTFS Alternate Data Stream Scanner")
# register ourselves as a custom scanner check
callbacks.registerScannerCheck(self)
#
# implement IScannerCheck
#
def doPassiveScan(self, baseRequestResponse):
return []
def doActiveScan(self, baseRequestResponse, insertionPoint):
# make a request containing our injection test in the insertion point
if insertionPoint.getInsertionPointType() == IScannerInsertionPoint.INS_URL_PATH_FILENAME:
# Per file checks
return self.doActiveScanPerfile(baseRequestResponse, insertionPoint)
return None
def consolidateDuplicateIssues(self, existingIssue, newIssue):
# This method is called when multiple issues are reported for the same URL
# path by the same extension-provided check. The value we return from this
# method determines how/whether Burp consolidates the multiple issues
# to prevent duplication
#
# Since the issue name is sufficient to identify our issues as different,
# if both issues have the same name, only report the existing issue
# otherwise report both issues
if existingIssue.getIssueName() == newIssue.getIssueName():
return -1
return 0
def doActiveScanPerfile(self, baseRequestResponse, insertionPoint):
placeholder = 'PLACEHOLDER'
url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
filename = url.getFile().split('/')[-1]
inject = '{0}{1}'.format(filename, placeholder)
# Build the new request
checkRequest = insertionPoint.buildRequest(inject)
# Convert array.array to string (python2.7 string==bytes)
checkRequestBytes = checkRequest.tostring()
# Replace the placeholder with ::$DATA, to circumvent automatic URL-encoding
checkRequestBytes = checkRequestBytes.replace(placeholder, '::$DATA')
# Convert the string back to Java compatible byte[]
checkRequest = array('b', checkRequestBytes)
# Send the request
checkRequestResponse = self._callbacks.makeHttpRequest(
baseRequestResponse.getHttpService(), checkRequest)
# Get the response object
checkResponse = checkRequestResponse.getResponse()
status_code = self._helpers.analyzeResponse(checkResponse).getStatusCode()
if status_code == 200:
url = self._helpers.analyzeRequest(checkRequestResponse).getUrl()
httpMessages = [checkRequestResponse]
name = "NTFS Alternate Data Stream"
detail = "Requesting the file with ::$DATA attached to it."
severity = "High"
return [CustomScanIssue(baseRequestResponse.getHttpService(), url, httpMessages, name, detail, severity)]
return None
#
# class implementing IScanIssue to hold our custom scan issue details
#
class CustomScanIssue (IScanIssue):
def __init__(self, httpService, url, httpMessages, name, detail, severity):
self._httpService = httpService
self._url = url
self._httpMessages = httpMessages
self._name = name
self._detail = detail
self._severity = severity
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0
def getSeverity(self):
return self._severity
def getConfidence(self):
return "Tentative"
def getIssueBackground(self):
pass
def getRemediationBackground(self):
pass
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
pass
def getHttpMessages(self):
return self._httpMessages
def getHttpService(self):
return self._httpService
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment