-
-
Save kawamuray/a27aaa2d0668d0050537651978735321 to your computer and use it in GitHub Desktop.
Kafka broker performance degradation by mysterious JVM pause - Kafka meetup 2019-07
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import subprocess | |
import sys | |
# Minimum total duration, in microseconds, of one entry-point call before the
# generated SystemTap script prints that call's per-function timing breakdown.
SLOW_CALL_THRESHOLD = 1000
def parse_probes(option):
    """Translate a comma-separated probe list into SystemTap probe points.

    Each item is either ``function``, which becomes
    ``kernel.function("function")``, or ``module/function``, which becomes
    ``module("module").function("function")``.

    Only the first ``/`` in an item is treated as the separator; everything
    after it is taken as the function name, so no part of an item is
    silently discarded.

    Returns the probe-point strings in the order the items were given.
    """
    probes = []
    for item in option.split(","):
        # maxsplit=1 yields at most two parts: [module, function].
        parts = item.split("/", 1)
        if len(parts) == 2:
            probes.append('module("%s").function("%s")' % (parts[0], parts[1]))
        else:
            probes.append('kernel.function("%s")' % parts[0])
    return probes
def genscript(entrypoint, probepoints):
    """Render the SystemTap script that times calls made beneath an entry point.

    ``entrypoint`` is a single SystemTap probe point; ``probepoints`` is a
    list of probe points (as produced by ``parse_probes``).  In the generated
    script, recording for a thread starts when the entry point is hit in the
    target process.  While recording, every probed function's entry time is
    stored and its duration is appended to a per-thread string on return.
    When the entry point returns, that string is printed if the total elapsed
    time reached ``SLOW_CALL_THRESHOLD`` microseconds.

    Returns the script text.
    """
    entry_list = ",\n".join(probepoints)
    return_list = ",\n".join([point + ".return" for point in probepoints])

    # Raw string: "\n" must reach SystemTap as an escape sequence, and "%%"
    # collapses to a literal "%" when the template is %-formatted below.
    template = r"""
global entry_times[8192]
global durations // Key: tid()
global current_time_us
probe begin {
    current_time_us = gettimeofday_us()
}
probe timer.ms(1) {
    current_time_us = gettimeofday_us()
}
global indents
function indent_spaces(count) {
    spaces = ""
    for (i = 0; i < count; i++) {
        spaces .= " "
    }
    return spaces
}
function alloc_frame_id() {
    indent = indent_spaces(indents[tid()])
    indents[tid()] += 1
    return indent . ppfunc()
}
function dealloc_frame_id() {
    count = indents[tid()]
    indent = indent_spaces(count - 1)
    if (count == 1) {
        delete indents[tid()]
    } else {
        indents[tid()] -= 1
    }
    return indent . ppfunc()
}
function in_rec() { // bool
    return durations[tid()] != ""
}
probe %(entrypoints_entry)s {
    if (pid() == target()) {
        frmid = alloc_frame_id()
        entry_times[tid(), frmid] = gettimeofday_us()
        durations[tid()] = "\n"
    }
}
probe %(entrypoints_return)s {
    if (!in_rec()) {
        next
    }
    frmid = dealloc_frame_id()
    t0 = entry_times[tid(), frmid]
    if (t0) {
        elapsed = gettimeofday_us() - t0
        if (elapsed >= %(slow_call_threshold)s) {
            printf("%%s took %%d us in total:", ppfunc(), elapsed)
            printf("%%s\n", durations[tid()])
        }
        delete durations[tid()]
        delete entry_times[tid(), frmid]
    }
}
probe
%(probepoints_entry)s
{
    if (in_rec()) {
        frmid = alloc_frame_id()
        entry_times[tid(), frmid] = gettimeofday_us()
    }
}
probe
%(probepoints_return)s
{
    if (in_rec()) {
        frmid = dealloc_frame_id()
        tin = entry_times[tid(), frmid]
        tout = gettimeofday_us()
        elapsed = tout - tin
        durations[tid()] .= sprintf(",%%s=%%d(in: %%d, out: %%d)", frmid, elapsed, tin, tout)
        delete entry_times[tid(), frmid]
    }
}
"""
    substitutions = {
        "entrypoints_entry": entrypoint,
        "entrypoints_return": entrypoint + ".return",
        "probepoints_entry": entry_list,
        "probepoints_return": return_list,
        "slow_call_threshold": SLOW_CALL_THRESHOLD,
    }
    return template % substitutions
def main():
    """Build the SystemTap script from the command line and run it via ``stap``.

    Usage: ``<script> ENTRYPOINT PROBE[,PROBE...] [STAP_OPTION...]``

    ``ENTRYPOINT`` is passed to ``genscript`` as-is; the probe list is parsed
    by ``parse_probes``; any remaining arguments are forwarded to ``stap``.
    The generated script is echoed to stdout before ``stap`` is started.

    Returns the process exit status: 2 on a usage error, otherwise the exit
    status of ``stap``.
    """
    if len(sys.argv) < 3:
        sys.stderr.write(
            "usage: %s ENTRYPOINT PROBE[,PROBE...] [STAP_OPTION...]\n" % sys.argv[0]
        )
        return 2

    entrypoint = sys.argv[1]
    probepoints = parse_probes(sys.argv[2])
    script = genscript(entrypoint, probepoints)
    # The parenthesised single-argument form prints the same on Python 2 and 3.
    print(script)

    cmd = ["stap", "-v"]
    cmd.extend(sys.argv[3:])
    cmd.append("-e")
    cmd.append(script)
    return subprocess.call(cmd)


if __name__ == "__main__":
    sys.exit(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <fcntl.h> | |
#include <stdlib.h> | |
#include <stdio.h> | |
#include <unistd.h> | |
#include <time.h> | |
#include <errno.h> | |
/* Number of bytes handed to each write() call in main(). */
const size_t WRITE_SIZE = 1200;
/* Marker file that signal_slow() creates when a slow write is detected. */
const char *SIGNAL_PATH = "/tmp/justwrite-slow";
/* A write() taking longer than this many microseconds is reported as slow. */
const long long DURATION_THRESHOLD_US = 100000;
/*
 * Return the time elapsed from *t0 to *te in whole microseconds.
 *
 * Both differences are widened to long long before any arithmetic so that
 * the seconds-to-microseconds multiplication cannot overflow on platforms
 * where time_t is only 32 bits wide.  The nanosecond difference is truncated
 * toward zero when converted to microseconds.
 */
static long long duration_us(const struct timespec *t0, const struct timespec *te) {
    long long sec_diff = (long long)te->tv_sec - (long long)t0->tv_sec;
    long long nsec_diff = (long long)te->tv_nsec - (long long)t0->tv_nsec;

    return sec_diff * 1000000LL + nsec_diff / 1000;
}
static void signal_slow(long long elapsed_us) { | |
int fd; | |
fprintf(stderr, "[%ld] Signaling slow write %lld us\n", time(NULL), elapsed_us); | |
fd = open(SIGNAL_PATH, O_CREAT|O_EXCL); | |
if (fd == -1) { | |
if (errno == EEXIST) { | |
fprintf(stderr, "attempted to create a file, but it already exists\n"); | |
} else { | |
perror("open"); | |
} | |
return; | |
} | |
close(fd); | |
} | |
int main(int argc, char *argv[]) { | |
const char content[WRITE_SIZE]; | |
int fd; | |
size_t written; | |
struct timespec t0, te; | |
long long elapsed_us; | |
fd = open("./justwrite.out", O_CREAT|O_EXCL|O_WRONLY); | |
if (fd == -1) { | |
perror("open"); | |
exit(1); | |
} | |
while (1) { | |
if (clock_gettime(CLOCK_MONOTONIC, &t0) == -1) { | |
perror("clock_gettime"); | |
continue; | |
} | |
written = write(fd, content, sizeof(content)); | |
if (clock_gettime(CLOCK_MONOTONIC, &te) == -1) { | |
perror("clock_gettime"); | |
continue; | |
} | |
if (written == -1) { | |
perror("write"); | |
} else if (written < sizeof(content)) { | |
fprintf(stderr, "could write just %ld bytes while trying %ld bytes\n", written, sizeof(content)); | |
} | |
elapsed_us = duration_us(&t0, &te); | |
if (elapsed_us > DURATION_THRESHOLD_US) { | |
signal_slow(elapsed_us); | |
} | |
usleep(10000); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <linux/module.h> | |
#include <linux/kernel.h> | |
#include <linux/kprobes.h> | |
#include <linux/delay.h> | |
#include <linux/ptrace.h> | |
#include <linux/bio.h> | |
#include <linux/kallsyms.h> | |
/* Major/minor device numbers that handler_pre() compares a bio's device against. */
static const dev_t TARGET_MAJOR = 8;
static const dev_t TARGET_MINOR = 3;
/* NOTE(review): not referenced anywhere in the visible code — confirm whether it is still needed. */
static const pid_t TARGET_PID = 1234;
// regs_get_argument_nth
/* Filled in by kprobe_init() with the address found through kallsyms;
 * handler_pre() calls it to fetch argument 0 of the probed function. */
static const unsigned long (*regs_getarg_fn)(struct pt_regs *regs, unsigned int n);
/* The probe itself; its pre_handler is assigned in kprobe_init(). */
static struct kprobe kp = {
    .symbol_name = "generic_make_request",
};
/* Counts matching bios seen by handler_pre(); a delay is injected whenever
 * the pre-increment value is a multiple of 3. */
static long long sampling;
static int handler_pre(struct kprobe *p, struct pt_regs *regs) { | |
struct bio *biop; | |
dev_t devn; | |
biop = (struct bio*) regs_getarg_fn(regs, 0); | |
devn = (biop == NULL) ? 0 : biop->bi_bdev->bd_dev; | |
printk(KERN_INFO "pre_handler (%d): p->addr = 0x%p, dev(major:minor) = %u:%u\n", current->pid, p->addr, MAJOR(devn), MINOR(devn)); | |
if (biop->bi_rw & WRITE && bio_rw_flagged(biop, BIO_RW_SYNCIO) && | |
MAJOR(devn) == TARGET_MAJOR && MINOR(devn) == TARGET_MINOR) { | |
if (sampling++ % 3 == 0) { | |
printk(KERN_INFO "throttling\n"); | |
mdelay(300); | |
} | |
} | |
return 0; | |
} | |
static int find_getarg_fn(void *fnvar, const char *name, struct module *mod, unsigned long addr) { | |
if (memcmp(name, "regs_get_argument_nth", sizeof("regs_get_argument_nth")) == 0) { | |
printk(KERN_INFO "Found address of regs_get_argument_nth: 0x%p\n", (void *)addr); | |
*(void **)fnvar = (void *)addr; | |
} | |
return 0; | |
} | |
/*
 * Module init: look up the address of regs_get_argument_nth through
 * kallsyms, then register the kprobe with handler_pre as its pre-handler.
 *
 * Returns 0 on success, -ENOENT if the helper symbol cannot be found, or the
 * negative value returned by register_kprobe().
 */
static int __init kprobe_init(void) {
    int ret;

    // First find address of non exported but required function.
    kallsyms_on_each_symbol(find_getarg_fn, &regs_getarg_fn);
    if (regs_getarg_fn == NULL) {
        printk(KERN_INFO "failed finding location of regs_get_argument_nth\n");
        return -ENOENT;
    }

    kp.pre_handler = handler_pre;
    ret = register_kprobe(&kp);
    if (ret < 0) {
        printk(KERN_INFO "register_kprobe failed, returned %d\n", ret);
        return ret;
    }
    printk(KERN_INFO "Planted kprobe at %p\n", kp.addr);
    return 0;
}
/* Module exit: unregister the kprobe set up by kprobe_init() and log its address. */
static void __exit kprobe_exit(void) {
    unregister_kprobe(&kp);
    printk(KERN_INFO "kprobe at %p unregistered\n", kp.addr);
}
/* Register kprobe_init/kprobe_exit as the module's load and unload hooks. */
module_init(kprobe_init)
module_exit(kprobe_exit)
MODULE_LICENSE("GPL");
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Threshold in milliseconds, taken from the first script argument ($1);
# on_return() reports calls that last at least this long.
@define SLOW_CALL_THRESHOLD_MS %( $1 %)
# Result of pid2task(target()), set once in "probe begin" and used by fdpath().
global pidtask
# Per-thread (key: tid()) start time in ms of the call currently in flight.
global intime
# Per-thread (key: tid()) fd argument of the call currently in flight.
global fdarg
# Return the current wall-clock time as a human-readable string with
# millisecond precision: ctime() of the whole seconds, a comma, then the
# zero-padded milliseconds.
function ts() {
    now_ms = gettimeofday_ms()
    whole_secs = now_ms / 1000
    millis = now_ms % 1000
    return sprintf("%s,%03d", ctime(whole_secs), millis)
}
# Map a file descriptor of the target task (global pidtask) to a path.
# Returns "UNKNOWN" for a negative fd or when task_fd_lookup() yields nothing.
function fdpath(fd) {
    if (fd < 0) {
        return "UNKNOWN"
    }
    file_ptr = task_fd_lookup(pidtask, fd)
    if (file_ptr) {
        return fullpath_struct_file(pidtask, file_ptr)
    }
    return "UNKNOWN"
}
# Record the start time and fd argument of a call, but only for the one
# thread being watched.  The watched thread id is the second script argument
# ($2), e.g. `stap script.stp <threshold_ms> <tid> -x <pid>`, replacing the
# placeholder that previously had to be edited into the source by hand.
function on_call(fd) {
    if (tid() == $2) {
        intime[tid()] = gettimeofday_ms()
        fdarg[tid()] = fd
    }
}
# Finish timing the call started in on_call() for the current thread.
# If it lasted at least @SLOW_CALL_THRESHOLD_MS, print one summary line
# followed by the kernel and user backtraces.  The per-thread bookkeeping is
# cleared whether or not anything was printed.  Threads without a recorded
# start time are ignored.
function on_return(name) {
    started_ms = intime[tid()]
    if (started_ms) {
        elapsed_ms = gettimeofday_ms() - started_ms
        if (elapsed_ms >= @SLOW_CALL_THRESHOLD_MS) {
            stamp = ts()
            path = fdpath(fdarg[tid()])
            printf("%s SYSCALL %s from PID/TID %d/%d for FILE %s took %d ms\n", stamp, name, pid(), tid(), path, elapsed_ms)
            print_backtrace()
            print_ubacktrace()
        }
        delete intime[tid()]
        delete fdarg[tid()]
    }
}
# Resolve the target process once at startup; fdpath() uses the result.
probe begin {
    pidtask = pid2task(target())
}
# Entry to write: remember the start time and the fd being written.
probe syscall.write {
    on_call(fd)
}
# Return from write: report the call if it was slow.
probe syscall.write.return {
    on_return(name)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment