Skip to content

Instantly share code, notes, and snippets.

@timmaxw
Created January 9, 2023 20:30
Show Gist options
  • Save timmaxw/ef46ebcb8e679af9076ca91a2975f8af to your computer and use it in GitHub Desktop.
Save timmaxw/ef46ebcb8e679af9076ca91a2975f8af to your computer and use it in GitHub Desktop.
lagcurse.py: randomly introduce lag into a running process
#!/usr/bin/env python3
"""
lagcurse.py uses the Linux ptrace() API to pause/unpause threads of the given
process at random intervals. This can be helpful for fuzzing race conditions or
simulating CPU contention. See "lagcurse.py -h" for usage information.
"""
import argparse
import ctypes
from pathlib import Path
import random
import sys
import time
# Load the C library. use_errno=True makes ctypes keep a private copy of errno
# for calls made through this handle; check_errno() reads it back with
# ctypes.get_errno().
libc = ctypes.CDLL("libc.so.6", use_errno=True)
# The pid and request arguments of ptrace() are both passed as a C int.
c_pid_t = ctypes.c_int
c_enum = ctypes.c_int
# Prototype declared for libc.ptrace: (request, pid, addr, data) -> long.
libc.ptrace.argtypes = [c_enum, c_pid_t, ctypes.c_void_p, ctypes.c_void_p]
libc.ptrace.restype = ctypes.c_long
def check_errno(res, func, args):
    """ctypes ``errcheck`` hook for ptrace: turn a -1 result into an OSError.

    Args:
        res: Value returned by the foreign function.
        func: The foreign function object that was called (unused).
        args: The arguments the function was called with (unused).

    Returns:
        ``res`` unchanged. ctypes uses the return value of an errcheck hook as
        the result of the foreign call, so returning it keeps the real ptrace
        result visible to callers instead of None.

    Raises:
        OSError: If ``res`` is -1. The error is built from the errno saved by
            ctypes, so Python raises the matching subclass (for example
            ProcessLookupError for ESRCH or PermissionError for EPERM).
    """
    if res == -1:
        raise OSError(ctypes.get_errno(), "error when calling ptrace")
    return res
# Run check_errno() on the result of every libc.ptrace() call.
libc.ptrace.errcheck = check_errno
# ptrace request codes, passed as the first argument of libc.ptrace().
PTRACE_ATTACH = 16
PTRACE_DETACH = 17
class VictimThread:
    """One thread of the victim process, alternately ptrace-attached and detached.

    Attributes:
        parent: The owning Victim; supplies ``max_lag_ms`` and ``percent_lag``.
        tid: Thread ID passed to ptrace.
        attached: True while this object believes it is attached to the thread.
        next_time: ``time.time()`` timestamp at which notify() should next run.
    """

    def __init__(self, parent, tid):
        """Start out detached and schedule the first attach."""
        self.parent = parent
        self.tid = tid
        self.attached = False
        self.schedule()

    def schedule(self):
        """Choose when the next attach/detach toggle is due.

        While attached, the delay is uniform in [0, max_lag_ms). While
        detached, that delay is scaled by (100 - percent_lag) / percent_lag so
        that, on average, the attached periods make up percent_lag percent of
        the time.
        """
        delay = random.random() * (self.parent.max_lag_ms / 1000)
        if not self.attached:
            delay = delay * (100 - self.parent.percent_lag) / self.parent.percent_lag
        self.next_time = time.time() + delay

    def notify(self):
        """Toggle between attached and detached, then schedule the next toggle.

        ``attached`` is only updated when the ptrace call succeeds, so it never
        claims a state that ptrace did not actually reach. Any OSError other
        than ProcessLookupError (e.g. PermissionError) propagates to the caller.
        """
        want_attached = not self.attached
        request = PTRACE_ATTACH if want_attached else PTRACE_DETACH
        try:
            libc.ptrace(request, self.tid, None, None)
        except ProcessLookupError:
            # This can happen if the thread disappeared since the last call to
            # Victim.update_threads(). Ignore the error and leave `attached`
            # untouched; the next call to update_threads() will remove this
            # VictimThread if the thread is really gone.
            # NOTE(review): no waitpid() is done after PTRACE_ATTACH, so a
            # detach issued before the thread has stopped may also land here -
            # confirm against ptrace(2). Keeping the flag lets it be retried.
            pass
        else:
            self.attached = want_attached
        self.schedule()
class Victim:
    """A process whose threads are randomly paused and resumed via ptrace.

    Attributes:
        pid: ID of the target process (used to build ``/proc/<pid>/task``).
        max_lag_ms: Upper bound, in milliseconds, of a single lag spike.
        percent_lag: Average percentage of time each thread spends paused.
        threads: Mapping of thread ID to its VictimThread.
    """

    def __init__(self, pid, *, max_lag_ms, percent_lag):
        """Store the settings; no thread is touched until curse() runs.

        Raises:
            ValueError: If ``max_lag_ms`` is not positive or ``percent_lag`` is
                outside (0, 100]. Such values would otherwise surface later as
                a ZeroDivisionError or as negative delays in the scheduler.
        """
        if max_lag_ms <= 0:
            raise ValueError(f"max_lag_ms must be positive, got {max_lag_ms!r}")
        if not 0 < percent_lag <= 100:
            raise ValueError(
                f"percent_lag must be in the range (0, 100], got {percent_lag!r}"
            )
        self.pid = pid
        self.max_lag_ms = max_lag_ms
        self.percent_lag = percent_lag
        self.threads = {}

    def curse(self):
        """Loop forever, toggling whichever thread is due next.

        Never returns normally; it ends only when an exception (for example
        KeyboardInterrupt) propagates. Threads still marked as attached are
        detached on the way out.
        """
        try:
            while True:
                self.update_threads()
                next_thread = min(
                    self.threads.values(),
                    key=lambda thread: thread.next_time,
                    default=None,
                )
                if next_thread is None:
                    # No threads known yet; poll again shortly.
                    time.sleep(0.1)
                    continue
                time.sleep(max(0, next_thread.next_time - time.time()))
                next_thread.notify()
        finally:
            # detach cleanly if we get KeyboardInterrupt
            for thread in self.threads.values():
                if thread.attached:
                    try:
                        libc.ptrace(PTRACE_DETACH, thread.tid, None, None)
                    except ProcessLookupError:
                        pass

    def update_threads(self):
        """Re-read ``/proc/<pid>/task`` and rebuild ``self.threads``.

        Existing VictimThread objects are kept for thread IDs that are still
        listed, new ones are created for newly listed IDs, and entries for IDs
        that are no longer listed are dropped. Errors from listing the
        directory (e.g. FileNotFoundError) propagate to the caller.
        """
        known = self.threads
        refreshed = {}
        for entry in Path(f"/proc/{self.pid}/task").iterdir():
            # Each directory entry is named after a thread ID.
            tid = int(entry.name)
            refreshed[tid] = known[tid] if tid in known else VictimThread(self, tid)
        self.threads = refreshed
def parse_args(argv=None):
    """Parse and validate the command line.

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.

    Returns:
        The argparse namespace with ``pid``, ``max_lag_ms`` and ``percent_lag``.

    Invalid values are reported through ``parser.error``, which prints a usage
    message and exits with status 2. Unlike ``assert``, this still runs when
    Python is started with -O.
    """
    parser = argparse.ArgumentParser(
        prog="lagcurse.py",
        description=(
            "lagcurse.py uses the Linux ptrace() API to pause/unpause "
            "threads of the given process at random intervals. This can be helpful "
            "for fuzzing race conditions or simulating CPU contention."
        ),
    )
    parser.add_argument("pid", type=int, help="ID of the process to curse.")
    parser.add_argument(
        "-m", "--max-lag-ms", type=int, default=100,
        help=(
            "Maximum length of a single lag spike, in milliseconds. The lengths "
            "of lag spikes will be randomly selected between 0ms and the maximum. "
            "The default maximum is 100ms."
        ),
    )
    parser.add_argument(
        "-p", "--percent-lag", type=int, default=50,
        help=(
            "On average, the lag spikes on each thread will add up to "
            "PERCENT_LAG percent of the thread's lifetime. For example, if "
            "PERCENT_LAG=50, the thread will effectively run 50%% slower. "
            "Default is 50%%."
        ),
    )
    args = parser.parse_args(argv)
    if args.max_lag_ms <= 0:
        parser.error("--max-lag-ms must be greater than 0")
    if not 1 <= args.percent_lag <= 99:
        parser.error("--percent-lag must be between 1 and 99")
    return args


def main(argv=None):
    """Curse the process named on the command line until interrupted.

    Returns:
        The process exit status: 0 after Ctrl-C, 1 if the target process could
        not be traced or does not exist.
    """
    args = parse_args(argv)
    victim = Victim(args.pid, max_lag_ms=args.max_lag_ms, percent_lag=args.percent_lag)
    print(f"Cursing {args.pid}...")
    try:
        victim.curse()
    except KeyboardInterrupt:
        print("Exiting.")
    except PermissionError:
        print(
            "Error: Insufficient permission to ptrace given process. Try running under sudo?",
            file=sys.stderr,
        )
        return 1
    except FileNotFoundError:
        print(f"Error: No such process {victim.pid}.", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment