Skip to content

Instantly share code, notes, and snippets.

@blzzua
Created March 21, 2023 17:16
Show Gist options
  • Save blzzua/84fdd0950bc74de1cab8b1f995eca466 to your computer and use it in GitHub Desktop.
Save blzzua/84fdd0950bc74de1cab8b1f995eca466 to your computer and use it in GitHub Desktop.
# Adaptive CPU on/off: bring CPUs online under high load and take them offline when the system is mostly idle.
from time import sleep
import os
import json
def uncollapse(inp):
    """Expand a compact CPU list string into a list of integers.

    Example: '0-2,6-8,9,10,23' -> [0, 1, 2, 6, 7, 8, 9, 10, 23]

    Items are separated by commas. An item of the form 'a-b' expands to the
    inclusive range a..b; any other item is parsed as a single integer.
    Empty items (an empty input string, a trailing or doubled comma) are
    skipped instead of raising.

    Args:
        inp: the compact list string.

    Returns:
        A list of ints in the order the items appear in ``inp``.

    Raises:
        ValueError: if a non-empty item is neither an integer nor an
            integer range.
    """
    res = []
    for item in inp.split(','):
        item = item.strip()
        if not item:
            # ''.split(',') yields [''], and int('') would raise ValueError.
            continue
        if '-' in item:
            start, end = item.split('-')
            # The upper bound is inclusive in the compact notation.
            res.extend(range(int(start), int(end) + 1))
        else:
            res.append(int(item))
    return res
def get_online_cpu():
    """Return the 'On-line CPU(s) list:' value reported by ``lscpu -J``.

    Runs ``lscpu -J``, parses its JSON output and returns the ``data`` value
    of the entry whose ``field`` is 'On-line CPU(s) list:'. Callers in this
    file pass the result to ``uncollapse``.

    Raises:
        StopIteration: if the lscpu output has no such field.
        json.JSONDecodeError: if the command output is not valid JSON.
    """
    # Use the pipe as a context manager so it is closed after reading;
    # this function is called repeatedly from a long-running loop.
    with os.popen('lscpu -J') as lscpu_stream:
        lscpu_json = json.loads(lscpu_stream.read())
    return next(f['data'] for f in lscpu_json['lscpu'] if f['field'] == 'On-line CPU(s) list:')
def increase_cpu():
    """Bring one currently offline CPU online.

    Walks CPU indices from 0 up to the module-level ``cpu_cnt`` and, for the
    first index that is not in the current on-line list, runs
    ``sudo /usr/sbin/chcpu -e <cpu>`` and prints the command's output.

    Returns:
        The index of the CPU that was enabled, or None if every index in
        range was already on-line.
    """
    # A set gives constant-time membership tests inside the loop.
    cpus_on = set(uncollapse(get_online_cpu()))
    for cpu in range(cpu_cnt):
        if cpu not in cpus_on:
            # Close the pipe once the command output has been read.
            with os.popen(f'sudo /usr/sbin/chcpu -e {cpu}') as stream:
                print(stream.read())
            # Only one CPU is enabled per call.
            return cpu
    return None
def decrease_cpu():
    """Take one currently on-line CPU offline.

    Walks CPU indices downward from ``cpu_cnt - 1`` to 1 (index 0 is never
    considered) and, for the first index that is on-line and not listed in
    the module-level ``alwayson_cpu``, runs ``sudo /usr/sbin/chcpu -d <cpu>``
    and prints the command's output.

    Returns:
        The index of the CPU that was disabled, or None if no eligible CPU
        was found.
    """
    # A set gives constant-time membership tests inside the loop.
    cpus_on = set(uncollapse(get_online_cpu()))
    for cpu in range(cpu_cnt - 1, 0, -1):
        if cpu in cpus_on and cpu not in alwayson_cpu:
            # Close the pipe once the command output has been read.
            with os.popen(f'sudo /usr/sbin/chcpu -d {cpu}') as stream:
                print(stream.read())
            # Only one CPU is disabled per call.
            return cpu
    return None
def get_stat():
    """Read the counters on the first line of /proc/stat.

    Returns:
        A tuple ``(idle, busy)`` of ints, where ``idle`` is the fifth
        whitespace-separated token of the line and ``busy`` is the sum of
        the user, nice, system, iowait, irq and softirq tokens.
    """
    with open('/proc/stat') as proc_stat:
        tokens = proc_stat.readline().split()
    # Token 0 is the row label; the next seven are the counters used here.
    # Any further tokens on the line are ignored.
    _label, user, nice, system, idle, iowait, irq, softirq = tokens[:8]
    busy_total = 0
    for counter in (user, nice, system, iowait, irq, softirq):
        busy_total += int(counter)
    return int(idle), busy_total
def main_loop():
    """Poll CPU usage forever and adjust the number of on-line CPUs.

    Every 15 seconds the busy fraction since the previous sample is computed
    from ``get_stat()``. Above 0.9, and while fewer than ``cpu_cnt`` CPUs are
    on-line, one CPU is enabled via ``increase_cpu()``. Below 0.1, and while
    more CPUs are on-line than there are entries in ``alwayson_cpu``, one CPU
    is disabled via ``decrease_cpu()``. This function never returns.
    """
    cpu_idle, cpu_busy = get_stat()
    # online_cpu_cnt is assigned inside the loop, which makes it a local
    # name; it must be initialised here, otherwise the first comparison
    # below raises UnboundLocalError.
    online_cpu_cnt = len(uncollapse(get_online_cpu()))
    while True:
        prevcpu_idle, prevcpu_busy = cpu_idle, cpu_busy
        sleep(15)  # sleep time
        cpu_idle, cpu_busy = get_stat()
        d_idle = cpu_idle - prevcpu_idle
        d_busy = cpu_busy - prevcpu_busy
        d_total = d_busy + d_idle
        if d_total <= 0:
            # No ticks elapsed (or counters went backwards): the ratio is
            # undefined, so skip this sample instead of dividing by zero.
            continue
        pct_busy = d_busy / d_total
        if not 0 < pct_busy <= 1:
            # Discard samples outside the (0, 1] range.
            continue
        print(f"DEBUG: current cpu usage: {pct_busy}")
        if pct_busy > 0.9 and online_cpu_cnt < cpu_cnt:
            increase_cpu()
            # Re-read the real state rather than assuming the command worked.
            online_cpu_cnt = len(uncollapse(get_online_cpu()))
        elif pct_busy < 0.1 and online_cpu_cnt > len(alwayson_cpu):
            decrease_cpu()
            online_cpu_cnt = len(uncollapse(get_online_cpu()))
if __name__ == '__main__':
    # Ask lscpu for the total CPU count; the pipe is closed as soon as its
    # output has been read.
    with os.popen('lscpu -J') as lscpu_stream:
        lscpu_json = json.loads(lscpu_stream.read())
    # Module-level settings read by increase_cpu, decrease_cpu and main_loop.
    cpu_cnt = int(next(f['data'] for f in lscpu_json['lscpu'] if f['field'] == 'CPU(s):'))
    alwayson_cpu = [0,1,2,23] # constant: CPU indices that decrease_cpu never disables
    cpu_online = uncollapse(get_online_cpu())
    online_cpu_cnt = len(cpu_online)
    main_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment