Skip to content

Instantly share code, notes, and snippets.

@dktapps
Last active December 12, 2020 21:08
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save dktapps/dfef49b7548fec6468cff6c1e6c41bf9 to your computer and use it in GitHub Desktop.
Save dktapps/dfef49b7548fec6468cff6c1e6c41bf9 to your computer and use it in GitHub Desktop.
# Launcher for tracer.py: turns packet names into frida include/exclude globs
# for the mangled read()/write() symbols of the matching packet classes.
param (
    # Packet names to trace; when omitted, the script falls back to a generic "Packet" filter.
    [String[]] $include,
    # Packet names to leave out. The "xclude" alias lets callers abbreviate to -x.
    [alias("xclude")] [String[]] $exclude,
    # Trace the read* functions of the selected packets.
    [switch] $read = $false,
    # Trace the write* functions of the selected packets.
    [switch] $write = $false
)

# Command template; {0} is replaced with the generated -i/-x arguments.
$call = "python tracer.py -U {0} -x `"*Function*`" -x `"*PacketData*`" -x `"*AttributeData*`" -x `"*android*`" -x `"*WithHeader*`" -x `"*NoHeader*`" com.mojang.minecraftpe"
$extraArgs = ""

# At least one direction has to be selected, otherwise no filter would be generated.
if(-not ($read -or $write)){
    Write-Output "Please specify -r or -w"
    exit 1
}
# Builds the tracer arguments for one packet name.
#   $name           - packet name fragment placed between wildcards in the symbol glob
#   $handlerInclude - $true emits "-i" (include) rules, $false emits "-x" (exclude) rules
# One rule is produced per direction selected through the script-level $read / $write
# switches; the rules are returned concatenated in a single string.
function add_handler([string] $name, [bool] $handlerInclude){
    if($handlerInclude){
        $flag = "i"
    }else{
        $flag = "x"
    }
    # Common prefix of every rule; the closing quote is appended together with the direction.
    $prefix = " -{0} `"_ZN*{1}*" -f $flag, $name

    $directions = @()
    if($read){
        $directions += "read"
    }
    if($write){
        $directions += "write"
    }

    $result = ""
    foreach($direction in $directions){
        $result += ($prefix + $direction + "*`"")
    }
    return $result
}
# Returns the include ("-i") rules for the given packet name.
function add_include_handler([string] $name){
    return (add_handler $name $true)
}
# Returns the exclude ("-x") rules for the given packet name.
function add_exclude_handler([string] $name){
    return (add_handler $name $false)
}
# Collect the include rules: either one per requested packet, or a catch-all.
if($include.Count -ne 0){
    foreach($packet in $include){
        $extraArgs += (add_include_handler $packet)
        Write-Output "Including $packet"
    }
}else{
    # Generic capture-everything filter
    $extraArgs += (add_include_handler "Packet")
    Write-Output "Including all packets"
}

# Exclude rules are appended after the includes.
foreach($packet in $exclude){
    $extraArgs += (add_exclude_handler $packet)
    Write-Output "Excluding $packet"
}

# Make sure the adb daemon is running before the tracer is launched.
adb start-server

# Substitute the generated rules into the command template and run it.
$commandLine = $call -f $extraArgs
Invoke-Expression $commandLine
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
import base64
import binascii
import codecs
import os
import platform
import re
import subprocess
import threading
import time
from frida import FileMonitor
from frida_tools.model import Module, Function, ModuleFunction, ObjCMethod
def main():
    """Build the tracer console application and run it.

    colorama and frida_tools are imported inside this function, so they are
    only needed when the tracer is actually launched.
    """
    from colorama import Fore, Style
    from frida_tools.application import ConsoleApplication, input_with_timeout

    class TracerApplication(ConsoleApplication, UI):
        """Console front-end that wires command-line filters to a Tracer."""

        def __init__(self):
            super(TracerApplication, self).__init__(self._await_ctrl_c)
            # NOTE(review): the four attributes below are not read anywhere in
            # the visible code; kept so the object layout stays unchanged.
            self._palette = [Fore.CYAN, Fore.MAGENTA, Fore.YELLOW, Fore.GREEN, Fore.RED, Fore.BLUE]
            self._next_color = 0
            self._attributes_by_thread_id = {}
            self._last_event_tid = -1

        def _add_options(self, parser):
            """Register -i/--include and -x/--exclude; each occurrence feeds the profile builder."""
            pb = TracerProfileBuilder()

            def process_builder_arg(option, opt_str, value, parser, method, **kwargs):
                # optparse callback: forward the option value to the bound builder method.
                method(value)

            parser.add_option("-i", "--include", help="include FUNCTION", metavar="FUNCTION",
                              type='string', action='callback', callback=process_builder_arg,
                              callback_args=(pb.include,))
            parser.add_option("-x", "--exclude", help="exclude FUNCTION", metavar="FUNCTION",
                              type='string', action='callback', callback=process_builder_arg,
                              callback_args=(pb.exclude,))
            self._profile_builder = pb

        def _usage(self):
            return "usage: %prog [options] target"

        def _initialize(self, parser, options, args):
            """Reset tracer state and freeze the collected filters into a profile."""
            self._tracer = None
            self._targets = None
            self._profile = self._profile_builder.build()

        def _needs_target(self):
            return True

        def _start(self):
            """Create the tracer and start tracing; report any failure and exit with status 1."""
            try:
                # Tracer() opens the log file, so it is created inside the try
                # block: a failure there is reported like any other start-up error.
                self._tracer = Tracer(self._reactor, self._profile, log_handler=self._log)
                self._targets = self._tracer.start_trace(self._session, self)
            except Exception as e:
                self._update_status("Failed to start tracing: {error}".format(error=e))
                self._exit(1)

        def _stop(self):
            """Stop the tracer if one was created."""
            # _tracer is still None when start-up failed before the tracer existed.
            if self._tracer is not None:
                self._tracer.stop()
                self._tracer = None

        def _await_ctrl_c(self, reactor):
            """Block while the reactor runs, returning early on Ctrl+C."""
            while reactor.is_running():
                try:
                    input_with_timeout(0.5)
                except KeyboardInterrupt:
                    break

        def on_trace_progress(self, operation):
            """Update the status line for the 'resolve', 'instrument' and 'ready' phases."""
            if operation == 'resolve':
                self._update_status("Resolving functions...")
            elif operation == 'instrument':
                self._update_status("Instrumenting functions...")
            elif operation == 'ready':
                plural = "" if len(self._targets) == 1 else "s"
                for t in self._targets:
                    self._print('Hooked function', t)
                self._print("Writing output to %s" % (self._tracer.file_path))
                self._update_status("Started tracing %d function%s. Press Ctrl+C to stop." % (len(self._targets), plural))
                self._resume()

        def on_trace_error(self, error):
            """Print the 'message' entry of an error dict with a highlighted prefix."""
            self._print(Fore.RED + Style.BRIGHT + "Error" + Style.RESET_ALL + ": " + error['message'])

    app = TracerApplication()
    app.run()
class TracerProfileBuilder(object):
    """Collects include/exclude rules in call order and builds a TracerProfile."""

    # "<module>!<hex offset>". Written as a raw string so that ``\s`` reaches
    # the regex engine instead of being an invalid string escape.
    # NOTE(review): not referenced by the visible code.
    _RE_REL_ADDRESS = re.compile(r"(?P<module>[^\s!]+)!(?P<offset>(0x)?[0-9a-fA-F]+)")

    def __init__(self):
        # Ordered list of (operation, scope, glob) tuples.
        self._spec = []

    def include(self, *function_name_globs):
        """Append an include rule for every glob; returns self for chaining."""
        self._spec.extend(('include', 'function', glob) for glob in function_name_globs)
        return self

    def exclude(self, *function_name_globs):
        """Append an exclude rule for every glob; returns self for chaining."""
        self._spec.extend(('exclude', 'function', glob) for glob in function_name_globs)
        return self

    def build(self):
        """Return a TracerProfile that wraps the rules collected so far."""
        return TracerProfile(self._spec)
class TracerProfile(object):
    """An ordered list of include/exclude rules that can be resolved to functions."""

    # "<module name>!<function name>" keys that are dropped from the results.
    _BLACKLIST = {"libSystem.B.dylib!dyld_stub_binder"}

    def __init__(self, spec):
        self._spec = spec

    def resolve(self, session, log_handler=None):
        """Resolve the rules inside the target process.

        A short-lived script is created on *session*, asked to resolve the
        spec through its ``resolve`` export, and unloaded again. The answer is
        converted into ObjCMethod, ModuleFunction and Function objects, which
        are returned as a list.
        """
        resolver = session.create_script(name="profile-resolver", source=self._create_resolver_script())
        resolver.set_log_handler(log_handler)

        def on_message(message, data):
            print(message)

        resolver.on('message', on_message)
        resolver.load()
        try:
            data = resolver.exports.resolve(self._spec)
        finally:
            resolver.unload()

        # Module ids arrive as mapping keys and are converted to int; targets
        # refer to their module through that id.
        modules = {}
        for raw_id, info in data['modules'].items():
            modules[int(raw_id)] = Module(info['name'], int(info['base'], 16), info['size'], info['path'])

        resolved = []
        for target in data['targets']:
            objc = target.get('objc')
            if objc is not None:
                method = objc['method']
                resolved.append(
                    ObjCMethod(method['type'], objc['className'], method['name'], int(target['address'], 16)))
                continue

            name = target['name']
            absolute_address = int(target['address'], 16)
            module_id = target.get('module')
            if module_id is None:
                resolved.append(Function(name, absolute_address))
                continue

            module = modules[module_id]
            exported = not target.get('private', False)
            candidate = ModuleFunction(module, name, absolute_address - module.base_address, exported)
            if not self._is_blacklisted(candidate):
                resolved.append(candidate)
        return resolved

    def _is_blacklisted(self, module_function):
        """Return True when "<module name>!<function name>" is in the blacklist."""
        return "!".join([module_function.module.name, module_function.name]) in TracerProfile._BLACKLIST

    def _create_resolver_script(self):
        """Return the JavaScript source of the in-process resolver."""
        return r""""use strict";
rpc.exports = {
resolve: function (spec) {
var workingSet = spec.reduce(function (workingSet, item) {
var operation = item[0];
var scope = item[1];
var param = item[2];
switch (scope) {
case 'function':
if (operation === 'include')
workingSet = includeFunction(param, workingSet);
else if (operation === 'exclude')
workingSet = excludeFunction(param, workingSet);
break;
}
return workingSet;
}, {});
var modules = {};
var targets = [];
for (var address in workingSet) {
if (workingSet.hasOwnProperty(address)) {
var target = workingSet[address];
var moduleId = target.module;
if (moduleId !== undefined && !modules.hasOwnProperty(moduleId)) {
var m = allModules()[moduleId];
delete m._cachedFunctionExports;
modules[moduleId] = m;
}
targets.push(target);
}
}
return {
modules: modules,
targets: targets
};
}
};
function includeFunction(pattern, workingSet) {
moduleResolver().enumerateMatchesSync('exports:*!' + pattern).forEach(function (m) {
workingSet[m.address.toString()] = moduleExportFromMatch(m);
});
return workingSet;
}
function excludeFunction(pattern, workingSet) {
moduleResolver().enumerateMatchesSync('exports:*!' + pattern).forEach(function (m) {
delete workingSet[m.address.toString()];
});
return workingSet;
}
var cachedModuleResolver = null;
function moduleResolver() {
if (cachedModuleResolver === null)
cachedModuleResolver = new ApiResolver('module');
return cachedModuleResolver;
}
var cachedObjcResolver = null;
function objcResolver() {
if (cachedObjcResolver === null) {
try {
cachedObjcResolver = new ApiResolver('objc');
} catch (e) {
throw new Error("Objective-C runtime is not available");
}
}
return cachedObjcResolver;
}
var cachedModules = null;
function allModules() {
if (cachedModules === null) {
cachedModules = Process.enumerateModulesSync();
cachedModules._idByPath = cachedModules.reduce(function (mappings, module, index) {
mappings[module.path] = index;
return mappings;
}, {});
cachedModules._idByName = cachedModules.reduce(function (mappings, module, index) {
mappings[module.name] = index;
return mappings;
}, {});
}
return cachedModules;
}
function moduleExportFromMatch(m) {
var encodedName = m.name;
var delimiterIndex = encodedName.indexOf('!');
var modulePath = encodedName.substring(0, delimiterIndex);
var functionName = encodedName.substring(delimiterIndex + 1);
return {
name: functionName,
address: m.address,
module: allModules()._idByPath[modulePath]
};
}
"""
class Tracer(object):
    """Hooks the resolved functions and writes captured packet buffers to a log file.

    Each captured packet becomes one line of the form
    ``<pk_type>:<base64 of the buffer>`` in the file named by ``file_path``.
    """

    # Number of targets sent to the script per ``add`` call.
    _BATCH_SIZE = 1000

    def __init__(self, reactor, profile, log_handler=None):
        self._reactor = reactor
        self._profile = profile
        self._script = None
        self._log_handler = log_handler
        # A new log file per run, named after the current timestamp.
        log_name = 'mcpe_packet_log_' + str(time.time()) + '.log'
        self.file_path = log_name
        self._file = open(log_name, 'a')
        print('Writing trace output to', log_name)

    def start_trace(self, session, ui):
        """Resolve the profile, load the trace script and hook the functions.

        *ui* receives ``on_trace_progress`` for 'resolve' and 'instrument'
        immediately, and for 'ready' through the reactor.

        Returns the list of functions that were handed to the script. Resolved
        functions for which ``generate_handler`` yields no handler are left
        out: the script would otherwise dereference a null handler on every
        call of such a function.
        """
        def on_message(message, data):
            self._reactor.schedule(lambda: self._process_message(message, data, ui))

        ui.on_trace_progress('resolve')
        working_set = self._profile.resolve(session, log_handler=self._log_handler)

        ui.on_trace_progress('instrument')
        self._script = session.create_script(name="tracer", source=self._create_trace_script())
        self._script.set_log_handler(self._log_handler)
        self._script.on('message', on_message)
        self._script.load()

        hooked = []
        targets = []
        for function in working_set:
            handler = generate_handler(function.name)
            if handler is None:
                continue
            hooked.append(function)
            targets.append({
                'name': function.name,
                'absolute_address': hex(function.absolute_address),
                'handler': handler,
            })

        # Targets are sent in batches rather than in one single RPC call.
        for offset in range(0, len(targets), self._BATCH_SIZE):
            self._script.exports.add(targets[offset:offset + self._BATCH_SIZE])

        self._reactor.schedule(lambda: ui.on_trace_progress('ready'))
        return hooked

    def stop(self):
        """Unload the trace script (best effort) and close the log file."""
        try:
            if self._script is not None:
                try:
                    self._script.unload()
                except Exception:
                    # Best effort: unloading can fail once the session is gone,
                    # and that must not prevent the shutdown from completing.
                    pass
                self._script = None
        finally:
            self._file.close()

    def _create_trace_script(self):
        """Return the JavaScript source of the in-process trace agent."""
        return """"use strict";
var started = Date.now();
var handlers = {};
var state = {};
var pending = [];
var timer = null;
rpc.exports = {
dispose: function () {
flush();
},
add: function (targets) {
targets.forEach(function (target) {
var h = [parseHandler(target)];
var name = target.name;
var targetAddress = target.absolute_address;
target = null;
handlers[targetAddress] = h;
function invokeCallback(callback, context, param) {
if (callback === undefined)
return;
var timestamp = Date.now() - started;
var threadId = context.threadId;
var depth = context.depth;
function log(message) {
emit([timestamp, threadId, depth, targetAddress, message]);
}
callback.call(context, log, param, state);
}
try {
Interceptor.attach(ptr(targetAddress), {
onEnter: function (args) {
invokeCallback(h[0].onEnter, this, args);
},
onLeave: function (retval) {
invokeCallback(h[0].onLeave, this, retval);
}
});
} catch (e) {
send({
from: "/targets",
name: '+error',
payload: {
message: "Skipping '" + name + "': " + e.message
}
});
}
});
}
};
function emit(event) {
pending.push(event);
if (timer === null)
timer = setTimeout(flush, 50);
}
function flush() {
if (timer !== null) {
clearTimeout(timer);
timer = null;
}
if (pending.length === 0)
return;
var items = pending;
pending = [];
send({
from: "/events",
name: '+add',
payload: {
items: items
}
});
}
function parseHandler(target) {
try {
return (1, eval)("(" + target.handler + ")");
} catch (e) {
send({
from: "/targets",
name: '+error',
payload: {
message: "Invalid handler for '" + target.name + "': " + e.message
}
});
return {};
}
}
"""

    @staticmethod
    def _decode_buffer(buffer):
        """Convert a received packet buffer into a bytearray.

        A mapping is read in ascending numeric key order, so the result does
        not depend on the iteration order of the dict; any other value is
        handed to ``bytearray`` unchanged.
        """
        # Presumably the Uint8Array arrives as {"0": b0, "1": b1, ...} after
        # JSON serialisation -- TODO confirm against the Frida version in use.
        if isinstance(buffer, dict):
            ordered = sorted(buffer.items(), key=lambda item: int(item[0]))
            return bytearray(value for _, value in ordered)
        return bytearray(buffer)

    def _process_message(self, message, data, ui):
        """Write packet messages to the log file; print every other message.

        A packet message has type 'send' and a dict payload with the keys
        'pk_type', 'pk_name' and 'buffer'.
        """
        handled = False
        if message['type'] == 'send':
            stanza = message['payload']
            if isinstance(stanza, dict) and all(key in stanza for key in ('pk_type', 'buffer', 'pk_name')):
                encoded = base64.b64encode(self._decode_buffer(stanza['buffer'])).decode('utf-8')
                self._file.write(stanza['pk_type'] + ':' + encoded + '\n')
                handled = True
        if not handled:
            print(message)
def read_mangled_name_len(function_name, start):
    """Parse the decimal number that begins at index *start* of *function_name*.

    Returns ``(next_index, value)`` where *next_index* is the index of the
    first character after the digits. When there is no digit at *start*, or
    *start* lies at or past the end of the string, the result is ``(start, 0)``.
    """
    length = 0
    end = len(function_name)
    # Bounds-checked so a name that ends in digits does not raise IndexError.
    while start < end and function_name[start] in "0123456789":
        length = length * 10 + int(function_name[start])
        start += 1
    return start, length
def demangle_hack(function_name):
    """Extract the first two length-prefixed components of a mangled name.

    This is not a real demangler: it skips the leading ``Z``/``N``/``K``/
    ``V``/``O``/``R`` characters after the underscore, then reads two
    ``<length><text>`` components.

    Returns ``None`` when *function_name* does not start with ``_Z``,
    otherwise a ``(class_name, fn_name)`` tuple. Components that are missing
    come back as empty strings.
    """
    if not function_name.startswith('_Z'):
        return None

    prefix_chars = "ZNKVOR"
    # Index 0 is the underscore; scanning starts at the 'Z'. The bounds check
    # keeps a name made only of prefix characters (e.g. "_ZN") from raising.
    pos = 1
    while pos < len(function_name) and function_name[pos] in prefix_chars:
        pos += 1

    pos, size = read_mangled_name_len(function_name, pos)
    class_name = function_name[pos:pos + size]
    pos += size

    pos, size = read_mangled_name_len(function_name, pos)
    fn_name = function_name[pos:pos + size]
    return class_name, fn_name
def generate_handler(function_name):
    """Return the JavaScript hook source for a packet read/write function.

    Returns ``None`` when *function_name* cannot be split by
    ``demangle_hack``, when the function is not exactly ``read`` or
    ``write``, or when the class name does not contain ``Packet``.
    """
    demangled = demangle_hack(function_name)
    # demangle_hack returns None for names that do not start with "_Z";
    # unpacking that directly would raise TypeError.
    if demangled is None:
        return None
    packet_name, fn_name = demangled
    if packet_name is None or fn_name is None:
        return None
    if fn_name not in ('read', 'write'):
        return None
    if 'Packet' not in packet_name:
        return None
    # The template stores args[1] on entry and, on leave, follows the pointer
    # chain documented in its own comments to copy the buffer out.
    return """
{
onEnter: function (log, args, state) {
this.pointer = args[1];
},
onLeave: function (log, retval, state) {
realAddr = Memory.readPointer( //addr of actual byte array
Memory.readPointer( //addr of basic_string
this.pointer.add(12) //addr of ptr to basic_string
)
)
//Length is offset -12 from start of byte array
rlen = Memory.readU32(realAddr.sub(12))
uint8arr = new Uint8Array(Memory.readByteArray(realAddr, rlen))
send({
pk_type: '%(pk_type)s',
pk_name: '%(pk_name)s',
buffer: uint8arr
})
}
}""" % {'pk_type': fn_name, 'pk_name': packet_name}
class UI(object):
    """Callback interface used while tracing; both hooks do nothing by default."""

    def on_trace_progress(self, operation):
        """Called with the name of the tracing phase; no-op here."""
        return None

    def on_trace_error(self, error):
        """Called with an error description; no-op here."""
        return None
# Launch the tracer only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment