Skip to content

Instantly share code, notes, and snippets.

@speedplane
Created October 25, 2017 02:02
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save speedplane/224eb551c51a74068011f4d776237513 to your computer and use it in GitHub Desktop.
Save speedplane/224eb551c51a74068011f4d776237513 to your computer and use it in GitHub Desktop.
Celery Autoscaler Based on Memory and System Load
import logging
import multiprocessing
import os
import re

from celery import Celery
from celery.worker.autoscale import Autoscaler as CeleryAutoscaler
class DAAutoscaler(CeleryAutoscaler):
    '''Celery autoscaler driven by system load and free memory.

    Instead of sizing the pool from the number of reserved tasks, every
    scaling tick looks at the per-CPU 1-minute load average and at the
    fraction of memory that is still available, then grows or shrinks the
    pool.  The pool is always kept within the ``min_concurrency`` /
    ``max_concurrency`` limits inherited from the Celery autoscaler.
    '''

    # Try to keep the load above this point.
    LOAD_MIN = .8
    # Try to keep the load below this.
    LOAD_MAX = 1.1
    # We need this fraction of free memory to scale up.
    MEM_FREE_SCALE_UP = .3
    # Any less than this fraction of free memory and we scale down.
    MEM_FREE_SCALE_DOWN = .2

    # Patterns for the /proc/meminfo fallback used when psutil is missing.
    re_total = re.compile(r"MemTotal:\s+(?P<total>\d+)\s+kB")
    re_free = re.compile(r"MemFree:\s+(?P<free>\d+)\s+kB")
    # MemAvailable matches what psutil reports as "available"; it is absent
    # on old kernels, in which case MemFree is used instead.
    re_available = re.compile(r"MemAvailable:\s+(?P<free>\d+)\s+kB")

    def __init__(self, *args, **kwargs):
        '''Record the CPU count, then defer to the Celery autoscaler.'''
        self.num_cpus = multiprocessing.cpu_count()
        logging.info("DAAutoscaler: Num CPUs %s", self.num_cpus)
        super(DAAutoscaler, self).__init__(*args, **kwargs)

    def _maybe_scale(self, req=None):
        '''Scale up or down if we have too much/little load or memory.

        ``req`` is accepted for compatibility with the base class and is
        not used.

        Returns True when a scale request was issued, and a falsy value
        when the pool was left alone.
        '''
        cur_load = self._get_load()
        mem_free = self._get_free_mem()
        procs = self.processes

        if cur_load < self.LOAD_MIN and mem_free > self.MEM_FREE_SCALE_UP:
            # Never grow past the configured ceiling; without this an idle
            # worker (low load, plenty of memory) would grow forever.
            if procs >= self.max_concurrency:
                logging.info(
                    "DAAutoscaler: At max concurrency %d %.2f free=%.2f%%",
                    procs, cur_load, 100 * mem_free)
                return False
            # The multiplier is informational only: we deliberately grow
            # one process at a time and re-measure on the next tick.
            mul = int(self.LOAD_MAX / cur_load)
            logging.info("DAAutoscaler: Scale Up %dX %.2f free=%.2f%%",
                         mul, cur_load, 100 * mem_free)
            self.scale_up(1)
            return True

        if cur_load > self.LOAD_MAX or mem_free < self.MEM_FREE_SCALE_DOWN:
            shrinkable = procs - self.min_concurrency
            if shrinkable <= 0:
                logging.info(
                    "DAAutoscaler: At min concurrency %d %.2f free=%.2f%%",
                    procs, cur_load, 100 * mem_free)
                return False
            # When low memory (not high load) triggers the scale down the
            # load ratio truncates to 0, so always drop at least one
            # process, but never go below min_concurrency.
            mul = min(max(1, int(cur_load / self.LOAD_MAX)), shrinkable)
            logging.info("DAAutoscaler: Scale Down %dX %.2f free=%.2f%%",
                         mul, cur_load, 100 * mem_free)
            self.scale_down(mul)
            return True

        logging.info("DAAutoscaler: Ok %.2f free=%.2f%%",
                     cur_load, 100 * mem_free)
        return False

    def _get_load(self):
        '''Return the 1-minute load average divided by the CPU count.

        The load is clamped to at least 0.001 so that callers can divide
        by the result.
        '''
        load1min = os.getloadavg()[0]
        # Prevent divide by zero
        load1min = max(load1min, 0.001)
        return 1.0 * load1min / self.num_cpus

    def _get_free_mem(self):
        '''Return the fraction of free memory, 0.0 to 1.0.

        Uses psutil when it is installed, otherwise parses /proc/meminfo.

        Raises RuntimeError if /proc/meminfo cannot be parsed.
        '''
        try:
            # Try using the cross platform method.
            import psutil
        except ImportError:
            # If not, make it work for most linux distros.  Text mode so
            # the str regexes also match on Python 3.
            with open('/proc/meminfo', 'r') as f:
                mem = f.read()
            total = self.re_total.search(mem)
            free = self.re_available.search(mem) or self.re_free.search(mem)
            if total is None or free is None or not int(total.group("total")):
                raise RuntimeError(
                    "DAAutoscaler: could not parse /proc/meminfo")
            return (1.0 * int(free.group("free")) /
                    int(total.group("total")))
        else:
            # ``percent`` is the *used* percentage, so invert it.
            return 1.0 - psutil.virtual_memory().percent / 100.0
@korycins
Copy link

korycins commented Nov 7, 2017

@speedplane great work!
I think there is a mistake on line 60:
`psutil.virtual_memory().percent` doesn't return free memory; it returns the percentage of memory in use.

from description of virtual_memory:

       the percentage usage calculated as (total - available) / total * 100

@avnersorek
Copy link

Hey great work and thanks very much for publishing this !

Am I missing something or -
If the queue is empty, won't this just scale up indefinitely (or until idle workers exhaust resources) ?
No reference to min/max worker limits (self.max/min_concurrency) or number of reserved tasks (self.qty) ?

Thanks again 🙏 😃

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment