Last active
February 8, 2022 18:14
-
-
Save wumb0/d9c59a05cd84f25c92099ccff4fa90a7 to your computer and use it in GitHub Desktop.
more optimized frida drcov script for lighthouse that monitors windows targets for new threads and modules
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import argparse | |
import json | |
import os | |
import sys | |
import threading | |
import functools | |
import frida | |
""" | |
Frida BB tracer that outputs in DRcov format.

Frida script is responsible for:
    - Getting and sending the process module map initially
    - Getting the code execution events
    - Parsing the raw event into a GumCompileEvent
    - Converting from GumCompileEvent to DRcov block
    - Sending a list of DRcov blocks to python
    - Monitoring for new threads and dynamically loaded modules
      (Windows only right now)

Python side is responsible for:
    - Attaching and detaching from the target process
    - Removing duplicate DRcov blocks
    - Formatting module map and blocks
    - Writing the output file
""" | |
# Our frida script, takes two string arguments to embed
# 1. whitelist of modules, in the form "['module_a', 'module_b']" or "['all']"
# 2. threads to trace, in the form "[345, 765]" or "['all']"
#
# The template is rendered with the ``%`` operator, so it must contain exactly
# two ``%s`` placeholders and no other percent signs.
js = """
"use strict";

var whitelist = %s;
var threadlist = %s;

var mod_idx = 0;
var module_ids = {};
var maps = Process.enumerateModulesSync();
populate_maps(maps);

function populate_maps(maps) {
    maps.map(function(o) {
        o.id = mod_idx++;
        o.end = o.base.add(o.size);
        module_ids[o.path] = {id: o.id, start: o.base};
    });
    send({'map': maps});
}

var filtered_maps = new ModuleMap(function (m) {
    if (whitelist.indexOf('all') >= 0) { return true; }
    var include = whitelist.indexOf(m.name) >= 0;
    // uncommenting the following might provide a speedup, but also
    // might cause you to miss blocks. YMMV
    //if (!include) { Stalker.exclude(m); }
    return include;
});

// use a global buffer and only send it when it fills or the trace ends
var entry_sz = 8;
var buffer_size = 1024 * 1024;
var bb = new ArrayBuffer(entry_sz * buffer_size);
var cur = 0;

function sendBuffer() {
    send({bbs: 1}, new Uint8Array(bb, 0, cur * entry_sz));
    cur = 0;
}

// This function takes a list of GumCompileEvents and converts it into a DRcov
// entry. Note that we'll get duplicated events when two traced threads
// execute the same code, but this will be handled by the python side.
function drcov_bbs(bbs, fmaps, path_ids) {
    // We're going to use send(..., data) so we need an array buffer to send
    // our results back with. Let's go ahead and alloc the max possible
    // reply size

    /*
        // Data structure for the coverage info itself
        typedef struct _bb_entry_t {
            uint   start;      // offset of bb start from the image base
            ushort size;
            ushort mod_id;
        } bb_entry_t;
    */

    for (var i = 0; i < bbs.length; ++i) {
        if (cur == buffer_size) {
            sendBuffer();
        }
        var e = bbs[i];

        var start = e[0];
        var end = e[1];

        var path = fmaps.findPath(start);

        if (path == null) { continue; }

        var mod_info = path_ids[path];

        var offset = start.sub(mod_info.start).toInt32();
        var size = end.sub(start).toInt32();
        var mod_id = mod_info.id;

        // We're going to create two memory views into the array we alloc'd at
        // the start.

        // we want one u32 after all the other entries we've created
        var x = new Uint32Array(bb, cur * entry_sz, 1);
        x[0] = offset;

        // we want two u16's offset after the 4 byte u32 above
        var y = new Uint16Array(bb, cur * entry_sz + 4, 2);
        y[0] = size;
        y[1] = mod_id;

        ++cur;
    }
}

// Punt on self modifying code -- should improve speed and lighthouse will
// barf on it anyways
Stalker.trustThreshold = 0;

console.log('Starting to stalk threads...');

var threads = [];

function stalkthread(thread_id) {
    if (threadlist.indexOf(thread_id) < 0 &&
        threadlist.indexOf('all') < 0) {
        // This is not the thread you're look for
        return;
    }

    console.log('Stalking thread ' + thread_id + '.');
    threads.push(thread_id);

    Stalker.follow(thread_id, {
        events: {
            compile: true
        },
        onReceive: function (event) {
            var bb_events = Stalker.parse(event,
                {stringify: false, annotate: false});
            drcov_bbs(bb_events, filtered_maps, module_ids);
        }
    });
}

Process.enumerateThreads({
    onMatch(thread) { stalkthread(thread.id) },
    onComplete: function () { console.log('Done stalking existing threads.'); }
});

// platform specific code
if (Process.platform == "windows") {
    var GetThreadId = new NativeFunction(Module.getExportByName('kernel32', "GetThreadId"), 'uint32', ['pointer']);

    function onLeaveCreateThread(ret) {
        // the return value is an NTSTATUS; compare its 32-bit value with
        // STATUS_SUCCESS (0)
        if (ret.toInt32() === 0) {
            // on success read the thread handle and get its ID
            var handle = this.thandle_ptr.readPointer();
            var tid = GetThreadId(handle);
            // GetThreadId yields 0 when it cannot resolve the handle
            if (tid !== 0) {
                stalkthread(tid);
            }
        }
    }

    var NtCreateThread = Module.getExportByName('ntdll', 'NtCreateThread');
    var NtCreateThreadEx = Module.getExportByName('ntdll', 'NtCreateThreadEx');

    Interceptor.attach(NtCreateThread, {
        onEnter(args) {
            // arg 0 (PHANDLE ThreadHandle) receives the thread handle on
            // success. The PCLIENT_ID in arg 4 is deliberately not used:
            // its UniqueThread field already holds a thread ID rather than
            // a handle, so it must not be handed to GetThreadId.
            this.thandle_ptr = ptr(args[0]);
        },
        onLeave: onLeaveCreateThread
    });

    Interceptor.attach(NtCreateThreadEx, {
        onEnter(args) {
            // arg 0 receives thread handle on success
            this.thandle_ptr = ptr(args[0]);
        },
        onLeave: onLeaveCreateThread
    });

    // all load library boils down to LoadLibraryExW
    var LoadLibraryW = Module.getExportByName("kernelbase", "LoadLibraryExW");
    Interceptor.attach(LoadLibraryW, { onLeave(args) {
        // refresh module list, send new modules
        var mods = Process.enumerateModules().filter(mod => !(mod.path in module_ids));
        if (mods.length != 0) {
            populate_maps(mods);
            filtered_maps.update();
        }
    }});
}

// when done in python, unload and send remaining buffer
recv("done", function() {
    Interceptor.detachAll();
    threads.map(function(i) { Stalker.unfollow(i) });
    Stalker.flush();
    Stalker.garbageCollect();
    sendBuffer();
    send("done");
});
"""
# These are global so we can easily access them from the frida callbacks or
# signal handlers. It's important that bbs is a set, as we're going to depend
# on its uniquing behavior for deduplication
modules = []                # module records appended by populate_modules()
bbs = set()                 # unique 8-byte drcov entries added by populate_bbs()
outfile = 'frida-cov.log'   # default output path; main() overrides it from -o
# This converts the object frida sends which has string addresses into
# a python dict
def populate_modules(image_list):
    """Append one normalized record per image to the global ``modules`` list.

    Args:
        image_list: iterable of mappings with the keys ``id``, ``path``,
            ``base``, ``end`` and ``size``. ``base`` and ``end`` are strings
            and are parsed into integers; any other keys are ignored.
    """
    global modules

    for image in image_list:
        modules.append({
            'id': image['id'],
            'path': image['path'],
            # base 0 makes int() pick the radix from the string's own prefix
            'base': int(image['base'], 0),
            'end': int(image['end'], 0),
            'size': image['size'],
        })
# called when we get coverage data from frida
def populate_bbs(data):
    """Split a raw coverage buffer into drcov entries and record them.

    Args:
        data: bytes-like buffer of back-to-back 8-byte drcov entries. ``None``
            or an empty buffer is accepted and records nothing.

    Each 8-byte slice is added to the global ``bbs`` set, which removes
    duplicates.
    """
    global bbs

    if not data:
        # A message may arrive without a binary payload; there is nothing to
        # slice in that case, and len(None) would raise.
        return

    # we know every drcov block is 8 bytes, so lets just blindly slice and
    # insert. This will dedup for us.
    block_sz = 8
    bbs.update(data[i:i + block_sz] for i in range(0, len(data), block_sz))
# take the module dict and format it as a drcov logfile header
def create_header(mods):
    """Build the drcov version banner and module table.

    Args:
        mods: sequence of module records with the keys ``id``, ``base``,
            ``end`` and ``path``.

    Returns:
        The header as UTF-8 encoded bytes, one line per module, each line
        terminated by a newline. An empty ``mods`` yields only the four
        banner lines, with no trailing blank line.
    """
    lines = [
        'DRCOV VERSION: 2',
        'DRCOV FLAVOR: frida',
        'Module Table: version 2, count %d' % len(mods),
        'Columns: id, base, end, entry, checksum, timestamp, path',
    ]

    for m in mods:
        # drcov: id, base, end, entry, checksum, timestamp, path
        # frida doesnt give us entry, checksum, or timestamp
        # luckily, I don't think we need them.
        lines.append('%3d, %#016x, %#016x, %#016x, %#08x, %#08x, %s' % (
            m['id'], m['base'], m['end'], 0, 0, 0, m['path']))

    return ('\n'.join(lines) + '\n').encode('utf-8')
# take the recv'd basic blocks, finish the header, and append the coverage
def create_coverage(data):
    """Build the drcov basic-block table.

    Args:
        data: sized collection of bytes objects, one per basic-block entry.

    Returns:
        Bytes made of the ``BB Table`` line followed by every entry
        concatenated in the collection's iteration order.
    """
    # Format as text and encode afterwards so this does not depend on
    # bytes %-formatting being available.
    bb_header = ('BB Table: %d bbs\n' % len(data)).encode('ascii')
    return bb_header + b''.join(data)
def on_message(event, msg, data):
    """Dispatch one message received from the frida script.

    Args:
        event: object with a ``set()`` method; it is set when the script
            reports that it is done.
        msg: message mapping. Messages whose ``type`` is ``'error'`` are
            printed and otherwise ignored; for everything else the
            ``payload`` entry decides the handling.
        data: binary payload that accompanies the message, forwarded to
            ``populate_bbs`` for coverage messages.
    """
    if msg['type'] == 'error':
        print(msg)
        return

    pay = msg['payload']
    if 'map' in pay:
        # module map update
        populate_modules(pay['map'])
    elif 'done' in pay:
        # the script flushed its buffers; let the waiter continue
        event.set()
    else:
        # anything else is treated as a coverage buffer
        populate_bbs(data)
def save_coverage():
    """Write the collected coverage to the path held in the global ``outfile``.

    The file is opened in binary mode and receives the module header built
    from the global ``modules`` followed by the basic-block table built from
    the global ``bbs``.
    """
    header = create_header(modules)
    body = create_coverage(bbs)

    with open(outfile, 'wb') as h:
        h.write(header)
        h.write(body)
def main():
    """Attach to a process, trace it until enter is pressed, save coverage.

    Parses the command line, resolves the target by pid or process name on
    the chosen device, loads the frida script with the requested module and
    thread filters, waits for the user to end the trace, asks the script to
    finish, and writes the drcov log. The process is then terminated with
    ``os._exit``; exit status is 0 on success and 1 when no matching process
    is found.
    """
    global outfile

    parser = argparse.ArgumentParser()
    parser.add_argument('target',
                        help='target process name or pid')
    parser.add_argument('-o', '--outfile',
                        help='coverage file',
                        default='frida-cov.log')
    parser.add_argument('-w', '--whitelist-modules',
                        help='module to trace, may be specified multiple times [all]',
                        action='append', default=[])
    parser.add_argument('-t', '--thread-id',
                        help='threads to trace, may be specified multiple times [all]',
                        action='append', type=int, default=[])
    parser.add_argument('-D', '--device',
                        help='select a device by id [local]',
                        default='local')

    args = parser.parse_args()

    outfile = args.outfile

    device = frida.get_device(args.device)

    # The first process whose pid or name matches wins; later matches only
    # produce a warning.
    target = -1
    for p in device.enumerate_processes():
        if args.target in [str(p.pid), p.name]:
            if target == -1:
                target = p.pid
            else:
                print('[-] Warning: multiple processes on device match '
                      '\'%s\', using pid: %d' % (args.target, target))

    if target == -1:
        print('[-] Error: could not find process matching '
              '\'%s\' on device \'%s\'' % (args.target, device.id))
        sys.exit(1)

    print('[*] Attaching to pid \'%d\' on device \'%s\'...' %
          (target, device.id))

    session = device.attach(target)
    print('[+] Attached. Loading script...')

    # An empty filter list means "trace everything".
    whitelist_modules = args.whitelist_modules or ['all']
    threadlist = args.thread_id or ['all']

    # JSON literals are embedded directly into the script source.
    json_whitelist_modules = json.dumps(whitelist_modules)
    json_threadlist = json.dumps(threadlist)

    script = session.create_script(js % (json_whitelist_modules, json_threadlist))
    event = threading.Event()
    script.on('message', functools.partial(on_message, event))
    script.load()

    print('[*] Now collecting info, press enter to end trace')
    try:
        sys.stdin.read(1)
    except KeyboardInterrupt:
        pass

    # Ask the script to stop and flush; on_message sets the event when the
    # script answers.
    script.post({"type": "done"})
    print('[*] Detaching, this might take a second...')
    event.wait()

    print('[+] Detached. Got %d basic blocks.' % len(bbs))
    print('[*] Formatting coverage and saving...')

    save_coverage()

    print('[!] Done')

    # session.detach and script.unload hang the process
    # I don't like doing this but I can't figure out another way
    # os._exit skips normal interpreter shutdown, so flush stdio by hand, and
    # report success with status 0 rather than 1.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment