Skip to content

Instantly share code, notes, and snippets.

@Cr4sh
Last active June 12, 2020 06:59
Show Gist options
  • Save Cr4sh/5b06371f0e3a3b137684fd79885f0519 to your computer and use it in GitHub Desktop.
Save Cr4sh/5b06371f0e3a3b137684fd79885f0519 to your computer and use it in GitHub Desktop.
AT commands fuzzer based on ATFuzzer code base
#!/usr/bin/env python2
'''
********************************************************************************
AT commands fuzzer based on ATFuzzer code base.
* https://github.com/Imtiazkarimik23/ATFuzzer
* https://relentless-warrior.github.io/wp-content/uploads/2019/11/atfuzz.pdf
USAGE: at_fuzzer.py <serial_port_device>
********************************************************************************
'''
import sys, os, time, random, string, json, copy
import serial
from numpy import setdiff1d
# number of iterations
ATTEMPTS = 1
# number of input generated for each grammar
INPUT_NUMBER = 1
# number of commands in the input line
CMD_NUMBER = 1
# grammar database
GRAM_FILE_PATH = 'at_command_grammar.json'
DEVICE_BAUD = 115200
DEVICE_RED_LEN = 0x100
DEVICE_TIMEOUT = 1
C_END = '\33[0m'
C_RED = '\33[31m'
C_GREEN = '\33[32m'
C_YELLOW = '\33[33m'
C_BLUE = '\33[34m'
MAX_INT = sys.maxsize
MIN_INT = -sys.maxsize
# list of standard versions of the grammars
m_standard_set = []
# serial port device
m_device = None
m_count = 0
semicolon = ';'
def color(col, text):
# print text with given color
return col + text + C_END
def read_conf():
conf = json.load(open(GRAM_FILE_PATH))
return conf['AT_CMD_GRAMMARS']
def flip_coin(faces = 1):
# returns a random number between 0 and faces
return random.randint(0, faces) if faces > 0 else random.randint(0, 1)
def remove_elements(list_1, list_2):
new_list = []
for e in list_1:
if e not in list_2:
new_list.append(e)
return new_list
def in_gram_crossover(cmd_gram, move_command = 0):
gram_struct = cmd_gram['struct']
if len(gram_struct) > 2:
start = 0 if move_command == 1 else 2
i = random.randint(start, len(gram_struct) - 1)
j = random.randint(start, len(gram_struct) - 1)
gram_struct[i], gram_struct[j] = gram_struct[j], gram_struct[i]
return cmd_gram
def multi_gram_crossover(grams_set):
gram_1, gram_2 = random.choices(grams_set, k = 2)
arg_1 = random.choice(gram_1['arg'])
arg_2 = random.choice(gram_2['arg'])
gram_1[arg_1], gram_2[arg_2] = gram_2[arg_2], gram_1[arg_1]
def add_field(cmd_gram):
gram_struct = cmd_gram['struct']
possible_elements = remove_elements(cmd_gram.keys(), [ 'struct', 'arg', 'separator', 'score' ])
missing_elements = setdiff1d(possible_elements, gram_struct)
if len(missing_elements) > 0:
gram_struct.append(random.choice(missing_elements))
else:
cmd_gram['struct'].append('random')
cmd_gram['arg'].append('random')
cmd_gram['random'] = { 'type': 'string', 'length': 5 }
if not cmd_gram.has_key('separator'):
# there is no separator, probably due to the fact that there is one ore less argument
cmd_gram['separator'] = ','
def remove_field(cmd_gram, move_command = 0):
gram_struct = cmd_gram['struct']
if len(gram_struct) > 2:
start = 0 if move_command == 1 else 2
gram_struct.pop(random.randint(start, len(gram_struct) - 1))
def gram_random_add_delete(cmd_gram, move_command = 0):
''' Takes list of elements and add/delete an element to/from that list.
'''
gram_struct = cmd_gram['struct']
if len(gram_struct) < 2 or flip_coin() == 0:
# add field to the input
add_field(cmd_gram)
else:
# remove field from the imput
remove_field(cmd_gram, move_command)
def negate_condition(cmd_gram):
''' Negate the condition associated to a field. For instance, the field is an integer between 1 and 10,
the function changes the range below 1 or above 10.
'''
arg_in_struct = []
for el in cmd_gram['arg']:
if el in cmd_gram['struct']:
arg_in_struct.append(el)
if len(arg_in_struct) > 0:
arg_name = random.choice(arg_in_struct)
arg = cmd_gram[arg_name]
type = arg['type']
if type == 'digit' or type == 'letters' or type == 'string':
# length: int
half_len, double_len = (arg['length'] / 2), (arg['length'] * 2)
arg['length'] = double_len if half_len < 2 else random.choice([ half_len, double_len ])
elif type == 'ranged':
start, end = arg['range'][0], arg['range'][1]
start_or_end = flip_coin()
if start_or_end == 1:
if start > MIN_INT:
# negate start
arg['range'] = [ start - 100, start - 1 ] if (start - 100 > MIN_INT) else [ MIN_INT, start - 1 ]
else:
if end < MAX_INT:
# engate end
arg['range'] = [ end + 1, end + 100 ] if (end + 100 < MAX_INT) else [ end + 1, MAX_INT ]
elif type == 'fixed':
# values: [ str_1, str_2, ... , str_N ]
new_value = []
for x in range(10):
new_value.append(random.choice(string.ascii_letters + string.digits + string.punctuation))
arg['values'].append(''.join(new_value))
elif type == 'immutable':
# immutable value: str
pass
else:
raise Exception('Unknow argument type')
def try_restore_type(arg, type):
''' Try to restore a fixed or ranged type in the case it was negate in a previous iteration.
This leverages the fact that we do not delete the field values or range when negating the type.
'''
if flip_coin() == 1:
if arg.has_key('values'):
arg['type'] = random.choice([ 'string', 'letters', 'fixed' ])
else:
if arg.has_key('range'):
arg['type'] = random.choice([ 'string', 'letters', 'ranged' ])
else:
if type == 'digit':
arg['type'] = random.choice([ 'string', 'letters' ])
elif type == 'letters':
arg['type'] = random.choice([ 'string', 'digit' ])
elif type == 'string':
pass
else:
if type == 'digit':
arg['type'] = random.choice([ 'string', 'letters' ])
elif type == 'letters':
arg['type'] = random.choice([ 'string', 'digit' ])
elif type == 'string':
pass
def negate_type(cmd_gram):
''' Negates the type associated to a field. For instance, the field is an integer,
the function converts it into a string.
'''
arg_in_struct = []
for el in cmd_gram['arg']:
if el in cmd_gram['struct']:
arg_in_struct.append(el)
if len(arg_in_struct) > 0:
arg_name = random.choice(arg_in_struct)
arg = cmd_gram[arg_name]
type = arg['type']
if type == 'digit' or type == 'letters' or type == 'string':
try_restore_type(arg, type)
elif type == 'ranged' or type == 'fixed':
arg['type'] = random.choice(['string', 'digit', 'letters'])
arg['length'] = random.randint(0, 100)
elif type == 'immutable':
# cannot change immutable
pass
else:
raise Exception('Unknow argument type')
def fixed_integers(cmd_gram):
''' Negates the condition associated to a field and set value of the filed to corner case values.
'''
arg_in_struct = []
for el in cmd_gram['arg']:
if el in cmd_gram['struct']:
arg_in_struct.append(el)
if len(arg_in_struct) > 0:
arg_name = random.choice(arg_in_struct)
arg = cmd_gram[arg_name]
type = arg['type']
if type == 'digit':
arg['type'] = 'fixed'
arg['values'] = [ MIN_INT, MAX_INT ]
if type == 'fixed':
arg['values'].append([ MIN_INT, MAX_INT ])
if type == 'ranged':
if flip_coin() == 0:
arg['range'] = [ MIN_INT, arg['range'][1] ]
else:
arg['range'] = [ arg['range'][0], MAX_INT ]
def alter_connectors(cmd_gram):
try:
# alters the symbols used for connecting grammars and fields with grammars
cmd_gram['separator'] = random.choice(string.punctuation)
except KeyError:
pass
def modify_grammar(cmd_gram, move_command = 0):
in_gram_crossover(cmd_gram, move_command)
if flip_coin() == 1:
gram_random_add_delete(cmd_gram, move_command)
if flip_coin() == 1:
negate_condition(cmd_gram)
if flip_coin() == 1:
negate_type(cmd_gram)
if flip_coin() == 1:
fixed_integers(cmd_gram)
if flip_coin() == 1:
alter_connectors(cmd_gram)
def random_semicolon():
''' Produce a string with a random number of semicolon (1-5).
'''
semicolon_num = random.randint(1, 5)
return ''.join(semicolon for x in range(semicolon_num))
def random_digits(value_length):
''' Produce a random number of maximum length of value_length.
'''
if value_length < 0:
raise Exception('Negative length')
if value_length == 0:
value_length = 1
l = random.randint(1, value_length)
range_start = 10 ** (l - 1)
range_end = (10 ** l) - 1
return str(random.randint(range_start, range_end))
def random_letters(value_length):
''' Produce a random string of letters of length of value_length.
'''
if value_length < 0:
raise Exception('Negative length')
l = 1 if value_length == 1 or value_length == 0 else random.randint(1, value_length)
return ''.join(random.choice(string.lowercase) for x in range(l))
def random_string(value_length):
''' Produce a random string of numbers, letters and symbols of length of value_length.
'''
if value_length < 0:
raise Exception('Negative length')
l = 1 if value_length == 1 or value_length == 0 else random.randint(1, value_length)
return ''.join(random.choice(string.ascii_letters + string.digits + string.punctuation) for x in range(l))
def generate_value(singol_arg):
type = singol_arg['type']
if type == 'digit':
return random_digits(singol_arg['length'])
elif type == 'letters':
return random_letters(singol_arg['length'])
elif type == 'string':
return random_string(singol_arg['length'])
elif type == 'ranged':
start, end = singol_arg['range'][0], singol_arg['range'][1]
return str(random.randint(start, end))
elif type == 'fixed':
return str(random.choice(singol_arg['values']))
elif type == 'immutable':
return singol_arg['immutable_value']
else:
raise Exception('Unknown type')
def gen_terminal(cmd_info, elem):
try:
arg = cmd_info['arg']
except:
arg = []
return generate_value(cmd_info[elem]) if elem in arg else str(cmd_info[elem])
def gen_command(command_gram):
''' Build one AT command based on the grammar structure and the command info.
'''
command, previous_was_arg = '', 0
for elem in command_gram['struct']:
term = gen_terminal(command_gram, elem)
try:
arg = command_gram['arg']
except:
arg = []
if elem in arg:
if term != 'null':
command += (command_gram['separator'] + term) if len(arg) > 1 and previous_was_arg == 1 else term
previous_was_arg = 1
else:
command += term
previous_was_arg = 0
return command
def command_exec(command, quiet = False):
global m_device, m_count
reply = ''
# send command
m_device.read(DEVICE_RED_LEN)
m_device.write(command + '\r')
if not quiet:
print('[%.8x] >>> %s' % (m_count, repr(command)))
t = time.time()
try:
# receive reply
reply = m_device.readline().strip()
t = time.time() - t
if reply == command.strip():
reply = m_device.readline().strip()
except serial.serialutil.SerialException:
pass
if not quiet:
reply_str = reply
if len(reply_str) == 0:
reply_str = None
if reply_str is None:
reply_str = color(C_RED, repr(reply_str))
elif reply_str.endswith('OK'):
reply_str = color(C_GREEN, repr(reply_str))
elif not reply_str.endswith('ERROR'):
reply_str = color(C_YELLOW, repr(reply_str))
else:
reply_str = repr(reply_str)
print('[%.8x] <<< %s %s' % (m_count, reply_str, color(C_BLUE, '%.4f' % t)))
return reply
def command_eval(command):
global m_count
ok = False
for _ in range(0, 5):
# check connection
reply = command_exec('AT', quiet = True)
if reply.strip() == 'OK':
ok = True
break
else:
time.sleep(1)
if not ok:
raise(Exception('Bad reply'))
command_exec(command)
m_count += 1
def set_eval(gram_set):
for _ in range(INPUT_NUMBER):
command = ''
for gram in gram_set:
# generate command data for given grammar
command += 'AT' + str(gen_command(gram)) + ';'
# execute command
command_eval(command[: -1])
# accept a list of grammars and return a list of as new list as the diversification factor
def set_modify(gram_set, diversification_factor):
global m_standard_set
modified_set = []
for _ in range(diversification_factor):
modified_set.append([])
for gram in gram_set:
generated = 0
gram_prev = {}
while generated < diversification_factor:
# copy and modify current grammar
gram_new = copy.deepcopy(gram)
modify_grammar(gram_new)
if gram_new not in m_standard_set and gram_new != gram_prev:
modified_set[generated].append(gram_new)
generated += 1
gram_prev = copy.deepcopy(gram_new)
return modified_set
def population_create(gram_sets, diversification_factor = 1):
new_population = []
for single_set in gram_sets:
for new_set in set_modify(single_set, diversification_factor):
new_population.append(new_set)
return new_population
def population_select(scores):
# randomly select 2 grammars
return random.sample(scores, 2)
def gram_fuzz(gram_set):
global m_standard_set
for gram in gram_set:
gram['score'] = 0
if not gram.has_key('arg'):
# no argument is expected
gram['struct'].append('random')
gram['separator'] = ''
gram['arg'] = [ 'random' ]
gram['random'] = { 'type': 'string', 'length': 5 }
if len(gram['struct']) > 3 and gram['cmd'] != '+CMGS':
m_standard_set.append(gram)
set_population = population_create([ gram_set ], 10)
set_scores = []
for _ in range(ATTEMPTS):
for gram_set in set_population:
# evaluate grammars
set_eval(gram_set)
set_scores.append(gram_set)
# create new population
selected_sets = population_select(set_scores)
set_population = population_create(selected_sets, 2)
def gram_eval():
grams = read_conf()
while True:
current_set = random.sample(grams.values(), CMD_NUMBER)
gram_fuzz(current_set)
def main():
global m_device
if len(sys.argv) < 2:
print('USAGE: at_fuzzer.py <serial_port_device>')
return 0
# open serial port
m_device = serial.Serial(port = sys.argv[1], baudrate = DEVICE_BAUD, timeout = DEVICE_TIMEOUT)
# check connectivity
command_exec('AT', quiet = True)
try:
# run fuzzing
gram_eval()
except KeyboardInterrupt:
print('\nEXIT')
return 0
if __name__ == '__main__':
exit(main())
#
# EoF
#
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment