Skip to content

Instantly share code, notes, and snippets.

@wumb0
Last active February 8, 2022 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wumb0/d9c59a05cd84f25c92099ccff4fa90a7 to your computer and use it in GitHub Desktop.
Save wumb0/d9c59a05cd84f25c92099ccff4fa90a7 to your computer and use it in GitHub Desktop.
more optimized frida drcov script for lighthouse that monitors windows targets for new threads and modules
#!/usr/bin/env python
from __future__ import print_function
import argparse
import json
import os
import sys
import threading
import functools
import frida
"""
Frida BB tracer that outputs in DRcov format.
Frida script is responsible for:
- Getting and sending the process module map initially
- Getting the code execution events
- Parsing the raw event into a GumCompileEvent
- Converting from GumCompileEvent to DRcov block
- Sending a list of DRcov blocks to python
- Monitoring for new threads and dynamically loaded modules (windows only right now)
Python side is responsible for:
- Attaching and detaching from the target process
- Removing duplicate DRcov blocks
- Formatting module map and blocks
- Writing the output file
"""
# Frida agent source (JavaScript). The text is rendered with the % operator and
# takes two string arguments, each one a JSON array literal:
# 1. whitelist of modules, in the form '["module_a", "module_b"]' or '["all"]'
# 2. threads to trace, in the form '[345, 765]' or '["all"]'
# Because of that rendering step the script text must not contain any other
# percent sign.
js = """
"use strict";

var whitelist = %s;
var threadlist = %s;

var mod_idx = 0;
var module_ids = {};

// Called without callbacks, enumerateModules() returns an array. This is the
// same form the LoadLibraryExW hook below uses to refresh the module list.
var maps = Process.enumerateModules();
populate_maps(maps);

// Give every module in the list a drcov id, remember it by path so blocks can
// be resolved later, and send the batch to the python side.
function populate_maps(maps) {
    maps.forEach(function (o) {
        o.id = mod_idx++;
        o.end = o.base.add(o.size);
        module_ids[o.path] = {id: o.id, start: o.base};
    });
    send({'map': maps});
}

var filtered_maps = new ModuleMap(function (m) {
    if (whitelist.indexOf('all') >= 0) { return true; }
    var include = whitelist.indexOf(m.name) >= 0;
    // uncommenting the following might provide a speedup, but also
    // might cause you to miss blocks. YMMV
    //if (!include) { Stalker.exclude(m); }
    return include;
});

// use a global buffer and only send it when it fills or the trace ends
var entry_sz = 8;
var buffer_size = 1024 * 1024;
var bb = new ArrayBuffer(entry_sz * buffer_size);
var cur = 0;

// Send the filled part of the global buffer to python and start over.
function sendBuffer() {
    send({bbs: 1}, new Uint8Array(bb, 0, cur * entry_sz));
    cur = 0;
}

// This function takes a list of GumCompileEvents and converts each one into a
// DRcov entry stored in the global buffer. Note that we'll get duplicated
// events when two traced threads execute the same code, but this will be
// handled by the python side.
function drcov_bbs(bbs, fmaps, path_ids) {
    /*
    // Data structure for the coverage info itself
    typedef struct _bb_entry_t {
        uint   start;      // offset of bb start from the image base
        ushort size;
        ushort mod_id;
    } bb_entry_t;
    */
    for (var i = 0; i < bbs.length; ++i) {
        // flush before writing once every slot of the buffer is used
        if (cur == buffer_size) {
            sendBuffer();
        }
        var e = bbs[i];
        var start = e[0];
        var end = e[1];
        // blocks outside the whitelisted modules have no path and are skipped
        var path = fmaps.findPath(start);
        if (path == null) { continue; }
        var mod_info = path_ids[path];
        var offset = start.sub(mod_info.start).toInt32();
        var size = end.sub(start).toInt32();
        var mod_id = mod_info.id;
        // We're going to create two memory views into the current slot of the
        // global buffer.
        // we want one u32 after all the other entries we've created
        var x = new Uint32Array(bb, cur * entry_sz, 1);
        x[0] = offset;
        // we want two u16's offset after the 4 byte u32 above
        var y = new Uint16Array(bb, cur * entry_sz + 4, 2);
        y[0] = size;
        y[1] = mod_id;
        ++cur;
    }
}

// Punt on self modifying code -- should improve speed and lighthouse will
// barf on it anyways
Stalker.trustThreshold = 0;

console.log('Starting to stalk threads...');

var threads = [];

// Follow one thread with Stalker if the thread list selects it.
function stalkthread(thread_id) {
    if (threadlist.indexOf(thread_id) < 0 &&
        threadlist.indexOf('all') < 0) {
        // This is not the thread you're looking for
        return;
    }
    console.log('Stalking thread ' + thread_id + '.');
    threads.push(thread_id);
    Stalker.follow(thread_id, {
        events: {
            compile: true
        },
        onReceive: function (event) {
            var bb_events = Stalker.parse(event,
                {stringify: false, annotate: false});
            drcov_bbs(bb_events, filtered_maps, module_ids);
        }
    });
}

Process.enumerateThreads({
    onMatch(thread) { stalkthread(thread.id) },
    onComplete: function () { console.log('Done stalking existing threads.'); }
});

// platform specific code
if (Process.platform == "windows") {
    var GetThreadId = new NativeFunction(Module.getExportByName('kernel32', "GetThreadId"), 'uint32', ['pointer']);

    function onLeaveCreateThread(ret) {
        if (ret == 0) {
            // on success read the thread handle and get its ID
            var handle = this.thandle_ptr.readPointer();
            var tid = GetThreadId(handle);
            stalkthread(tid);
        }
    }

    var NtCreateThread = Module.getExportByName('ntdll', 'NtCreateThread');
    var NtCreateThreadEx = Module.getExportByName('ntdll', 'NtCreateThreadEx');

    Interceptor.attach(NtCreateThread, {
        onEnter(args) {
            // arg 0 (PHANDLE ThreadHandle) receives the thread handle on
            // success. The CLIENT_ID passed as the fifth argument holds the
            // process and thread IDs rather than handles, so it must not be
            // handed to GetThreadId.
            this.thandle_ptr = ptr(args[0]);
        },
        onLeave: onLeaveCreateThread
    });

    Interceptor.attach(NtCreateThreadEx, {
        onEnter(args) {
            // arg 0 receives thread handle on success
            this.thandle_ptr = ptr(args[0]);
        },
        onLeave: onLeaveCreateThread
    });

    // all load library boils down to LoadLibraryExW
    var LoadLibraryW = Module.getExportByName("kernelbase", "LoadLibraryExW");
    Interceptor.attach(LoadLibraryW, { onLeave(args) {
        // refresh module list, send new modules
        var mods = Process.enumerateModules().filter(mod => !(mod.path in module_ids));
        if (mods.length != 0) {
            populate_maps(mods);
            filtered_maps.update();
        }
    }});
}

// when done in python, unload and send remaining buffer
recv("done", function() {
    Interceptor.detachAll();
    threads.forEach(function(i) { Stalker.unfollow(i) });
    Stalker.flush();
    Stalker.garbageCollect();
    sendBuffer();
    send("done");
});
"""
# Shared module-level state, kept global so the frida message callback and the
# save routine can reach it without extra plumbing.
# bbs has to stay a set: every raw drcov record is inserted into it, and its
# uniqueness guarantee is what removes duplicate blocks.
modules = list()
bbs = set()
outfile = "frida-cov.log"
def populate_modules(image_list):
    """Append the modules reported by the frida script to the global table.

    Each item of image_list is a dict with 'id', 'path', 'base', 'end' and
    'size' keys. 'base' and 'end' arrive as address strings and are turned
    into integers with int(value, 0); the other fields are copied unchanged.
    """
    global modules
    modules.extend(
        {
            'id': img['id'],
            'path': img['path'],
            'base': int(img['base'], 0),
            'end': int(img['end'], 0),
            'size': img['size'],
        }
        for img in image_list
    )
def populate_bbs(data):
    """Add the drcov records contained in a coverage message to the global set.

    data is the binary blob attached to a frida message: a sequence of
    fixed-size 8-byte records. Storing each record in the bbs set removes
    duplicates. A missing or empty blob is ignored, and a trailing fragment
    shorter than one record is dropped, because writing it out would corrupt
    the BB table.
    """
    global bbs
    if not data:
        # no binary payload came with the message
        return
    block_sz = 8
    # only walk over whole records
    usable = len(data) - (len(data) % block_sz)
    for off in range(0, usable, block_sz):
        # bytes() keeps the record hashable even if data is a mutable buffer
        bbs.add(bytes(data[off:off + block_sz]))
def create_header(mods):
    """Render the drcov log header and module table for mods as UTF-8 bytes.

    mods is a list of dicts with 'id', 'base', 'end' and 'path' keys. The
    drcov columns are id, base, end, entry, checksum, timestamp and path;
    entry, checksum and timestamp are always written as zero.
    """
    preamble = ''.join([
        'DRCOV VERSION: 2\n',
        'DRCOV FLAVOR: frida\n',
        'Module Table: version 2, count %d\n' % len(mods),
        'Columns: id, base, end, entry, checksum, timestamp, path\n',
    ])
    row_format = '%3d, %#016x, %#016x, %#016x, %#08x, %#08x, %s'
    rows = [
        row_format % (mod['id'], mod['base'], mod['end'], 0, 0, 0, mod['path'])
        for mod in mods
    ]
    table = '\n'.join(rows)
    return ("%s%s\n" % (preamble, table)).encode("utf-8")
def create_coverage(data):
    """Build the BB table section: a count line followed by the raw records.

    data is a sized collection of byte strings, one per basic block. The
    records are concatenated in iteration order.
    """
    pieces = [b'BB Table: %d bbs\n' % len(data)]
    pieces.extend(data)
    return b''.join(pieces)
def on_message(event, msg, data):
    """Dispatch one message received from the frida script.

    event -- object with a set() method, signalled when the script reports
             that it has finished
    msg   -- message dict from frida; 'type' is always read, 'payload' is
             read for non-error messages
    data  -- binary blob attached to the message, or None

    Error messages are printed and otherwise ignored. A dict payload with a
    'map' key carries module records, the exact string 'done' ends the trace,
    and any other message that carries binary data is treated as coverage.
    """
    if msg['type'] == 'error':
        print(msg)
        return
    pay = msg.get('payload')
    if isinstance(pay, dict) and 'map' in pay:
        populate_modules(pay['map'])
    elif pay == 'done':
        # exact match: a substring test would also fire on any unrelated
        # string payload that merely contains the word
        event.set()
    elif data is not None:
        populate_bbs(data)
    # anything else has no coverage attached and is ignored
def save_coverage():
    """Write the drcov log (header, then BB table) to the global outfile path.

    The content is built from the global modules list and bbs set before the
    file is opened, and written in binary mode.
    """
    contents = create_header(modules) + create_coverage(bbs)
    with open(outfile, 'wb') as out:
        out.write(contents)
def main():
    """Command line entry point: attach, trace until enter is pressed, save.

    Parses the arguments, resolves the target (a process name or pid) on the
    selected device, loads the frida script with the module and thread
    filters, waits for the user to end the trace, asks the script to finish,
    writes the coverage file and then terminates the process.

    Exits with status 1 when no process matches the target and with status 0
    after a completed trace. This function does not return.
    """
    global outfile

    parser = argparse.ArgumentParser()
    parser.add_argument('target',
                        help='target process name or pid')
    parser.add_argument('-o', '--outfile',
                        help='coverage file',
                        default='frida-cov.log')
    parser.add_argument('-w', '--whitelist-modules',
                        help='module to trace, may be specified multiple times [all]',
                        action='append', default=[])
    parser.add_argument('-t', '--thread-id',
                        help='threads to trace, may be specified multiple times [all]',
                        action='append', type=int, default=[])
    parser.add_argument('-D', '--device',
                        help='select a device by id [local]',
                        default='local')
    args = parser.parse_args()

    outfile = args.outfile

    device = frida.get_device(args.device)

    # The target may be given as a pid or as a process name; the first match
    # wins and every further match only produces a warning.
    target = -1
    for p in device.enumerate_processes():
        if args.target in [str(p.pid), p.name]:
            if target == -1:
                target = p.pid
            else:
                print('[-] Warning: multiple processes on device match '
                      '\'%s\', using pid: %d' % (args.target, target))

    if target == -1:
        print('[-] Error: could not find process matching '
              '\'%s\' on device \'%s\'' % (args.target, device.id))
        sys.exit(1)

    print('[*] Attaching to pid \'%d\' on device \'%s\'...' %
          (target, device.id))
    session = device.attach(target)
    print('[+] Attached. Loading script...')

    # An empty filter means "trace everything".
    whitelist_modules = ['all']
    if len(args.whitelist_modules):
        whitelist_modules = args.whitelist_modules

    threadlist = ['all']
    if len(args.thread_id):
        threadlist = args.thread_id

    # The filters are embedded into the script source as JSON array literals.
    json_whitelist_modules = json.dumps(whitelist_modules)
    json_threadlist = json.dumps(threadlist)

    script = session.create_script(js % (json_whitelist_modules, json_threadlist))
    event = threading.Event()
    script.on('message', functools.partial(on_message, event))
    script.load()

    print('[*] Now collecting info, press enter to end trace')
    try:
        sys.stdin.read(1)
    except KeyboardInterrupt:
        pass

    # Ask the script to flush its buffer, then wait for its 'done' reply.
    script.post({"type": "done"})
    print('[*] Detaching, this might take a second...')
    event.wait()

    print('[+] Detached. Got %d basic blocks.' % len(bbs))
    print('[*] Formatting coverage and saving...')
    save_coverage()
    print('[!] Done')

    # session.detach and script.unload hang the process
    # I don't like doing this but I can't figure out another way.
    # os._exit() skips the normal interpreter shutdown, so buffered output is
    # flushed by hand first, and the status is 0 because the trace succeeded.
    sys.stdout.flush()
    os._exit(0)
# Run the tracer only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment