Skip to content

Instantly share code, notes, and snippets.

@patois
Forked from pellaeon/child-gating-poc.py
Created November 15, 2022 19:11
Show Gist options
  • Save patois/1fede18833d5e6e62e75d915e3621c8f to your computer and use it in GitHub Desktop.
Save patois/1fede18833d5e6e62e75d915e3621c8f to your computer and use it in GitHub Desktop.
Frida child-gating and spawn-gating example
"""
This POC is based on the example from https://frida.re/news/#child-gating
and aims to instrument child processes along with the main one.
"""
from __future__ import print_function
import frida
from frida_tools.application import Reactor
import threading
class Application(object):
    """Trace `open` calls in an app and in the child processes it creates.

    The app is spawned on the first USB device.  Child gating is enabled on
    every attached session, children are reported through the device's
    "child-added" signal and get instrumented the same way as the parent.
    Every Frida callback is re-scheduled onto the reactor, and the reactor is
    asked to stop once no session is left.
    """

    def __init__(self, package_name="com.example.app"):
        """Connect to the USB device and subscribe to child notifications.

        :param package_name: identifier of the app that `_start` spawns.
        """
        self._package_name = package_name
        self._stop_requested = threading.Event()
        # The reactor keeps running until `_stop_requested` is set.
        self._reactor = Reactor(run_until_return=lambda _:
                                self._stop_requested.wait())

        self._device = frida.get_usb_device(timeout=5)
        self._sessions = set()

        # Hand the notification over to the reactor instead of handling it
        # directly inside the signal callback.
        self._device.on("child-added", lambda child:
                        self._reactor.schedule(
                            lambda: self._on_delivered(child)))

    def run(self):
        """Queue the initial spawn and run the reactor until it is stopped."""
        self._reactor.schedule(lambda: self._start())
        self._reactor.run()

    def _start(self):
        """Spawn the target app and instrument its main process."""
        pid = self._device.spawn(self._package_name)
        print("✔ spawn({})".format(self._package_name))
        self._instrument(pid)

    def _stop_if_idle(self):
        """Request reactor shutdown when no session is tracked any more."""
        if not self._sessions:
            self._stop_requested.set()

    def _instrument(self, pid):
        """Attach to `pid`, load the tracing script and resume the process.

        :param pid: process id of a process that is waiting to be resumed.
        """
        print("✔ attach(pid={})".format(pid))
        session = self._device.attach(pid)
        session.on("detached", lambda reason:
                   self._reactor.schedule(lambda:
                                          self._on_detached(pid, session, reason)))
        print("✔ enable_child_gating()")
        session.enable_child_gating()
        print("✔ create_script()")
        # NOTE(review): the script uses the legacy `Memory.readUtf8String` /
        # `Module.findExportByName(null, ...)` spellings; confirm they exist
        # in the Frida version deployed on the device.
        script = session.create_script("""
Interceptor.attach(Module.findExportByName(null, 'open'), {
  onEnter: function (args) {
    send({
      type: 'open',
      path: Memory.readUtf8String(args[0])
    });
  }
});
""")
        script.on("message", lambda message, data:
                  self._reactor.schedule(
                      lambda: self._on_message(pid, message)))
        print("✔ load()")
        script.load()
        print("✔ resume(pid={})".format(pid))
        self._device.resume(pid)
        self._sessions.add(session)

    def _on_delivered(self, child):
        """Instrument a child process reported by the "child-added" signal."""
        print("⚡ child-added: {}".format(child))
        self._instrument(child.pid)

    def _on_detached(self, pid, session, reason):
        """Forget a detached session and schedule an idle check."""
        print("⚡ detached: pid={}, reason='{}'"
              .format(pid, reason))
        # discard() rather than remove(): a session that never made it into
        # the set must not raise KeyError here.
        self._sessions.discard(session)
        # Delay the check so a child that is about to be delivered can still
        # register its own session first.
        self._reactor.schedule(self._stop_if_idle, delay=0.5)

    def _on_message(self, pid, message):
        """Print a message emitted by the script running in `pid`.

        Messages produced by `send()` carry a "payload" key.  Messages
        without one (e.g. script errors) are printed through their
        "description" when available, instead of raising KeyError.
        """
        if "payload" in message:
            print("⚡ message: pid={}, payload={}"
                  .format(pid, message["payload"]))
        else:
            print("⚡ message: pid={}, error={}"
                  .format(pid, message.get("description", message)))
if __name__ == "__main__":
    # Only start tracing when executed as a script, not on import.
    app = Application()
    app.run()
"""
This POC is based on the example from https://frida.re/news/#child-gating
and is modified to use spawn-gating to catch any newly spawned process on the device.
Background:
1. Android apps can declare in AndroidManifest.xml to put some components
(such as a `Service`) into a separate process using the `android:process`
directive. See: https://developer.android.com/guide/topics/manifest/service-element
2. In such cases, the new process will be spawned by the system, instead of
by the app's main process. I.e. the new process will have `zygote` as
its parent process.
3. Thus, child-gating is not able to catch these new processes
The logic to catch those processes is simply to catch all newly spawned
processes on the system, then filter them by identifier, which should start
with the app's package name.
Ref:
- https://github.com/frida/frida/issues/602
- https://gist.github.com/oleavr/ae7bcbbb9179852a4731
- https://developer.android.com/guide/components/processes-and-threads
- https://developer.android.com/guide/components/activities/process-lifecycle
"""
from __future__ import print_function
import frida
from frida_tools.application import Reactor
import threading
# Package name of the target app: Application spawns it and treats it as the
# identifier prefix of the spawned processes worth instrumenting.
app_package_name = "com.example.app"
class Application(object):
    """Trace `open` calls in an app and in every related spawned process.

    Spawn gating is enabled on the first USB device, so each newly spawned
    process is reported through the device's "spawn-added" signal.  Spawns
    whose identifier starts with the app's package name are instrumented;
    all others are resumed untouched.  Every Frida callback is re-scheduled
    onto the reactor, and the reactor is asked to stop once no session is
    left.
    """

    def __init__(self, package_name=None):
        """Connect to the USB device and turn on spawn gating.

        :param package_name: identifier of the app to spawn and prefix used
            to select gated spawns; defaults to the module-level
            `app_package_name`.
        """
        if package_name is None:
            package_name = app_package_name
        self._package_name = package_name
        self._stop_requested = threading.Event()
        # The reactor keeps running until `_stop_requested` is set.
        self._reactor = Reactor(run_until_return=lambda _:
                                self._stop_requested.wait())

        self._device = frida.get_usb_device(timeout=5)
        self._sessions = set()

        # Subscribe before gating is switched on so that no gated spawn is
        # reported while nobody is listening.
        self._device.on("spawn-added", lambda child:
                        self._reactor.schedule(
                            lambda: self._on_delivered(child)))
        print("✔ enable_spawn_gating()")
        self._device.enable_spawn_gating()

    def run(self):
        """Queue the initial spawn and run the reactor until it is stopped."""
        self._reactor.schedule(lambda: self._start())
        self._reactor.run()

    def _start(self):
        """Spawn the target app and instrument its main process."""
        pid = self._device.spawn(self._package_name)
        print("✔ spawn({})".format(self._package_name))
        self._instrument(pid)

    def _stop_if_idle(self):
        """Request reactor shutdown when no session is tracked any more."""
        if not self._sessions:
            self._stop_requested.set()

    def _instrument(self, pid):
        """Attach to `pid`, load the tracing script and resume the process.

        :param pid: process id of a process that is waiting to be resumed.
        """
        print("✔ attach(pid={})".format(pid))
        session = self._device.attach(pid)
        session.on("detached", lambda reason:
                   self._reactor.schedule(lambda:
                                          self._on_detached(pid, session, reason)))
        print("✔ create_script()")
        # NOTE(review): the script uses the legacy `Memory.readUtf8String` /
        # `Module.findExportByName(null, ...)` spellings; confirm they exist
        # in the Frida version deployed on the device.
        script = session.create_script("""
Interceptor.attach(Module.findExportByName(null, 'open'), {
  onEnter: function (args) {
    send({
      type: 'open',
      path: Memory.readUtf8String(args[0])
    });
  }
});
""")
        script.on("message", lambda message, data:
                  self._reactor.schedule(
                      lambda: self._on_message(pid, message)))
        print("✔ load()")
        script.load()
        print("✔ resume(pid={})".format(pid))
        self._device.resume(pid)
        self._sessions.add(session)

    def _on_delivered(self, child):
        """Handle a gated spawn: instrument it if it is ours, else resume it.

        :param child: spawn descriptor exposing `pid` and `identifier`.
        """
        print("⚡ spawn-added: {}".format(child))
        # The identifier may be missing; treat that as "not ours".
        identifier = child.identifier or ""
        if identifier.startswith(self._package_name):
            self._instrument(child.pid)
        else:
            # A gated spawn stays suspended until it is resumed, so unrelated
            # processes must be let go explicitly.
            self._device.resume(child.pid)

    def _on_detached(self, pid, session, reason):
        """Forget a detached session and schedule an idle check."""
        print("⚡ detached: pid={}, reason='{}'"
              .format(pid, reason))
        # discard() rather than remove(): a session that never made it into
        # the set must not raise KeyError here.
        self._sessions.discard(session)
        # Delay the check so a spawn that is about to be delivered can still
        # register its own session first.
        self._reactor.schedule(self._stop_if_idle, delay=0.5)

    def _on_message(self, pid, message):
        """Print a message emitted by the script running in `pid`.

        Messages produced by `send()` carry a "payload" key.  Messages
        without one (e.g. script errors) are printed through their
        "description" when available, instead of raising KeyError.
        """
        if "payload" in message:
            print("⚡ message: pid={}, payload={}"
                  .format(pid, message["payload"]))
        else:
            print("⚡ message: pid={}, error={}"
                  .format(pid, message.get("description", message)))
if __name__ == "__main__":
    # Only start tracing when executed as a script, not on import.
    app = Application()
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment