Created
January 9, 2023 20:30
-
-
Save timmaxw/ef46ebcb8e679af9076ca91a2975f8af to your computer and use it in GitHub Desktop.
lagcurse.py: randomly introduce lag into a running process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
lagcurse.py uses the Linux ptrace() API to pause/unpause threads of the given | |
process at random intervals. This can be helpful for fuzzing race conditions or | |
simulating CPU contention. See "lagcurse.py -h" for usage information. | |
""" | |
import argparse | |
import ctypes | |
from pathlib import Path | |
import random | |
import sys | |
import time | |
# ctypes stand-ins for the C types that appear in ptrace()'s signature; both
# are declared here as plain C ints.
c_pid_t = c_enum = ctypes.c_int

# Load libc with use_errno=True so that ctypes keeps a private copy of errno
# that can be read back with ctypes.get_errno() after a call.
libc = ctypes.CDLL("libc.so.6", use_errno=True)

# Declared signature: long ptrace(enum request, pid_t pid, void *addr, void *data)
libc.ptrace.restype = ctypes.c_long
libc.ptrace.argtypes = [c_enum, c_pid_t, ctypes.c_void_p, ctypes.c_void_p]
def check_errno(res, func, args):
    """ctypes ``errcheck`` hook that turns a -1 return value into an OSError.

    Args:
        res: The value returned by the foreign function.
        func: The foreign function object that was called (unused).
        args: The arguments the function was called with (unused).

    Returns:
        ``res`` unchanged, so that the caller of the foreign function still
        receives the real return value.

    Raises:
        OSError: If ``res`` is -1. The exception carries the errno captured by
            ctypes, so Python raises the matching subclass (for example
            ProcessLookupError for ESRCH or PermissionError for EPERM).
    """
    if res == -1:
        raise OSError(ctypes.get_errno(), "error when calling ptrace")
    # Whatever an errcheck hook returns becomes the result of the foreign
    # call; returning nothing would make every successful call yield None.
    return res
# Route the result of every libc.ptrace() call through check_errno.
libc.ptrace.errcheck = check_errno

# ptrace request codes used by this script.
PTRACE_ATTACH, PTRACE_DETACH = 16, 17
class VictimThread(object):
    """One thread of the victim process, toggled between attached and detached.

    The thread alternates between two states: detached and attached via
    ptrace (the module uses attachment to pause the thread). ``next_time``
    holds the wall-clock time (``time.time()``) at which the next toggle
    is due.
    """

    def __init__(self, parent, tid):
        """Track thread ``tid`` of ``parent``; it starts out detached.

        Args:
            parent: Object providing ``max_lag_ms`` and ``percent_lag``.
            tid: Thread id passed to ptrace.
        """
        self.parent = parent
        self.tid = tid
        self.attached = False
        self.schedule()

    def schedule(self):
        """Pick a random time for the next state toggle and store it.

        While attached, the delay is uniform in [0, max_lag_ms). While
        detached, that window is scaled by (100 - percent_lag) / percent_lag
        so that, on average, the attached share of time is percent_lag percent.
        """
        window_s = self.parent.max_lag_ms / 1000
        if not self.attached:
            percent_lag = self.parent.percent_lag
            window_s = window_s * (100 - percent_lag) / percent_lag
        self.next_time = time.time() + random.random() * window_s

    def notify(self):
        """Toggle the thread between attached and detached, then reschedule.

        The ``attached`` flag is flipped only when the ptrace call succeeds,
        so it keeps describing the real trace state when a call fails.
        """
        request = PTRACE_DETACH if self.attached else PTRACE_ATTACH
        try:
            libc.ptrace(request, self.tid, None, None)
        except ProcessLookupError:
            # This can happen if the thread disappeared since the last call to
            # Victim.update_threads(). Ignore the error, and the next call to
            # update_threads() will remove this VictimThread.
            pass
        else:
            self.attached = not self.attached
        self.schedule()
class Victim(object):
    """A target process whose threads are attached and detached at random."""

    def __init__(self, pid, *, max_lag_ms, percent_lag):
        """Remember the target pid and the lag settings; no threads known yet.

        Args:
            pid: Id of the process, used to build its /proc/<pid>/task path.
            max_lag_ms: Upper bound, in milliseconds, read by the threads
                when they schedule their next toggle.
            percent_lag: Percentage read by the threads when they schedule
                their next toggle.
        """
        self.threads = {}
        self.percent_lag = percent_lag
        self.max_lag_ms = max_lag_ms
        self.pid = pid

    def curse(self):
        """Loop forever, toggling whichever thread is due next.

        Each pass refreshes the thread table, sleeps until the earliest
        ``next_time`` and calls ``notify()`` on that thread. When the loop is
        left through an exception, every thread still marked attached is
        detached before the exception propagates.
        """
        try:
            while True:
                self.update_threads()
                if not self.threads:
                    # Nothing to toggle yet; poll again shortly.
                    time.sleep(0.1)
                    continue
                # min() returns the first of several equally early threads.
                soonest = min(self.threads.values(), key=lambda t: t.next_time)
                time.sleep(max(0, soonest.next_time - time.time()))
                soonest.notify()
        finally:
            # detach cleanly if we get KeyboardInterrupt
            for victim_thread in self.threads.values():
                if not victim_thread.attached:
                    continue
                try:
                    libc.ptrace(PTRACE_DETACH, victim_thread.tid, None, None)
                except ProcessLookupError:
                    pass

    def update_threads(self):
        """Rebuild ``self.threads`` from the entries of /proc/<pid>/task.

        Threads that are already known keep their existing object, newly
        listed ones get a fresh VictimThread, and entries no longer listed
        are dropped.
        """
        known = self.threads
        refreshed = {}
        for entry in Path(f"/proc/{self.pid}/task").iterdir():
            tid = int(entry.stem)
            refreshed[tid] = known[tid] if tid in known else VictimThread(self, tid)
        self.threads = refreshed
def _build_parser():
    """Build the command-line parser for lagcurse.py."""
    parser = argparse.ArgumentParser(
        prog="lagcurse.py",
        description="lagcurse.py uses the Linux ptrace() API to pause/unpause "
        "threads of the given process at random intervals. This can be helpful "
        "for fuzzing race conditions or simulating CPU contention.",
    )
    parser.add_argument("pid", type=int)
    parser.add_argument(
        "-m", "--max-lag-ms", type=int, default=100,
        help="Maximum length of a single lag spike, in milliseconds. The lengths "
        "of lag spikes will be randomly selected between 0ms and the maximum. "
        "The default maximum is 100ms.",
    )
    parser.add_argument(
        "-p", "--percent-lag", type=int, default=50,
        help="On average, the lag spikes on each thread will add up to "
        "PERCENT_LAG percent of the thread's lifetime. For example, if "
        "PERCENT_LAG=50, the thread will effectively run 50%% slower. "
        "Default is 50%%.",
    )
    return parser


def main(argv=None):
    """Parse the command line and curse the requested process.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 after Ctrl-C, 1 when the target process cannot
        be traced or does not exist.

    Raises:
        SystemExit: With status 2 (raised by argparse) when an argument is
            malformed or out of range.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Validate with parser.error() rather than assert: asserts vanish under
    # "python -O" and would otherwise show the user a raw traceback.
    if args.pid <= 0:
        parser.error("pid must be a positive integer")
    if args.max_lag_ms <= 0:
        parser.error("--max-lag-ms must be greater than 0")
    if not 1 <= args.percent_lag <= 99:
        parser.error("--percent-lag must be between 1 and 99")

    victim = Victim(args.pid, max_lag_ms=args.max_lag_ms, percent_lag=args.percent_lag)
    print(f"Cursing {args.pid}...")
    try:
        victim.curse()
    except KeyboardInterrupt:
        print("Exiting.")
    except PermissionError:
        print(
            "Error: Insufficient permission to ptrace given process. Try running under sudo?",
            file=sys.stderr,
        )
        return 1
    except FileNotFoundError:
        print(f"Error: No such process {victim.pid}.", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment