Skip to content

Instantly share code, notes, and snippets.

@SaveTheRbtz
Last active December 25, 2020 03:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save SaveTheRbtz/287511efe881f921d1198bd08cc18f97 to your computer and use it in GitHub Desktop.
Save SaveTheRbtz/287511efe881f921d1198bd08cc18f97 to your computer and use it in GitHub Desktop.
affinitization scripts, originally written by @behebot
#!/usr/bin/env python
# mypy: allow-untyped-defs
"""
This script is used for applying affinity settings for various hardware devices.
Script originally based on Intel's [set_irq_affinity.sh](https://gist.github.com/SaveTheRbtz/8875474).
Over the years it was updated with heuristics based on the shape of Dropbox infrastructure.
Currently, this script can manage IRQ affinities, RPS, XPS, and RXQS. For the description of
these scaling settings please use [kernel's scaling.txt](https://www.kernel.org/doc/Documentation/networking/scaling.txt).
"""
import argparse
import logging
import re
import sys
from argparse import Namespace
from multiprocessing import cpu_count as get_cpu_count
from lib import (
affinitize,
get_network_devices,
get_vectors,
MODE_NETWORK,
MODE_STORAGE,
NETWORK_PATTERNS,
packet_steering,
PS_AUTO,
PS_OFF,
PS_ON,
STORAGE_PATTERNS,
)
# Values accepted by the --rps and --xps command-line options.
PS_CHOICES = (PS_ON, PS_OFF, PS_AUTO)
# Log record layout: timestamp, level and message separated by tabs.
FORMAT = "%(asctime)s\t%(levelname)s\t%(message)s"
logging.basicConfig(format=FORMAT)
# Root logger; main() switches it to DEBUG when --debug is given.
LOG = logging.getLogger()
def parse_ranges(string):
    """
    Parse a comma-separated CPU list into a set of CPU numbers.

    Each item is either a single number (``3``) or an inclusive range
    (``2-5``).  The upper bound of a range may be the literal ``max``, which
    stands for ``get_cpu_count() - 1``.  Whitespace around items is ignored.

    Returns a set of ints.

    Raises ``argparse.ArgumentTypeError`` when an item has an unexpected form
    or when a range is empty because its start is greater than its end.
    """
    cpus = set()
    last_cpu = get_cpu_count() - 1
    for item in string.split(","):
        item = item.strip()
        # \Z (unlike $) does not tolerate a trailing newline.
        if re.match(r"\d+\Z", item):
            cpus.add(int(item))
            continue
        bounds = re.match(r"(\d+)-(\d+|max)\Z", item)
        if bounds is None:
            raise argparse.ArgumentTypeError(
                "'{}' is not a list of range of numbers. "
                "Expected forms like '0,1,2-5' or '3'. ".format(string)
            )
        start = int(bounds.group(1))
        end = last_cpu if bounds.group(2) == "max" else int(bounds.group(2))
        if start > end:
            # A descending range would silently contribute no CPUs at all.
            raise argparse.ArgumentTypeError(
                "'{}' contains an empty range '{}': "
                "start {} is greater than end {}.".format(string, item, start, end)
            )
        cpus.update(range(start, end + 1))
    return cpus
def parse_args():
    # type: () -> Namespace
    """
    Build the command-line parser and parse the process arguments.

    The first positional argument selects the mode (MODE_NETWORK or
    MODE_STORAGE); the network mode additionally accepts --rps and --xps.
    --dry-run, --check, --debug and --cpuset are top-level options and so
    must precede the mode on the command line.

    Returns the populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="Affinitize all the stuff", epilog=__doc__
    )
    mode_subparser = parser.add_subparsers(
        dest="mode", help="Type of device to affinitize"
    )
    # Python 3 makes sub-commands optional by default; require one so that a
    # missing mode is reported as a usage error instead of failing later.
    mode_subparser.required = True

    network_parser = mode_subparser.add_parser(MODE_NETWORK, help="affinitize NICs")
    network_parser.add_argument(
        "--rps",
        choices=PS_CHOICES,
        default=PS_AUTO,
        help="enable Receive Packet Steering [default: %(default)s]",
    )
    network_parser.add_argument(
        "--xps",
        choices=PS_CHOICES,
        default=PS_ON,
        help="enable Transmit Packet Steering [default: %(default)s]",
    )
    # TODO(rbtz): Receive Flow Steering
    # TODO(rbtz): NUMA affinity

    mode_subparser.add_parser(MODE_STORAGE, help="affinitize RAIDs")

    parser.add_argument(
        "--dry-run",
        help="perform a dry run. do not apply changes",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--check",
        help="check whether any modifications will be applied; "
        "exits with status 1 if there are any pending changes; "
        "implies dry-run",
        action="store_true",
        default=False,
    )
    parser.add_argument("--debug", help="print debug information", action="store_true")
    # TODO(rbtz): Add filters by device name (e.g. apply tunings only to `eth0` or `em*`)
    parser.add_argument(
        "--cpuset",
        help="use following cpuset to spread interrupts",
        type=parse_ranges,
        default=range(get_cpu_count()),
    )
    return parser.parse_args()
def main(args):
    # type: (Namespace) -> int
    """
    Apply the affinity settings selected by the parsed arguments.

    Validates --cpuset against the number of CPUs, affinitizes the interrupt
    vectors matching every pattern of the chosen mode and, in network mode,
    applies the packet steering settings as well.

    Returns 0 on success (or when the box has a single CPU) and 1 when the
    cpuset is invalid or any step failed.  Raises ``AssertionError`` for an
    unknown mode.
    """
    if args.debug:
        LOG.setLevel(logging.DEBUG)

    cpu_count = get_cpu_count()
    if cpu_count == 1:
        LOG.critical("No need to affinitize, there is only one CPU presented.")
        return 0

    if len(args.cpuset) > cpu_count:
        LOG.critical("More CPUs specified in --cpuset than actually present in box.")
        return 1

    # CPUs are numbered from 0, so the valid ids are 0 .. cpu_count - 1.
    unknown_cpu = next((cpu for cpu in args.cpuset if cpu >= cpu_count), None)
    if unknown_cpu is not None:
        LOG.critical(
            "CPU #{} was specified in --cpuset but it doesn't exist.".format(
                unknown_cpu
            )
        )
        return 1

    patterns_by_mode = {
        MODE_STORAGE: STORAGE_PATTERNS,
        MODE_NETWORK: NETWORK_PATTERNS,
    }
    if args.mode not in patterns_by_mode:
        raise AssertionError("Unknown mode: {}".format(args.mode))
    patterns = patterns_by_mode[args.mode]

    if args.check:
        # A check never modifies anything, so it forces a dry run.
        args.dry_run = True
        LOG.debug(
            "Preforming check. Will return non-zero if any pending changes are detected."
        )
    if args.dry_run:
        LOG.debug("Performing dry run. No changes will be applied.")
    LOG.debug(
        "{} mode selected. Will consider vectors matching: {}".format(
            args.mode, ", ".join(patterns)
        )
    )

    had_failure = False
    for pattern in patterns:
        vectors = list(get_vectors(pattern))
        if not vectors:
            LOG.debug("No vectors matching pattern '{}' found".format(pattern))
            continue

        if len(vectors) > cpu_count:
            LOG.warning("There are more interrupt vectors matched than CPUs presented")
        LOG.debug(
            "Found {} vectors matching '{}' pattern".format(len(vectors), pattern)
        )
        LOG.info("Starting affinitization of '{}' pattern".format(pattern))
        try:
            affinitize(vectors, cpu_count, args.cpuset, args.dry_run, args.check)
        except Exception:
            # Keep going with the remaining patterns; report failure at the end.
            had_failure = True
            LOG.exception("Failed to affinitize")

    if args.mode == MODE_NETWORK:
        try:
            packet_steering(
                list(get_network_devices()),
                cpu_count,
                args.cpuset,
                args.rps,
                args.xps,
                args.dry_run,
                args.check,
            )
        except Exception:
            LOG.exception("Failed to apply RPS/XPS rules")
            had_failure = True

    return 1 if had_failure else 0
if __name__ == "__main__":
    # Use the status returned by main() as the process exit code.
    sys.exit(main(parse_args()))
# mypy: allow-untyped-defs
"""
Library functions for the affinitize binary
"""
import itertools
import logging
import os.path
import re
from fnmatch import fnmatch
from glob import glob
from os import listdir
from typing import Any, Iterator, List, Optional, Text, Tuple, Union
# Regular expressions searched for in /proc/interrupts lines to find the
# interrupt vectors that belong to network devices.
NETWORK_PATTERNS = [
    "rx",
    "tx",
    "TxRx",
    r"eth\d+-\d+",
    "ioat-msix",
]

# Same, for storage controllers.
STORAGE_PATTERNS = [
    "megasas",
    "hpsa",
    r"mpt2sas\d+-msix\d+",
    r"mpt3sas\d+-msix\d+",
    "aacraid",
    r"nvme\d+q\d+",
]

# Operating modes.
MODE_NETWORK = "network"
MODE_STORAGE = "storage"

# Packet steering settings.
PS_ON = "on"
PS_OFF = "off"
PS_AUTO = "auto"

LOG = logging.getLogger()
def construct_affinity_vectors(cpuset, vectors):
    """
    Pair every item of ``vectors`` with a CPU, cycling through ``cpuset``.

    Returns a list of ``(cpu, vector)`` tuples with one entry per vector; the
    CPUs repeat from the start when there are more vectors than CPUs.  The
    result is empty when either argument is empty.
    """
    # Builtin zip stops at the shorter input on both Python 2 and 3, whereas
    # itertools.izip only exists on Python 2.
    return list(zip(itertools.cycle(cpuset), vectors))
def affinitize(vectors, cpu_count, cpuset, dry_run=False, check=False):
    """
    Pin every interrupt vector to a single CPU, spreading the vectors across
    ``cpuset`` in round-robin order.

    Based on upstream Intel's `set_irq_affinity.sh`:
    https://gist.github.com/SaveTheRbtz/8875474

    For each vector the mask in ``/proc/irq/<vector>/smp_affinity`` is read
    and rewritten only when it differs from the wanted one.  With ``dry_run``
    nothing is written and the pending change is logged instead.

    Raises ``RuntimeError`` when ``check`` is set and the dry run found at
    least one pending change.  Errors from reading or writing the affinity
    files propagate to the caller.
    """
    pending = False
    for cpu_id, vector in construct_affinity_vectors(cpuset, vectors):
        affinity = construct_affinity(cpu_id, cpu_count)
        smp_affinity_file = "/proc/irq/{}/smp_affinity".format(vector)
        with open(smp_affinity_file, "r") as f:
            current_affinity = parse_affinity(f.read())

        # Compare normalised forms: the wanted mask contains commas once it
        # is wider than 32 bits, while the parsed current value never does.
        if current_affinity == parse_affinity(affinity):
            LOG.debug("{} is already set to {}".format(smp_affinity_file, affinity))
            continue

        if dry_run:
            LOG.warning(
                "{} will be changed {} -> {}".format(
                    smp_affinity_file, current_affinity, affinity
                )
            )
            pending = True
            continue

        LOG.debug("Setting {} to {}".format(smp_affinity_file, affinity))
        with open(smp_affinity_file, "w") as f:
            f.write(affinity)

    if check and pending:
        raise RuntimeError("Affinity changes are pending")
def _write_ps_file_logged(kind, device, filename, cpu_mask, dry_run, check):
    """Write one packet steering file; log a failure and return False instead of raising."""
    try:
        write_ps_file(filename, cpu_mask, dry_run, check)
    except Exception as e:
        LOG.critical("{} failed: {}: {}: {}".format(kind, device, filename, e))
        return False
    return True


def packet_steering(devices, cpu_count, cpuset, rps, xps, dry_run=False, check=False):
    """
    Applies RPS/XPS tunings based on suggestions from:
    https://www.kernel.org/doc/Documentation/networking/scaling.txt

    For every device, at most ``len(cpuset)`` rx queues and as many tx queues
    are configured:

    * ``rps_cpus`` of each rx queue gets a mask of all CPUs in ``cpuset``
      when ``rps`` is PS_ON, and 0 for any other value;
    * ``xps_cpus`` of each tx queue gets a single CPU taken from ``cpuset``
      in round-robin order when ``xps`` is PS_ON, or PS_AUTO with a number of
      selected tx queues other than one, and 0 otherwise;
    * ``xps_rxqs`` of each tx queue, when that file exists, gets the same
      value as ``xps_cpus``.

    A failure on one file is logged and does not stop the remaining files
    from being processed.  Raises ``RuntimeError`` at the end if any write
    failed, which includes pending changes reported by ``write_ps_file`` in
    check mode.
    """
    # The RPS mask does not depend on the device or queue, so build it once.
    if rps == PS_ON:
        if cpu_count == len(cpuset):
            rps_cpu_mask = (2 ** cpu_count) - 1
        else:
            rps_cpu_mask = 0
            for cpu in cpuset:
                # OR instead of add so a CPU listed twice still sets one bit.
                rps_cpu_mask |= 1 << cpu
    else:
        rps_cpu_mask = 0

    queue_limit = len(cpuset)
    failures = False
    for device in devices:
        for queue in get_device_queues(device, "rx")[:queue_limit]:
            filename = os.path.join(queue, "rps_cpus")
            if not _write_ps_file_logged(
                "RPS", device, filename, rps_cpu_mask, dry_run, check
            ):
                failures = True

        tx_queues = get_device_queues(device, "tx")[:queue_limit]
        xps_enabled = xps == PS_ON or (xps == PS_AUTO and len(tx_queues) != 1)
        for cpu_id, queue in construct_affinity_vectors(cpuset, tx_queues):
            xps_cpu_mask = 2 ** (cpu_id % cpu_count) if xps_enabled else 0

            filename = os.path.join(queue, "xps_cpus")
            if not _write_ps_file_logged(
                "XPS", device, filename, xps_cpu_mask, dry_run, check
            ):
                failures = True

            # 4.18+ kernels have transmit queue selection based on receive queue
            xps_rxqs_path = os.path.join(queue, "xps_rxqs")
            if os.path.exists(xps_rxqs_path):
                if not _write_ps_file_logged(
                    "RXQS", device, xps_rxqs_path, xps_cpu_mask, dry_run, check
                ):
                    failures = True

    if failures:
        raise RuntimeError("Packet steering failed")
def write_ps_file(filename, cpu_mask, dry_run=False, check=False):
    # type: (Text, int, bool, bool) -> None
    """
    Converts `cpu_mask` to string that can be parsed by kernel and writes it to the given file if:
    * dry-run is disabled;
    * content of the file is different from value to-be-written.

    In dry-run mode the pending change is only logged; if `check` is set as
    well, `RuntimeError` is raised for a pending change.  Errors from reading
    or writing the file propagate to the caller.
    """
    constructed_cpu_mask = construct_cpu_mask(cpu_mask)
    with open(filename, "r") as f:
        current_cpu_mask = parse_affinity(f.read())

    # Compare normalised forms: the constructed mask contains commas once it
    # is wider than 32 bits, while the parsed current value never does.
    if current_cpu_mask == parse_affinity(constructed_cpu_mask):
        LOG.debug("{} is already set to {}".format(filename, current_cpu_mask))
        return

    if dry_run:
        LOG.debug(
            "{} will be changed {} -> {}".format(
                filename, current_cpu_mask, constructed_cpu_mask
            )
        )
        if check:
            raise RuntimeError("Packet Steering changes are pending")
        return

    LOG.debug("Setting {} to {}".format(filename, constructed_cpu_mask))
    with open(filename, "w") as f:
        f.write(constructed_cpu_mask)
def get_network_devices(filters=("eth*",)):
    # type: (Tuple[str, ...]) -> List[str]
    """
    Filter network devices based on globs

    Lists ``/sys/class/net`` and returns the names that match at least one
    of the ``filters`` glob patterns.  Every device is returned at most once,
    even when several patterns match it.
    """
    return [
        device
        for device in listdir("/sys/class/net")
        if any(fnmatch(device, pattern) for pattern in filters)
    ]
def get_device_queues(device, ps_type):
    # type: (Text, Text) -> List[Any]
    """
    Return the sysfs queue directories of ``device`` for the given direction.

    ``ps_type`` must be "tx" or "rx".  The result lists the paths matching
    ``/sys/class/net/<device>/queues/<ps_type>-*`` ordered by queue number,
    so positional use of the list is deterministic.  It is empty when
    nothing matches.
    """
    assert ps_type in ("tx", "rx"), "unknown ps_type: {}".format(ps_type)

    def queue_order(path):
        # Order numerically so that "rx-10" follows "rx-2"; entries without
        # a numeric suffix go last, ordered by name.
        suffix = path.rsplit("-", 1)[-1]
        if suffix.isdigit():
            return (0, int(suffix), path)
        return (1, 0, path)

    # glob makes no promise about the order of its results.
    queues = glob("/sys/class/net/{}/queues/{}-*".format(device, ps_type))
    return sorted(queues, key=queue_order)
def construct_cpu_mask(value):
    # type: (int) -> str
    """
    Render an integer bitmask in the form the Linux kernel accepts:
    lowercase hex digits grouped into comma-separated chunks of at most
    eight digits (32 bits), most significant chunk first.
    """
    hex_digits = format(value, "x")
    chunks = []
    end = len(hex_digits)
    # Cut eight-digit groups starting from the least significant end, so
    # only the leading group can be shorter.
    while end > 0:
        start = max(end - 8, 0)
        chunks.append(hex_digits[start:end])
        end = start
    chunks.reverse()
    return ",".join(chunks)
def construct_affinity(cpu_id, cpu_count):
    # type: (int, int) -> str
    """
    Build the string to write to /proc/irq/{}/smp_affinity for one CPU.

    The mask has a single bit set, at position ``cpu_id`` modulo
    ``cpu_count``, and is formatted by ``construct_cpu_mask``.
    """
    bit_position = cpu_id % cpu_count
    single_cpu_mask = 2 ** bit_position
    return construct_cpu_mask(single_cpu_mask)
def parse_affinity(affinity):
    # type: (str) -> str
    """
    Normalise an affinity mask string: remove the commas, drop leading
    zeros, then trim surrounding whitespace.  A mask with nothing left
    becomes "0".
    """
    without_commas = affinity.replace(",", "")
    normalised = without_commas.lstrip("0").strip()
    if normalised:
        return normalised
    return "0"
def parse_vector(pattern, line):
    # type: (str, Text) -> Optional[Text]
    """
    Return the text before the first ":" of ``line``, with surrounding
    spaces removed, when the regexp ``pattern`` is found anywhere in the
    line; return None otherwise.
    """
    if not re.search(pattern, line):
        return None
    irq_column = line.partition(":")[0]
    return irq_column.strip(" ")
def get_vectors(pattern):
    # type: (str) -> Iterator[Text]
    """
    Yield the vector of every /proc/interrupts line that matches ``pattern``,
    as extracted by ``parse_vector``.

    TODO(rbtz): in new kernels that can be obtained via:
    /sys/class/$CLASS/$IFACE/device/msi_irqs
    """
    with open("/proc/interrupts") as interrupts:
        for candidate in (parse_vector(pattern, row) for row in interrupts):
            # Lines that do not match (None) or give an empty vector are skipped.
            if candidate:
                yield candidate
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment