Skip to content

Instantly share code, notes, and snippets.

@ninp0
Last active April 20, 2024 10:31
Show Gist options
  • Save ninp0/f8d1e37b5c72fb127b92141698159532 to your computer and use it in GitHub Desktop.
Save ninp0/f8d1e37b5c72fb127b92141698159532 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Boofuzz is a fork of and the successor to the venerable Sulley fuzzing framework.
# Besides numerous bug fixes, boofuzz aims for extensibility. The goal: fuzz everything.
# Github: https://github.com/jtpereyda/boofuzz
# Docs: http://boofuzz.readthedocs.io/en/latest/
# PDF: https://buildmedia.readthedocs.org/media/pdf/boofuzz/latest/boofuzz.pdf
#
# This project uses virtualenv to manage all dependencies. To properly install this project, run the following commands:
#
# ```
# $ sudo apt install virtualenv
# $ sudo mkdir /opt/boofuzzing
# $ sudo chown -R $USER:$USER /opt/boofuzzing
# $ sudo chmod 770 /opt/boofuzz-testcases
# $ cd /opt/boofuzzing
# $ echo boofuzz > requirements.txt
# $ virtualenv .
# $ source $(pwd)/bin/activate
# $ $(pwd)/bin/pip install --upgrade pip
# $ $(pwd)/bin/pip install -r requirements.txt
# $ which python
# /opt/boofuzzing/bin/python
# ```
# After typing which python and it responds to residing in $(pwd)/bin/python, you should be good to start fuzzing.
# Based Upon EPMD Protocol Specification: https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#epmd-protocol
import argparse
import time
import socket
from boofuzz import *
from random import randbytes
def hexify(data):
hexified_data = ''
for byte in data:
hexified_data = hexified_data + "\\x{0:0{1}x}".format(ord(byte), 2)
return hexified_data
# Called after the target is started or restarted
def post_start_target(target, fuzz_data_logger, session, sock, *args, **kwargs):
fuzz_data_logger.log_info('triggered post_start_target')
# Called prior to each fuzz request
def pre_send(target, fuzz_data_logger, session, sock, *args, **kwargs):
fuzz_data_logger.log_info('triggered pre_send')
# Called after each fuzz test case
def post_test_case(target, fuzz_data_logger, session, sock, *args, **kwargs):
fuzz_data_logger.log_info('triggered post_test_case')
request_body = ''.join(map(chr, session.last_send))
hexified_request_body = hexify(request_body)
fuzz_data_logger.log_info(repr(request_body))
fuzz_data_logger.log_info(hexified_request_body)
response_body = ''.join(map(chr, target.recv(65536)))
hexified_response_body = hexify(response_body)
fuzz_data_logger.log_info(repr(response_body))
fuzz_data_logger.log_info(hexified_response_body)
# Called after a failed post_test_case_callback
def restart(target, fuzz_data_logger, session, sock, *args, **kwargs):
fuzz_data_logger.log_info('triggered restart')
# Use to pass data received data between node transmits.
def connect_callback(target, fuzz_data_logger, session, node, edge, *args, **kwargs):
fuzz_data_logger.log_info('triggered connect_callback')
def main(args):
target = args.target
port = int(args.port)
web_port = int(args.web_port)
# Connect to target:
session = Session(
post_start_target_callbacks = [post_start_target],
pre_send_callbacks = [pre_send],
post_test_case_callbacks = [post_test_case],
restart_callbacks = [restart],
check_data_received_each_request=True,
ignore_connection_reset=True,
ignore_connection_aborted=True,
web_port=web_port,
keep_web_open=False,
target = Target(
connection = TCPSocketConnection(
target,
port,
send_timeout=0.3,
recv_timeout=0.6
)
)
)
# Begin building out protocol structure for fuzzing fields here:
s_initialize(name='epmd')
with s_block(name='request'):
s_size(name='length', block_name='body', output_format='binary', length=2, endian='>', fuzzable=False)
if args.request_type == 'FUZZ' or args.request_type == None:
with s_block(name='body'):
s_string(name='fuzz_payload', value='\x00', fuzzable=True)
elif args.request_type == 'KILL_EPMD':
with s_block(name='body'):
s_byte(name='request_type', value=0x6B, endian='>', fuzzable=False)
s_word(name='fuzz_payload', value=0x0000, endian='>', full_range=True, fuzzable=True)
elif args.request_type == 'DUMP_REQ':
with s_block(name='body'):
s_byte(name='request_type', value=0x64, endian='>', fuzzable=False)
s_word(name='fuzz_payload', value=0x0000, endian='>', full_range=True, fuzzable=True)
elif args.request_type == 'NAMES_REQ':
with s_block(name='body'):
s_byte(name='request_type', value=0x6E, endian='>', fuzzable=False)
s_word(name='fuzz_payload', value=0x0000, endian='>', full_range=True, fuzzable=True)
elif args.request_type == 'STOP_REQ':
with s_block(name='body'):
s_byte(name='request_type', value=0x73, endian='>', fuzzable=False)
s_word(name='fuzz_payload', value=0x0000, endian='>', full_range=True, fuzzable=True)
elif args.request_type == 'ALIVE2_REQ':
with s_block(name='body'):
s_byte(name='request_type', value=0x78, endian='>', full_range=True, fuzzable=True)
s_word(name='src_port', value=0x0000, endian='>', full_range=True, fuzzable=True)
s_byte(name='node_type', value=0x4d, endian='>', full_range=True, fuzzable=True)
s_byte(name='ip_protocol', value=0x00, endian='>', full_range=True, fuzzable=True)
s_word(name='highest_version', value=0x0005, endian='>', full_range=True, fuzzable=True)
s_word(name='lowest_version', value=0x0005, endian='>', full_range=True, fuzzable=True)
s_size(name='node_name_length', block_name='node_name', output_format='binary', length=2, endian='>', fuzzable=False)
s_string(name='node_name', value='self', fuzzable=True)
s_size(name='extra_length', block_name='extra', output_format='binary', length=2, endian='>', fuzzable=False)
s_string(name='extra', value='extra', fuzzable=True)
elif args.request_type == 'PORT_PLEASE2_REQ':
with s_block(name='body'):
s_byte(name='request_type', value=0x7A, endian='>', fuzzable=False)
s_string(name='node_name', value='self')
session.connect(
s_get('epmd'),
callback=connect_callback
)
session.fuzz()
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description = 'epmd network protocol fuzzer leveraging boofuzz'
)
parser.add_argument('-t', '--target',
default = 'localhost',
help = "target system running epmd (default: %(default)s)",
metavar = 'HOST',
dest = 'target'
)
parser.add_argument('-p', '--port',
default = 4369,
help = "epmd daemon listening port (default: %(default)s)",
metavar = 'PORT',
dest = 'port'
)
parser.add_argument('-w', '--web-port',
default = 26000,
help = "boofuzz web port (default: %(default)s)",
metavar = 'WEB_PORT',
dest = 'web_port'
)
subparsers = parser.add_subparsers(
help='epmd request type - pass custom values to sub-arguments OR pass "FUZZ" to any sub-rgument to assign fields for fuzz mutations',
dest='request_type'
)
pfuzz = subparsers.add_parser(
'FUZZ',
help='Sending Random Fuzz Mutations to EPMD Daemon'
)
p1 = subparsers.add_parser(
'KILL_EPMD',
help='Kill EPMD'
)
p2 = subparsers.add_parser(
'DUMP_REQ',
help='Dump All Data from EPMD'
)
p3 = subparsers.add_parser(
'NAMES_REQ',
help='Get All Registered Names from EPMD'
)
p4 = subparsers.add_parser(
'STOP_REQ',
help='According to EPMD Specification,Not Used...But Still Available?'
)
p5 = subparsers.add_parser(
'ALIVE2_REQ',
help='Register a Node in EPMD'
)
p6 = subparsers.add_parser(
'PORT_PLEASE2_REQ',
help='Get the Distribution Port of Another Node'
)
args = parser.parse_args()
exit(main(args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment