Skip to content

Instantly share code, notes, and snippets.

@george-enf
Created January 23, 2023 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save george-enf/2d7cc288202b94defbe253f4ca26357b to your computer and use it in GitHub Desktop.
Save george-enf/2d7cc288202b94defbe253f4ca26357b to your computer and use it in GitHub Desktop.
test kmemleak
#include <linux/fs.h>
#include <linux/miscdevice.h>
#include <linux/mm.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
/* Declare the license under which this module is distributed. */
MODULE_LICENSE("GPL");
/*
 * Capacity of the leaks.addrs[] table: IOCTL_ALLOCATE refuses to record more
 * than this many allocations.
 */
#define MAX_LEAKS (1 * 1024 * 1024)
/*
 * Argument of IOCTL_ALLOCATE: a list of allocation requests.
 */
struct alloc_cmd {
	/* Number of entries in sizes[]. */
	int count;
	/*
	 * Requested sizes, supplied by user space. A negative entry asks for
	 * vmalloc() of the negated value, any other entry for kmalloc().
	 */
	int *sizes;
};
/*
 * Argument of IOCTL_LEAK and IOCTL_FREE: the slots
 * [offset, offset + count) of leaks.addrs[].
 */
struct range_cmd {
	/* Index of the first slot to operate on. */
	int offset;
	/* Number of consecutive slots to operate on. */
	int count;
};
/*
 * Table of every allocation made through IOCTL_ALLOCATE.
 *
 * The object is file-local: nothing outside this translation unit needs it,
 * and a global symbol called "leaks" could clash with other kernel symbols.
 */
static struct leaks {
	/* Number of slots of addrs[] currently in use. */
	int idx;
	/* Recorded allocation addresses; slots 0 .. idx - 1 are valid. */
	void *addrs[MAX_LEAKS];
} leaks;
/*
 * ioctl request numbers of the leakmod device, all using magic 'a'.
 *
 * NOTE(review): ALLOCATE, LEAK and FREE pass a struct from user space to the
 * kernel, which by kernel convention would be _IOW rather than _IOR. The
 * user-space script in this file builds the same numbers with _IOC_READ, so
 * the direction can only be changed on both sides together.
 */
/* Allocate one object per entry of struct alloc_cmd and record its address. */
#define IOCTL_ALLOCATE _IOR('a', 1, struct alloc_cmd)
/* Copy the recorded addresses to the user buffer passed as the argument. */
#define IOCTL_GET_ADDRESSES _IO('a', 2)
/* XOR the recorded pointers of a struct range_cmd range with LEAK_XOR. */
#define IOCTL_LEAK _IOR('a', 3, struct range_cmd)
/* Free the objects recorded in a struct range_cmd range. */
#define IOCTL_FREE _IOR('a', 4, struct range_cmd)
/* Reset the number of recorded allocations to zero. */
#define IOCTL_RESET _IO('a', 5)
/*
 * Mask XORed into a stored pointer by IOCTL_LEAK; applying it a second time
 * restores the pointer. Presumably this hides the reference from kmemleak's
 * memory scan so the object is reported as leaked - confirm.
 */
#define LEAK_XOR 0x1234000000000000ul
static long leakmod_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
switch (cmd) {
case IOCTL_ALLOCATE: {
struct alloc_cmd cmd;
int i;
if (copy_from_user(&cmd, (void *)arg, sizeof(cmd)))
return -EFAULT;
for (i = 0; i < cmd.count; i++) {
if (leaks.idx >= MAX_LEAKS)
return -EOVERFLOW;
if (cmd.sizes[i] < 0)
leaks.addrs[leaks.idx++] = vmalloc(-cmd.sizes[i]);
else
leaks.addrs[leaks.idx++] = kmalloc(cmd.sizes[i], GFP_KERNEL);
}
return 0;
}
case IOCTL_GET_ADDRESSES: {
if (copy_to_user((void *) arg, &leaks.addrs, leaks.idx * sizeof(leaks.addrs[0])))
return -EFAULT;
return 0;
}
case IOCTL_LEAK: {
struct range_cmd cmd;
int i;
if (copy_from_user(&cmd, (void *)arg, sizeof(cmd)))
return -EFAULT;
for (i = cmd.offset; i < cmd.offset + cmd.count; i++)
leaks.addrs[i] = (void *)((long)leaks.addrs[i] ^ LEAK_XOR);
return 0;
}
case IOCTL_FREE: {
struct range_cmd cmd;
void *p;
int i;
if (copy_from_user(&cmd, (void *)arg, sizeof(cmd)))
return -EFAULT;
for (i = cmd.offset; i < cmd.offset + cmd.count; i++) {
p = leaks.addrs[i];
if ((p >= (void *)PAGE_OFFSET) && (p < high_memory))
kfree(p);
else
vfree(p);
}
return 0;
}
case IOCTL_RESET:
leaks.idx = 0;
return 0;
}
return -ENOTTY;
}
/*
 * File operations of the leakmod device. Only the ioctl entry point is
 * provided; every other operation is left unset.
 */
static const struct file_operations leakmod_fops = {
	.owner		= THIS_MODULE,
	.unlocked_ioctl	= leakmod_ioctl,
};
static struct miscdevice leakmod_dev = {
MISC_DYNAMIC_MINOR,
"leakmod",
&leakmod_fops,
};
/*
 * Module entry point: register the leakmod misc device.
 *
 * Returns 0 on success or the negative error code reported by
 * misc_register(), so that a failed registration also fails the module load
 * instead of leaving a module loaded without its device.
 */
static int __init leakmod_init(void)
{
	return misc_register(&leakmod_dev);
}
/*
 * Module exit point: unregister the leakmod misc device.
 */
static void __exit leakmod_exit(void)
{
	misc_deregister(&leakmod_dev);
}
/* Register leakmod_init and leakmod_exit as the module's load and unload hooks. */
module_init(leakmod_init);
module_exit(leakmod_exit);
#!/usr/bin/env python3
import ctypes
import fcntl
import os
import random
import re
import time
# Change MSECS_MIN_AGE to 1000 in mm/kmemleak.c or increase the following timeout:
# Number of scan attempts made by check_for_leaks(), one second apart.
TIMEOUT_SECONDS = 4
# When True, print_leaks() also lists every address and asks kmemleak to dump
# the first ten of them.
VERBOSE = False
# Pieces of the ioctl request-number encoding used to build the IOCTL_*
# values below: the direction value, the bit widths of the number, type and
# size fields, and the shift of each field.
# NOTE(review): presumably mirrors the kernel's generic ioctl encoding; the
# values must match what the kernel module's _IO/_IOR macros produce.
_IOC_READ = 2
_IOC_NRBITS = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14
_IOC_NRSHIFT = 0
_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS
class AllocCmd(ctypes.Structure):
    """Argument of IOCTL_ALLOCATE; mirrors the kernel module's `struct alloc_cmd`.

    count: number of entries in `sizes`.
    sizes: pointer to an array of C ints, one requested size per allocation
        (the kernel module uses vmalloc for negative entries and kmalloc
        otherwise).
    """

    _fields_ = [
        ("count", ctypes.c_int),
        ("sizes", ctypes.POINTER(ctypes.c_int)),
    ]
class RangeCmd(ctypes.Structure):
    """Argument of IOCTL_LEAK and IOCTL_FREE; mirrors `struct range_cmd`.

    offset: index of the first recorded allocation to operate on.
    count: number of consecutive allocations to operate on.
    """

    _fields_ = [
        ("offset", ctypes.c_int),
        ("count", ctypes.c_int),
    ]
def _ioc(direction, nr, size=0):
    """Assemble an ioctl request number for type 'a' from its fields."""
    return (
        ord("a") << _IOC_TYPESHIFT
        | nr << _IOC_NRSHIFT
        | direction << _IOC_DIRSHIFT
        | size << _IOC_SIZESHIFT
    )


# Request numbers understood by /dev/leakmod. The commands that carry a
# struct encode _IOC_READ and the struct size; the others encode neither.
IOCTL_ALLOCATE = _ioc(_IOC_READ, 1, ctypes.sizeof(AllocCmd))
IOCTL_GET_ADDRESSES = _ioc(0, 2)
IOCTL_LEAK = _ioc(_IOC_READ, 3, ctypes.sizeof(RangeCmd))
IOCTL_FREE = _ioc(_IOC_READ, 4, ctypes.sizeof(RangeCmd))
IOCTL_RESET = _ioc(0, 5)
def sort_addrs(addrs):
    """Return the addresses sorted by their low 12 bits, then by full value.

    Addresses that share the same low 12 bits end up next to each other.
    """
    return sorted(addrs, key=lambda a: (a & 0xFFF, a))
def print_leaks(header, leaks):
    """Print a summary of `leaks` under `header`; print nothing when empty.

    Only the header line with the number of addresses is printed unless
    VERBOSE is set. In verbose mode every address is printed in sort_addrs()
    order, and a `dump=` command is written to the kmemleak debugfs file for
    the first ten of them.
    """
    if len(leaks) == 0:
        return
    print(f"{header} ({len(leaks)}):")
    if not VERBOSE:
        return
    for i, a in enumerate(sort_addrs(leaks)):
        print(f" {a:x}", flush=True)
        if i < 10:
            with open("/sys/kernel/debug/kmemleak", "w") as f:
                f.write(f"dump=0x{a:x}")
def check_for_leaks(expected_leaks):
    """Scan with kmemleak and compare its report against `expected_leaks`.

    Up to TIMEOUT_SECONDS scans are triggered, one second apart. The loop
    stops early once the reported set equals a non-empty expected set; when
    no leaks are expected, every scan is performed so that late reports are
    still caught. An OK or FAIL line is printed, and on failure the
    addresses missing from and unexpected in the report are passed to
    print_leaks().
    """
    expected_leaks = set(expected_leaks)
    # Defined up front so the comparison below is valid even when
    # TIMEOUT_SECONDS is 0 and the loop body never runs.
    detected_leaks = set()
    for _ in range(TIMEOUT_SECONDS):
        with open("/sys/kernel/debug/kmemleak", "w") as f:
            f.write("scan")
        with open("/sys/kernel/debug/kmemleak") as f:
            data = f.read()
        found = re.findall(r"unreferenced object 0x([0-9a-f]{16})", data)
        found = [int(x, 16) for x in found]
        # Each object must be reported at most once.
        assert len(set(found)) == len(found)
        detected_leaks = set(found)
        if len(expected_leaks) > 0 and detected_leaks == expected_leaks:
            break
        time.sleep(1)
    msg = f"expected leaks: {len(expected_leaks)}, detected leaks: {len(detected_leaks)}"
    if detected_leaks == expected_leaks:
        print(f"\x1b[32m[ OK ]: {msg}\x1b[0m")
    else:
        print(f"\x1b[31m[ FAIL ]: {msg}\x1b[0m")
        print_leaks("expected but not found leaks", expected_leaks - detected_leaks)
        print_leaks("found but not expected leaks", detected_leaks - expected_leaks)
def chunks(lst, n):
    """Yield consecutive slices of `lst` of length `n`; the last may be shorter."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]
def main():
    """Drive /dev/leakmod and check that kmemleak reports exactly the hidden objects.

    Steps:
      1. Build a shuffled list of kmalloc (positive) and vmalloc (negative)
         sizes for every power of two below 2**20.
      2. Allocate everything, free it and reset the module, then allocate
         again in a different order.
      3. Verify that kmemleak reports nothing while all pointers are stored.
      4. In chunks of 4096 allocations: hide the pointers (IOCTL_LEAK), check
         that kmemleak reports exactly those addresses, clear the report,
         restore the pointers and free the objects.
    """
    fd = os.open("/dev/leakmod", os.O_RDONLY)

    def do_ioctl(request, *args):
        # The call is a statement of its own: placing it inside `assert`
        # would drop the ioctl entirely when Python runs with -O.
        rc = fcntl.ioctl(fd, request, *args)
        assert rc == 0

    def allocate(sizes):
        cmd = AllocCmd()
        cmd.count = len(sizes)
        cmd.sizes = (ctypes.c_int * len(sizes))(*sizes)
        do_ioctl(IOCTL_ALLOCATE, cmd)

    try:
        with open("/sys/kernel/debug/kmemleak", "w") as f:
            f.write("stack=off")

        sizes = [2**i for i in range(20)]
        print(sizes)
        allocs = []
        for size in sizes:
            count = max(16, min(8192, 512 * 1024 // size))
            # Positive sizes request kmalloc, negative sizes request vmalloc.
            allocs.extend([size] * count)
            allocs.extend([-size] * count)

        # The seed is printed so a failing order can be reproduced.
        seed = int(time.time())
        print(seed)
        random.seed(seed)
        random.shuffle(allocs)

        # First round: allocate everything, then free it all again.
        allocate(allocs)
        cmd = RangeCmd()
        cmd.offset = 0
        cmd.count = len(allocs)
        do_ioctl(IOCTL_FREE, cmd)
        do_ioctl(IOCTL_RESET)

        # Second round: allocate in a different order and keep the objects.
        random.shuffle(allocs)
        allocate(allocs)
        check_for_leaks([])

        addrs = (ctypes.c_ulong * len(allocs))()
        do_ioctl(IOCTL_GET_ADDRESSES, addrs)
        addrs = [int(x) for x in addrs]
        assert len(addrs) == len(allocs)
        assert 0 not in addrs
        assert len(addrs) == len(set(addrs))

        offset = 0
        for expected_leaks in chunks(addrs, 4096):
            cmd = RangeCmd()
            cmd.offset = offset
            cmd.count = len(expected_leaks)
            do_ioctl(IOCTL_LEAK, cmd)
            check_for_leaks(expected_leaks)
            with open("/sys/kernel/debug/kmemleak", "w") as f:
                f.write("clear")
            # A second IOCTL_LEAK restores the pointers so they can be freed.
            do_ioctl(IOCTL_LEAK, cmd)
            do_ioctl(IOCTL_FREE, cmd)
            offset += len(expected_leaks)
        do_ioctl(IOCTL_RESET)
    finally:
        os.close(fd)
# Run the test only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment