Skip to content

Instantly share code, notes, and snippets.

@nickhutchinson
Created April 12, 2017 20:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickhutchinson/cffbb2927211af3efe87a6cb77c4ca81 to your computer and use it in GitHub Desktop.
Save nickhutchinson/cffbb2927211af3efe87a6cb77c4ca81 to your computer and use it in GitHub Desktop.
# WIP Port of https://github.com/dotnet/coreclr/blob/ef1e2ab328087c61a6878c1e84f4fc5d710aebce/src/vm/hillclimbing.cpp
import math
import random
from collections import deque
from enum34 import Enum, auto
import psutil
# --- Probe wave -------------------------------------------------------------
# Period, in samples, of the square wave superimposed on the thread count.
WAVE_PERIOD = 4
# Upper bound on the magnitude (in threads) of that wave.
MAX_THREAD_WAVE_MAGNITUDE = 20
# Extra scale factor applied when sizing the wave from the measured noise.
THREAD_MAGNITUDE_MULTIPLIER = 1.0
# Maximum number of recent samples analysed; a whole number of wave periods.
SAMPLES_TO_MEASURE = 8 * WAVE_PERIOD

# --- Targets ----------------------------------------------------------------
# Throughput/thread wave ratio that is subtracted as the "target" before the
# move is computed.
TARGET_THROUGHPUT_RATIO = 0.15
# Desired ratio of thread-wave signal to throughput noise.
TARGET_SIGNAL_TO_NOISE_RATIO = 3.0

# --- Move limits ------------------------------------------------------------
# Gain applied per second of sample duration.
MAX_CHANGE_PER_SECOND = 4.0
# Cap on a single upward move of the control setting.
MAX_CHANGE_PER_SAMPLE = 20.0

# --- Sampling ---------------------------------------------------------------
# Inclusive bounds of the randomized sample interval; the low bound is also
# returned when a sample needs more data.
# NOTE(review): presumably milliseconds, as in the CoreCLR original - confirm.
SAMPLE_INTERVAL_LOW = 10.0
SAMPLE_INTERVAL_HIGH = 200.0
# Weight of the newest error estimate in the moving average of throughput
# noise.
THROUGHPUT_ERROR_SMOOTHING_FACTOR = 0.01
# 1.0: linear gain; higher values enhance large moves and damp small ones
GAIN_EXPONENT = 2.0
# Largest tolerated (threadCount - 1) / numCompletions before a sample is
# considered too inaccurate to use.
MAX_SAMPLE_ERROR = 0.15

# --- Environment limits -----------------------------------------------------
# CPU utilization above which upward moves are refused, written as a fraction.
# NOTE(review): psutil.cpu_percent() reports 0-100 - confirm the comparison
# site uses the same scale.
CPU_UTILIZATION_HIGH = 0.95
# Hard bounds on the thread count handed back to the pool.
MIN_LIMIT_TOTAL_WORKER_THREADS = 2
MAX_LIMIT_TOTAL_WORKER_THREADS = 1000
class HillClimbingStateTransition(Enum):
    """Labels describing why the thread count was changed.

    A member is handed to ``ThreadState.ForceChange`` and
    ``ThreadState.ChangeThreadCount`` alongside the new thread count.
    Values are numbered consecutively from 1 in declaration order.
    """

    Warmup = 1
    Initializing = 2
    RandomMove = 3
    ClimbingMove = 4
    ChangePoint = 5
    Stabilizing = 6
    Starvation = 7  # used by ThreadpoolMgr
    ThreadTimedOut = 8  # used by ThreadpoolMgr
    Undefined = 9
class ThreadState(object):
    """Hill-climbing controller that chooses a worker-thread count.

    Port of the CoreCLR ``hillclimbing.cpp`` algorithm. Each call to
    ``Update`` records one throughput sample, superimposes a small square
    wave on the thread count, and uses the Goertzel algorithm to measure how
    strongly throughput follows that wave. The control setting is then moved
    in the direction that appears to improve throughput.

    Attributes are all plain counters/accumulators except ``samples`` and
    ``threadCounts``, which are bounded histories of the most recent
    ``SAMPLES_TO_MEASURE`` throughput samples and thread counts.
    """

    def __init__(self):
        self.currentControlSetting = 0
        self.totalSamples = 0
        self.lastThreadCount = 0
        self.averageThroughputNoise = 0
        # elapsed seconds since last thread count change
        self.elapsedSinceLastChange = 0
        # number of completions since last thread count change
        self.completionsSinceLastChange = 0
        self.accumulatedCompletionCount = 0
        self.accumulatedSampleDuration = 0
        # Bounded histories: deque's first positional argument is an
        # iterable, so the size limit has to be passed as ``maxlen``.
        self.samples = deque(maxlen=SAMPLES_TO_MEASURE)
        self.threadCounts = deque(maxlen=SAMPLES_TO_MEASURE)
        self.currentSampleInterval = self._NextSampleInterval()

    @staticmethod
    def _NextSampleInterval():
        """Return a random sample interval within the configured bounds."""
        # randint() needs integer bounds; float bounds raise TypeError on
        # Python 3.12+.
        return random.randint(int(SAMPLE_INTERVAL_LOW),
                              int(SAMPLE_INTERVAL_HIGH))

    @classmethod
    def GetWaveComponent(cls, samples, period):
        """Return the complex component of ``samples`` at the given period.

        Args:
            samples: sized iterable of numeric samples.
            period: wave period in samples; may be fractional.

        Returns:
            A ``complex`` value normalised by ``len(samples)``.
        """
        # can't measure a wave that doesn't fit
        assert len(samples) >= period
        # can't measure above the Nyquist frequency
        assert period >= 2
        # Calculate the sinusoid with the given period.
        # We're using the Goertzel algorithm for this. See
        # http://en.wikipedia.org/wiki/Goertzel_algorithm.
        w = 2.0 * math.pi / period
        cosine = math.cos(w)
        sine = math.sin(w)
        coeff = 2.0 * cosine
        q1, q2 = 0.0, 0.0
        for sample in samples:
            q1, q2 = sample + coeff * q1 - q2, q1
        return complex(q1 - q2 * cosine, q2 * sine) / len(samples)

    def Update(self, currentThreadCount, sampleDuration, numCompletions):
        """Record one sample and compute the next thread count.

        Args:
            currentThreadCount: number of threads currently in use.
            sampleDuration: length of the sample in seconds.
            numCompletions: work items completed during the sample.

        Returns:
            A ``(newThreadCount, sampleInterval)`` tuple: the thread count to
            use next and the interval to wait before the next sample.
        """
        # If someone changed the thread count without telling us, update our
        # records accordingly.
        if currentThreadCount != self.lastThreadCount:
            self.ForceChange(currentThreadCount,
                             HillClimbingStateTransition.Initializing)

        # Update the cumulative stats for this thread count
        self.elapsedSinceLastChange += sampleDuration
        self.completionsSinceLastChange += numCompletions

        # Add in any data we've already collected about this sample
        sampleDuration += self.accumulatedSampleDuration
        numCompletions += self.accumulatedCompletionCount

        # We need to make sure we're collecting reasonably accurate data.
        # Since we're just counting the end of each work item, we are going to
        # be missing some data about what really happened during the sample
        # interval. The count produced by each thread includes an initial work
        # item that may have started well before the start of the interval, and
        # each thread may have been running some new work item for some time
        # before the end of the interval, which did not yet get counted. So
        # our count is going to be off by +/- threadCount workitems.
        #
        # The exception is that the thread that reported to us last time
        # definitely wasn't running any work at that time, and the thread
        # that's reporting now definitely isn't running a work item now. So we
        # really only need to consider threadCount-1 threads.
        #
        # Thus the percent error in our count is +/-
        # (threadCount-1)/numCompletions. We cannot rely on the
        # frequency-domain analysis we'll be doing later to filter out this
        # error, because of the way it accumulates over time. If this sample
        # is off by, say, 33% in the negative direction, then the next one
        # likely will be too. The one after that will include the sum of the
        # completions we missed in the previous samples, and so will be 33%
        # positive. So every three samples we'll have two "low" samples and one
        # "high" sample. This will appear as periodic variation right in the
        # frequency range we're targeting, which will not be filtered by the
        # frequency-domain translation.
        if numCompletions:
            sampleError = (currentThreadCount - 1.0) / numCompletions
        elif currentThreadCount > 1:
            # No completions at all: the error is unbounded (this is what
            # floating-point division by zero yields in the C++ original).
            sampleError = float('inf')
        else:
            sampleError = 0.0
        if self.totalSamples > 0 and sampleError >= MAX_SAMPLE_ERROR:
            # not accurate enough yet. Let's accumulate the data so far, and
            # tell the ThreadPool to collect a little more.
            self.accumulatedSampleDuration = sampleDuration
            self.accumulatedCompletionCount = numCompletions
            return currentThreadCount, SAMPLE_INTERVAL_LOW

        # We've got enough data for our sample; reset our accumulators for next
        # time.
        self.accumulatedSampleDuration = 0
        self.accumulatedCompletionCount = 0

        # Add the current thread count and throughput sample to our history
        throughput = float(numCompletions) / sampleDuration
        self.samples.append(throughput)
        self.threadCounts.append(currentThreadCount)
        self.totalSamples += 1

        # Set up defaults for our metrics
        threadWaveComponent = complex(0.0)
        throughputWaveComponent = complex(0.0)
        throughputErrorEstimate = 0.0
        ratio = complex(0.0)
        confidence = 0.0
        transition = HillClimbingStateTransition.Warmup

        # How many samples will we use? It must be at least the three wave
        # periods we're looking for, and it must also be a whole multiple of
        # the primary wave's period; otherwise the frequency we're looking for
        # will fall between two frequency bands in the Fourier analysis, and
        # we won't be able to measure it accurately.
        sampleCount = min(self.totalSamples - 1,
                          SAMPLES_TO_MEASURE) // WAVE_PERIOD * WAVE_PERIOD

        if sampleCount > WAVE_PERIOD:
            # deque does not support slicing, so snapshot the histories into
            # lists before taking the most recent sampleCount entries.
            recentSamples = list(self.samples)[-sampleCount:]
            recentThreadCounts = list(self.threadCounts)[-sampleCount:]

            # Average the throughput and thread count samples, so we can scale
            # the wave magnitudes later. float() keeps the thread-count
            # average from being truncated by integer division.
            averageThroughput = sum(recentSamples) / float(sampleCount)
            averageThreadCount = sum(recentThreadCounts) / float(sampleCount)

            if averageThroughput > 0 and averageThreadCount > 0:
                # Calculate the periods of the adjacent frequency bands we'll
                # be using to measure noise levels. We want the two adjacent
                # Fourier frequency bands.
                bands = float(sampleCount) / WAVE_PERIOD
                adjacentPeriod1 = sampleCount / (bands + 1)
                adjacentPeriod2 = sampleCount / (bands - 1)

                # Get the three different frequency components of the
                # throughput (scaled by average throughput). Our "error"
                # estimate (the amount of noise that might be present in the
                # frequency band we're really interested in) is the average of
                # the adjacent bands.
                throughputWaveComponent = (
                    self.GetWaveComponent(recentSamples, WAVE_PERIOD) /
                    averageThroughput)
                throughputErrorEstimate = abs(
                    self.GetWaveComponent(recentSamples, adjacentPeriod1) /
                    averageThroughput)
                if adjacentPeriod2 <= sampleCount:
                    throughputErrorEstimate = max(
                        throughputErrorEstimate,
                        abs(self.GetWaveComponent(recentSamples,
                                                  adjacentPeriod2) /
                            averageThroughput))

                # Do the same for the thread counts, so we have something to
                # compare to. We don't measure thread count noise, because
                # there is none; these are exact measurements.
                threadWaveComponent = (
                    self.GetWaveComponent(recentThreadCounts, WAVE_PERIOD) /
                    averageThreadCount)

                # Update our moving average of the throughput noise. We'll use
                # this later as feedback to determine the new size of the
                # thread wave.
                if self.averageThroughputNoise == 0:
                    self.averageThroughputNoise = throughputErrorEstimate
                else:
                    self.averageThroughputNoise = (
                        THROUGHPUT_ERROR_SMOOTHING_FACTOR *
                        throughputErrorEstimate) + (
                            (1.0 - THROUGHPUT_ERROR_SMOOTHING_FACTOR) *
                            self.averageThroughputNoise)

                if abs(threadWaveComponent) > 0:
                    # Adjust the throughput wave so it's centered around the
                    # target wave, and then calculate the adjusted
                    # throughput/thread ratio.
                    ratio = (throughputWaveComponent -
                             (TARGET_THROUGHPUT_RATIO * threadWaveComponent)
                             ) / threadWaveComponent
                    transition = HillClimbingStateTransition.ClimbingMove
                else:
                    ratio = complex(0.0)
                    transition = HillClimbingStateTransition.Stabilizing

                # Calculate how confident we are in the ratio. More noise ==
                # less confident. This has the effect of slowing down
                # movements that might be affected by random noise.
                noiseForConfidence = max(self.averageThroughputNoise,
                                         throughputErrorEstimate)
                if noiseForConfidence > 0:
                    confidence = (
                        (abs(threadWaveComponent) / noiseForConfidence) /
                        TARGET_SIGNAL_TO_NOISE_RATIO)
                else:
                    # there is no noise!
                    confidence = 1.0

        # We use just the real part of the complex ratio we just calculated.
        # If the throughput signal is exactly in phase with the thread signal,
        # this will be the same as taking the magnitude of the complex move and
        # moving that far up. If they're 180 degrees out of phase, we'll move
        # backward (because this indicates that our changes are having the
        # opposite of the intended effect). If they're 90 degrees out of
        # phase, we won't move at all, because we can't tell whether we're
        # having a negative or positive effect on throughput.
        move = min(1.0, max(-1.0, ratio.real))

        # Apply our confidence multiplier.
        move *= min(1.0, max(0.0, confidence))

        # Now apply non-linear gain, such that values around zero are
        # attenuated, while higher values are enhanced. This allows us to move
        # quickly if we're far away from the target, but more slowly if we're
        # getting close, giving us rapid ramp-up without wild oscillations
        # around the target.
        gain = MAX_CHANGE_PER_SECOND * sampleDuration
        move = pow(abs(move), GAIN_EXPONENT) * (gain if move >= 0.0 else -gain)
        move = min(move, MAX_CHANGE_PER_SAMPLE)

        # If the result was positive, and CPU is > 95%, refuse the move.
        # psutil.cpu_percent() reports a percentage in the range 0-100 while
        # CPU_UTILIZATION_HIGH is a fraction, so scale the threshold.
        if (move > 0.0 and
                psutil.cpu_percent() > CPU_UTILIZATION_HIGH * 100.0):
            move = 0.0

        # Apply the move to our control setting
        self.currentControlSetting += move

        # Calculate the new thread wave magnitude, which is based on the moving
        # average we've been keeping of the throughput error. This average
        # starts at zero, so we'll start with a nice safe little wave at first.
        newThreadWaveMagnitude = int(0.5 + (
            self.currentControlSetting * self.averageThroughputNoise *
            TARGET_SIGNAL_TO_NOISE_RATIO * THREAD_MAGNITUDE_MULTIPLIER * 2.0))
        newThreadWaveMagnitude = min(newThreadWaveMagnitude,
                                     MAX_THREAD_WAVE_MAGNITUDE)
        newThreadWaveMagnitude = max(newThreadWaveMagnitude, 1)

        # Make sure our control setting is within the ThreadPool's limits
        self.currentControlSetting = min(
            MAX_LIMIT_TOTAL_WORKER_THREADS - newThreadWaveMagnitude,
            self.currentControlSetting)
        self.currentControlSetting = max(MIN_LIMIT_TOTAL_WORKER_THREADS,
                                         self.currentControlSetting)

        # Calculate the new thread count (control setting + square wave).
        # Integer division is required so the phase is exactly 0 or 1; true
        # division would produce fractional phases on Python 3.
        wavePhase = (self.totalSamples // (WAVE_PERIOD // 2)) % 2
        newThreadCount = int(self.currentControlSetting +
                             newThreadWaveMagnitude * wavePhase)

        # Make sure the new thread count doesn't exceed the ThreadPool's limits
        newThreadCount = min(MAX_LIMIT_TOTAL_WORKER_THREADS, newThreadCount)
        newThreadCount = max(MIN_LIMIT_TOTAL_WORKER_THREADS, newThreadCount)

        # If all of this caused an actual change in thread count, log that as
        # well.
        if newThreadCount != currentThreadCount:
            self.ChangeThreadCount(newThreadCount, transition)

        # Return the new thread count and sample interval. This is randomized
        # to prevent correlations with other periodic changes in throughput.
        # Among other things, this prevents us from getting confused by Hill
        # Climbing instances running in other processes.
        # If we're at minThreads, and we seem to be hurting performance by
        # going higher, we can't go any lower to fix this. So we'll simply
        # stay at minThreads much longer, and only occasionally try a higher
        # value.
        if (ratio.real < 0.0 and
                newThreadCount == MIN_LIMIT_TOTAL_WORKER_THREADS):
            return newThreadCount, int(0.5 + self.currentSampleInterval *
                                       (10.0 * max(-ratio.real, 1.0)))
        return newThreadCount, self.currentSampleInterval

    def ForceChange(self, newThreadCount, transition):
        """Adopt a thread count that was changed outside of ``Update``.

        The control setting is shifted by the same delta as the thread count;
        nothing happens when the count is unchanged.
        """
        if newThreadCount != self.lastThreadCount:
            self.currentControlSetting += newThreadCount - self.lastThreadCount
            self.ChangeThreadCount(newThreadCount, transition)

    def ChangeThreadCount(self, newThreadCount, transition):
        """Record a new thread count and restart the per-count statistics.

        ``transition`` is accepted for interface compatibility; this port
        does not use it.
        """
        self.lastThreadCount = newThreadCount
        self.currentSampleInterval = self._NextSampleInterval()
        self.elapsedSinceLastChange = 0
        self.completionsSinceLastChange = 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment