Skip to content

Instantly share code, notes, and snippets.

@kawamuray
Last active March 29, 2020 08:25
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kawamuray/a27aaa2d0668d0050537651978735321 to your computer and use it in GitHub Desktop.
Save kawamuray/a27aaa2d0668d0050537651978735321 to your computer and use it in GitHub Desktop.
Kafka broker performance degradation by mysterious JVM pause - Kafka meetup 2019-07
#!/usr/bin/env python
import subprocess
import sys
# Report threshold in microseconds: genscript() compares it against deltas of
# SystemTap's gettimeofday_us(), so 1000 means "entrypoint took >= 1 ms".
SLOW_CALL_THRESHOLD = 1000
def parse_probes(option):
    """Translate a comma-separated probe spec into SystemTap probe points.

    Each comma-separated item is either ``MODULE/FUNCTION`` (probed as
    ``module("MODULE").function("FUNCTION")``) or a bare ``FUNCTION``
    (probed as ``kernel.function("FUNCTION")``). Only the first "/" separates
    module from function, so a function pattern that itself contains "/"
    (e.g. ``ext4/ext4_file_write_iter@fs/ext4/file.c``) is kept intact.
    Surrounding whitespace is stripped and empty items (e.g. from a trailing
    comma) are ignored.

    Args:
        option: the raw spec string, e.g. ``"ext4/ext4_file_write_iter,vfs_write"``.

    Returns:
        A list of SystemTap probe-point strings, in input order.
    """
    probes = []
    for item in option.split(","):
        item = item.strip()
        if not item:
            continue
        # maxsplit=1: with maxsplit=2 an extra "/" produced three parts and the
        # item silently fell through to kernel.function(<module name>).
        parts = item.split("/", 1)
        if len(parts) == 2:
            probes.append('module("%s").function("%s")' % (parts[0], parts[1]))
        else:
            probes.append('kernel.function("%s")' % parts[0])
    return probes
def genscript(entrypoint, probepoints, slow_call_threshold=None):
    """Build a SystemTap script that times the calls made under ``entrypoint``.

    When a thread of the target process (``stap -x PID`` / ``-c CMD``; the
    script filters on ``target()``) enters ``entrypoint``, recording starts for
    that thread. While recording, every probe point in ``probepoints`` appends
    ``,FRAME=ELAPSED(in: T_IN, out: T_OUT)`` (microseconds) to a per-thread
    buffer. FRAME is the function name indented by nesting depth. When
    ``entrypoint`` returns after at least ``slow_call_threshold`` microseconds,
    its total time and the buffer are printed. Recording for the thread then
    stops.

    Args:
        entrypoint: SystemTap probe point of the outer function, e.g.
            ``kernel.function("vfs_write")``; ``".return"`` is appended for
            its return probe.
        probepoints: non-empty list of probe-point strings (as produced by
            parse_probes()); ``".return"`` is appended for their return probes.
        slow_call_threshold: report threshold in microseconds. Defaults to
            the module-level SLOW_CALL_THRESHOLD.

    Returns:
        The script source as a string, ready for ``stap -e``.

    Raises:
        ValueError: if ``probepoints`` is empty (the generated ``probe``
            statement would have no probe points).
    """
    if not probepoints:
        raise ValueError("at least one probe point is required")
    if slow_call_threshold is None:
        slow_call_threshold = SLOW_CALL_THRESHOLD
    joinedprobes = ",\n".join(probepoints)
    joinedretprobes = ",\n".join(x + ".return" for x in probepoints)
    # NOTE: the template is %-formatted, so literal percent signs in the
    # SystemTap code must be written as %%.
    return r"""
global entry_times[8192]
global durations // Key: tid()
global current_time_us
probe begin {
    current_time_us = gettimeofday_us()
}
probe timer.ms(1) {
    current_time_us = gettimeofday_us()
}
global indents
function indent_spaces(count) {
    spaces = ""
    for (i = 0; i < count; i++) {
        spaces .= " "
    }
    return spaces
}
function alloc_frame_id() {
    indent = indent_spaces(indents[tid()])
    indents[tid()] += 1
    return indent . ppfunc()
}
function dealloc_frame_id() {
    count = indents[tid()]
    indent = indent_spaces(count - 1)
    if (count == 1) {
        delete indents[tid()]
    } else {
        indents[tid()] -= 1
    }
    return indent . ppfunc()
}
function in_rec() { // bool
    return durations[tid()] != ""
}
probe %(entrypoints_entry)s {
    if (pid() == target()) {
        frmid = alloc_frame_id()
        entry_times[tid(), frmid] = gettimeofday_us()
        durations[tid()] = "\n"
    }
}
probe %(entrypoints_return)s {
    if (!in_rec()) {
        next
    }
    frmid = dealloc_frame_id()
    t0 = entry_times[tid(), frmid]
    if (t0) {
        elapsed = gettimeofday_us() - t0
        if (elapsed >= %(slow_call_threshold)s) {
            printf("%%s took %%d us in total:", ppfunc(), elapsed)
            printf("%%s\n", durations[tid()])
        }
        delete entry_times[tid(), frmid]
    }
    // Stop recording even when no entry timestamp was found for this frame
    // id; otherwise in_rec() would stay true for this thread indefinitely.
    delete durations[tid()]
}
probe
%(probepoints_entry)s
{
    if (in_rec()) {
        frmid = alloc_frame_id()
        entry_times[tid(), frmid] = gettimeofday_us()
    }
}
probe
%(probepoints_return)s
{
    if (in_rec()) {
        frmid = dealloc_frame_id()
        tin = entry_times[tid(), frmid]
        // Skip frames whose entry was not recorded (entered before recording
        // began); tout - 0 would be reported as a bogus huge duration.
        if (tin) {
            tout = gettimeofday_us()
            elapsed = tout - tin
            durations[tid()] .= sprintf(",%%s=%%d(in: %%d, out: %%d)", frmid, elapsed, tin, tout)
            delete entry_times[tid(), frmid]
        }
    }
}
""" % {
        "entrypoints_entry": entrypoint,
        "entrypoints_return": entrypoint + ".return",
        "probepoints_entry": joinedprobes,
        "probepoints_return": joinedretprobes,
        "slow_call_threshold": slow_call_threshold,
    }
def main():
    """Generate the tracing script from argv, print it, and run it under stap.

    Usage: SCRIPT ENTRYPOINT PROBES [STAP_ARGS...]
      ENTRYPOINT  SystemTap probe point of the outer function.
      PROBES      comma-separated list understood by parse_probes().
      STAP_ARGS   passed to stap verbatim. The generated script filters on
                  target(), so this should include -x PID or -c CMD.

    Returns:
        stap's exit status, or 2 on a usage error.
    """
    if len(sys.argv) < 3:
        sys.stderr.write("usage: %s ENTRYPOINT PROBES [STAP_ARGS...]\n" % sys.argv[0])
        return 2
    entrypoint = sys.argv[1]
    probepoints = parse_probes(sys.argv[2])
    script = genscript(entrypoint, probepoints)
    # print(x) with one argument behaves the same on Python 2 and 3; the bare
    # `print script` statement is a SyntaxError on Python 3.
    print(script)
    cmd = ["stap", "-v"]
    cmd.extend(sys.argv[3:])
    cmd.append("-e")
    cmd.append(script)
    return subprocess.call(cmd)


if __name__ == "__main__":
    # Propagate stap's exit status instead of always exiting 0.
    sys.exit(main())
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>
/* Bytes written per iteration. An enum constant (rather than a const size_t)
 * is an integer constant expression, so arrays sized by it are ordinary
 * fixed-size arrays instead of VLAs. */
enum { WRITE_SIZE = 1200 };
/* Flag file created (with O_EXCL) when a single write is slow. */
static const char *const SIGNAL_PATH = "/tmp/justwrite-slow";
/* A write() taking longer than this many microseconds is reported. */
static const long long DURATION_THRESHOLD_US = 100000;
/* Elapsed time from *t0 to *te in microseconds (negative if te precedes t0;
 * sub-microsecond remainders are truncated toward zero).
 * The seconds are widened to long long before scaling so the multiplication
 * cannot overflow on platforms with a 32-bit time_t. */
static long long duration_us(const struct timespec *t0, const struct timespec *te) {
    long long sec_part_us = ((long long)te->tv_sec - (long long)t0->tv_sec) * 1000 * 1000;
    long long nsec_part_us = ((long long)te->tv_nsec - (long long)t0->tv_nsec) / 1000;
    return sec_part_us + nsec_part_us;
}
/* Report a slow write on stderr and create SIGNAL_PATH as a flag file
 * (presumably watched by an external tool — confirm). O_EXCL makes creation
 * fail if the flag already exists; that case is reported on stderr and
 * otherwise ignored, as is any other open() failure.
 *
 * elapsed_us: duration of the slow write in microseconds (only logged). */
static void signal_slow(long long elapsed_us) {
    int fd;

    /* time_t has no portable printf conversion; cast explicitly for %ld. */
    fprintf(stderr, "[%ld] Signaling slow write %lld us\n", (long)time(NULL), elapsed_us);
    /* O_CREAT requires the mode argument; without it open() reads an
     * indeterminate value from its variadic arguments as the file mode. */
    fd = open(SIGNAL_PATH, O_CREAT | O_EXCL | O_WRONLY, 0644);
    if (fd == -1) {
        if (errno == EEXIST) {
            fprintf(stderr, "attempted to create a file, but it already exists\n");
        } else {
            perror("open");
        }
        return;
    }
    close(fd);
}
/* Write WRITE_SIZE-byte chunks to ./justwrite.out forever, sleeping 10 ms
 * between writes. Each write() is timed with CLOCK_MONOTONIC, and
 * signal_slow() is called when one takes longer than DURATION_THRESHOLD_US.
 * The output file must not already exist (O_EXCL); if it cannot be created
 * the process exits with status 1. */
int main(int argc, char *argv[]) {
    char content[WRITE_SIZE];
    int fd;
    struct timespec t0, te;

    (void)argc;
    (void)argv;

    /* The payload is never inspected; fill it so we write deterministic bytes
     * rather than uninitialized stack memory. A loop (not an initializer)
     * works whether WRITE_SIZE is a constant expression or a const object. */
    for (size_t i = 0; i < sizeof(content); i++) {
        content[i] = 'x';
    }

    /* O_CREAT requires an explicit mode argument. */
    fd = open("./justwrite.out", O_CREAT | O_EXCL | O_WRONLY, 0644);
    if (fd == -1) {
        perror("open");
        exit(1);
    }
    while (1) {
        ssize_t written;
        int write_errno;
        long long elapsed_us;

        if (clock_gettime(CLOCK_MONOTONIC, &t0) == -1) {
            perror("clock_gettime");
            usleep(10000); /* don't spin hot if the clock keeps failing */
            continue;
        }
        written = write(fd, content, sizeof(content));
        /* Save errno now: the second clock_gettime() may overwrite it. */
        write_errno = errno;
        if (clock_gettime(CLOCK_MONOTONIC, &te) == -1) {
            perror("clock_gettime");
            usleep(10000);
            continue;
        }
        if (written == -1) {
            errno = write_errno;
            perror("write");
        } else if ((size_t)written < sizeof(content)) {
            fprintf(stderr, "could write just %zd bytes while trying %zu bytes\n", written, sizeof(content));
        }
        elapsed_us = duration_us(&t0, &te);
        if (elapsed_us > DURATION_THRESHOLD_US) {
            signal_slow(elapsed_us);
        }
        usleep(10000);
    }
}
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kprobes.h>
#include <linux/delay.h>
#include <linux/ptrace.h>
#include <linux/bio.h>
#include <linux/kallsyms.h>
/* Only bios targeting this block device (major:minor) are throttled.
 * NOTE(review): 8:3 is typically sda3 — adjust for the host under test. */
static const dev_t TARGET_MAJOR = 8;
static const dev_t TARGET_MINOR = 3;
/* NOTE(review): not referenced anywhere in this module. */
static const pid_t TARGET_PID = 1234;
/* Address of the non-exported helper regs_get_argument_nth(), resolved at
 * load time via kallsyms; handler_pre() uses it to fetch argument 0 of the
 * probed function. No const on the return type: qualifiers on a by-value
 * return type are ignored (and trigger -Wignored-qualifiers). */
static unsigned long (*regs_getarg_fn)(struct pt_regs *regs, unsigned int n);
/* kprobe on generic_make_request; pre_handler is filled in by kprobe_init(). */
static struct kprobe kp = {
    .symbol_name = "generic_make_request",
};
/* Counts matching sync writes so that every third one is delayed.
 * NOTE(review): incremented without synchronization from the kprobe handler,
 * which can run concurrently on several CPUs, so the count is approximate. */
static long long sampling;
/* kprobe pre-handler for generic_make_request(). It logs the device of every
 * bio. For synchronous writes to TARGET_MAJOR:TARGET_MINOR, every third call
 * busy-waits 300 ms (mdelay) to inject artificial I/O latency. Always returns
 * 0 so the probed function then runs normally. */
static int handler_pre(struct kprobe *p, struct pt_regs *regs) {
    struct bio *biop;
    dev_t devn;

    /* Argument 0 of generic_make_request() is the struct bio *. */
    biop = (struct bio *)regs_getarg_fn(regs, 0);
    devn = (biop == NULL || biop->bi_bdev == NULL) ? 0 : biop->bi_bdev->bd_dev;
    printk(KERN_INFO "pre_handler (%d): p->addr = 0x%p, dev(major:minor) = %u:%u\n", current->pid, p->addr, MAJOR(devn), MINOR(devn));
    /* devn is NULL-safe above, but bi_rw below is not: bail out before
     * dereferencing a NULL bio. */
    if (biop == NULL) {
        return 0;
    }
    if ((biop->bi_rw & WRITE) && bio_rw_flagged(biop, BIO_RW_SYNCIO) &&
        MAJOR(devn) == TARGET_MAJOR && MINOR(devn) == TARGET_MINOR) {
        if (sampling++ % 3 == 0) {
            printk(KERN_INFO "throttling\n");
            mdelay(300);
        }
    }
    return 0;
}
/* kallsyms_on_each_symbol() callback. When the symbol is
 * regs_get_argument_nth, it stores the symbol's address into *(void **)fnvar.
 * Returns non-zero once found, which stops the symbol walk; 0 otherwise. */
static int find_getarg_fn(void *fnvar, const char *name, struct module *mod, unsigned long addr) {
    /* strcmp rather than memcmp with the literal's size: name may be shorter
     * than that, and memcmp may read past its terminating NUL. */
    if (strcmp(name, "regs_get_argument_nth") != 0) {
        return 0;
    }
    printk(KERN_INFO "Found address of regs_get_argument_nth: 0x%p\n", (void *)addr);
    *(void **)fnvar = (void *)addr;
    return 1;
}
/* Module init: resolve regs_get_argument_nth() through kallsyms, then
 * register the kprobe on generic_make_request with handler_pre as its
 * pre-handler. Returns 0 on success or a negative errno. */
static int __init kprobe_init(void) {
    int ret;

    /* regs_get_argument_nth is not exported to modules; look it up by name. */
    kallsyms_on_each_symbol(find_getarg_fn, &regs_getarg_fn);
    if (regs_getarg_fn == NULL) {
        printk(KERN_INFO "failed finding location of regs_get_argument_nth\n");
        /* Module init must return a negative errno; -1 would read as -EPERM. */
        return -ENOENT;
    }
    kp.pre_handler = handler_pre;
    ret = register_kprobe(&kp);
    if (ret < 0) {
        printk(KERN_INFO "register_kprobe failed, returned %d\n", ret);
        return ret;
    }
    printk(KERN_INFO "Planted kprobe at %p\n", kp.addr);
    return 0;
}
/* Module exit: remove the kprobe planted by kprobe_init() and log the
 * address it was attached at. */
static void __exit kprobe_exit(void)
{
    unregister_kprobe(&kp);
    pr_info("kprobe at %p unregistered\n", kp.addr);
}
/* Register the module's load/unload entry points. The GPL license tag is
 * required for GPL-only kernel exports such as the kprobe API. */
module_init(kprobe_init)
module_exit(kprobe_exit)
MODULE_LICENSE("GPL");
# Report threshold in milliseconds, taken from the first script argument.
@define SLOW_CALL_THRESHOLD_MS %( $1 %)
# Task of the traced process (set in probe begin from target()); used by
# fdpath() to resolve file descriptors to paths.
global pidtask
# Per-thread (keyed by tid) entry time, in ms, of the in-flight write().
global intime
# Per-thread (keyed by tid) fd argument of the in-flight write().
global fdarg
# Return a human-readable timestamp with millisecond precision: ctime()'s
# date string for the current second, followed by ",mmm".
function ts() {
time_ms = gettimeofday_ms()
return sprintf("%s,%03d", ctime(time_ms / 1000), time_ms % 1000)
}
# Resolve file descriptor `fd` of the traced process (pidtask) to its full
# path. Returns "UNKNOWN" for a negative fd or one with no open file.
function fdpath(fd) {
  if (fd >= 0) {
    filest = task_fd_lookup(pidtask, fd)
    if (filest)
      return fullpath_struct_file(pidtask, filest)
  }
  return "UNKNOWN"
}
# Record the entry time (ms) and fd of a write() made by the watched thread.
# The thread id to watch (e.g. the JVM's VM thread) is the second script
# argument, so no hand-editing is needed:
#   stap -x PID this.stp THRESHOLD_MS THREAD_TID
function on_call(fd) {
  if (tid() == $2) {
    t0 = gettimeofday_ms()
    intime[tid()] = t0
    fdarg[tid()] = fd
  }
}
# Handle return from a traced syscall. If this thread has a pending entry
# time, report calls that took at least SLOW_CALL_THRESHOLD_MS ms: timestamp,
# pid/tid, file path, then kernel and user backtraces. Then clear the
# per-thread state.
# name: syscall name to print (the return probe's `name` context variable).
function on_return(name) {
t0 = intime[tid()]
if (t0) {
duration = gettimeofday_ms() - t0
if (duration >= @SLOW_CALL_THRESHOLD_MS) {
printf("%s SYSCALL %s from PID/TID %d/%d for FILE %s took %d ms\n", ts(), name, pid(), tid(), fdpath(fdarg[tid()]), duration)
print_backtrace()
print_ubacktrace()
}
delete intime[tid()]
delete fdarg[tid()]
}
}
# Cache the task of the target process (given with -x PID or -c CMD) for
# fdpath().
probe begin {
pidtask = pid2task(target())
}
# Time write(2): entry records the start for the watched thread; return
# reports the call if it was slow.
probe syscall.write {
on_call(fd)
}
probe syscall.write.return {
on_return(name)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment