"""Demonstrate nice threads in Python."""
import logging | |
import multiprocessing | |
import os | |
import random | |
import threading | |
import time | |
from typing import Optional | |
# Log line layout: timestamp, level, logger name, call site
# (file:function:line), process/thread ids, then the message.
LOG_FMT = ' '.join((
    '%(asctime)s',
    '%(levelname)s',
    '[%(name)s]',
    '<%(filename)s:%(funcName)s:%(lineno)d>',
    '{%(process)s-%(thread)s}',
    '%(message)s',
))
logging.basicConfig(format=LOG_FMT, level=logging.INFO)
def pin_process_to_cpus(
        pid: Optional[int] = None, n_cpus: int = 1) -> None:
    """Pin a process to a random subset of CPUs using ``taskset``.

    Args:
        pid: Process to pin; defaults to the current process.
        n_cpus: Number of CPUs to pin to. Capped at the value reported by
            ``multiprocessing.cpu_count()``.

    Raises:
        ValueError: If ``n_cpus`` is less than 1.
    """
    # Local import: subprocess is not imported at the top of this file.
    import subprocess

    # Validate up front so an empty CPU list is never handed to taskset.
    if n_cpus < 1:
        raise ValueError(f'Invalid n_cpus {n_cpus}')
    if pid is None:
        pid = multiprocessing.current_process().pid
    n_available_cpus = multiprocessing.cpu_count()
    cpus = sorted(random.sample(
        range(n_available_cpus), min(n_cpus, n_available_cpus)))
    cpus_str = ','.join(str(cpu) for cpu in cpus)
    logging.warning('Pinning %s to cpus %s', pid, cpus_str)
    # Argument list instead of a shell string: nothing is interpreted by a
    # shell, and the exit status can be inspected.
    command = ['taskset', '--pid', '--cpu-list', cpus_str, str(pid)]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        # Pinning stays best-effort (e.g. taskset not installed).
        logging.error('Could not run taskset: %s', err)
        return
    if completed.returncode != 0:
        logging.error(
            'taskset exited with status %s', completed.returncode)
def fib(n):
    """Return the n-th Fibonacci-style term, with fib(0) == fib(1) == 1.

    Any n below 2 (including negatives) yields 1. Computed by plain double
    recursion with no caching.
    """
    return 1 if n < 2 else fib(n - 1) + fib(n - 2)
def loop(name: str, nice: Optional[int]) -> None:
    """Run the fib() workload N_ITER times and log how long it took.

    Args:
        name: Label used in the start/done log messages.
        nice: Increment passed to ``os.nice`` before the work starts;
            skipped when falsy (``None`` or ``0``).
    """
    logging.info('Starting %s [%s]', name, nice)
    if nice:
        os.nice(nice)
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way time.time() differences can.
    t0 = time.perf_counter()
    for _ in range(N_ITER):
        fib(FIB_N)
    logging.info('Done %s [%s] in %.2f', name, nice,
                 time.perf_counter() - t0)
# Workload knobs for the demo.
N_THREADS = 4  # worker threads in parallel() / runs in seq()
N_ITER = 100   # fib() calls per loop() run
FIB_N = 26     # argument passed to each fib() call
N_CPUS = 1     # number of CPUs the process is pinned to

# Pin the current process (pid defaults to it) before any work starts.
pin_process_to_cpus(n_cpus=N_CPUS)
def parallel():
    """Run N_THREADS copies of loop() concurrently with rising nice values.

    Worker i is labelled 't<i>' and gets a nice increment of 5 * i. All
    threads are created before any is started; they are not joined here.
    """
    workers = [
        threading.Thread(target=loop, args=('t{}'.format(idx), idx * 5))
        for idx in range(N_THREADS)
    ]
    for worker in workers:
        worker.start()
def seq():
    """Run loop() N_THREADS times back to back, with no nice increment."""
    labels = (f'seq-{idx}' for idx in range(N_THREADS))
    for label in labels:
        loop(label, None)
# Optional: uncomment to also run the sequential variant (no nice increment)
# before the threaded one.
# seq()
# Run the workers concurrently, each with a larger nice increment.
parallel()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment