# WIP port of
# https://github.com/dotnet/coreclr/blob/ef1e2ab328087c61a6878c1e84f4fc5d710aebce/src/vm/hillclimbing.cpp
#
# Source: GitHub gist nickhutchinson/cffbb2927211af3efe87a6cb77c4ca81,
# created April 12, 2017 20:31.
import math
import random
from collections import deque
# The enum34 backport installs itself as the ``enum`` module (there is no
# importable ``enum34`` module) and does not provide ``auto``; the standard
# library ``enum`` (Python 3.6+) provides both names.
from enum import Enum, auto

import psutil
# Tuning constants for the hill-climbing thread-count controller.

# Period, in samples, of the square wave superimposed on the thread count and
# of the frequency component extracted from the throughput history.
WAVE_PERIOD = 4
# Upper bound on the amplitude (in threads) of the thread-count wave.
MAX_THREAD_WAVE_MAGNITUDE = 20
# Extra scale factor applied when sizing the thread-count wave.
THREAD_MAGNITUDE_MULTIPLIER = 1.0
# Maximum number of history samples analysed (a whole number of wave periods).
SAMPLES_TO_MEASURE = WAVE_PERIOD * 8
# Throughput/thread ratio subtracted from the measured ratio before moving.
TARGET_THROUGHPUT_RATIO = 0.15
# Signal-to-noise ratio at which a measurement is trusted completely.
TARGET_SIGNAL_TO_NOISE_RATIO = 3.0
# Gain applied to a move, per second of sample duration.
MAX_CHANGE_PER_SECOND = 4.0
# Upper bound on a single upward move of the control setting.
MAX_CHANGE_PER_SAMPLE = 20.0
# Inclusive bounds of the randomized sample interval.
SAMPLE_INTERVAL_LOW = 10.0
SAMPLE_INTERVAL_HIGH = 200.0
# Weight of the newest estimate in the moving average of throughput noise.
THROUGHPUT_ERROR_SMOOTHING_FACTOR = 0.01
# 1.0: linear gain; higher values enhance large moves and damp small ones
GAIN_EXPONENT = 2.0
# Largest tolerated relative counting error, (threads - 1) / completions.
MAX_SAMPLE_ERROR = 0.15
# CPU utilization above which upward moves are refused, as a fraction in
# [0, 1].  NOTE: psutil.cpu_percent() reports a percentage in [0, 100].
CPU_UTILIZATION_HIGH = .95
# Hard limits on the thread count the controller may request.
MIN_LIMIT_TOTAL_WORKER_THREADS = 2
MAX_LIMIT_TOTAL_WORKER_THREADS = 1000
class HillClimbingStateTransition(Enum):
    """Reason attached to a thread-count change made by the hill climber.

    Members are numbered with ``auto()`` in declaration order.
    """

    Warmup = auto()
    Initializing = auto()
    RandomMove = auto()
    ClimbingMove = auto()
    ChangePoint = auto()
    Stabilizing = auto()
    Starvation = auto()  # used by ThreadpoolMgr
    ThreadTimedOut = auto()  # used by ThreadpoolMgr
    Undefined = auto()
class ThreadState(object):
    """Hill-climbing controller that proposes a worker-thread count.

    Each call to :meth:`Update` records one throughput sample, looks for the
    thread-count square wave in the recent throughput history, and returns
    the thread count to use next together with the next sample interval.
    """

    def __init__(self):
        """Start with an empty history and a randomized sample interval."""
        self.currentControlSetting = 0
        self.totalSamples = 0
        self.lastThreadCount = 0
        self.averageThroughputNoise = 0
        # Elapsed seconds since the last thread count change.
        self.elapsedSinceLastChange = 0
        # Number of completions since the last thread count change.
        self.completionsSinceLastChange = 0
        self.accumulatedCompletionCount = 0
        self.accumulatedSampleDuration = 0
        # Bounded histories: only the newest SAMPLES_TO_MEASURE entries are
        # ever analysed, so older ones are dropped automatically.
        self.samples = deque(maxlen=SAMPLES_TO_MEASURE)
        self.threadCounts = deque(maxlen=SAMPLES_TO_MEASURE)
        self.currentSampleInterval = self._RandomSampleInterval()

    @staticmethod
    def _RandomSampleInterval():
        """Return a random integer in [SAMPLE_INTERVAL_LOW, SAMPLE_INTERVAL_HIGH]."""
        # random.randint requires integer bounds; the constants are floats.
        return random.randint(int(SAMPLE_INTERVAL_LOW),
                              int(SAMPLE_INTERVAL_HIGH))

    @staticmethod
    def _LastN(values, count):
        """Return the newest ``count`` items of ``values`` as a list."""
        if count <= 0:
            return []
        # deque does not support slicing, so copy to a list first.
        return list(values)[-count:]

    @staticmethod
    def _SampleError(threadCount, numCompletions):
        """Return the relative counting error (threadCount - 1) / numCompletions.

        With zero completions the error is infinite when more than one
        thread is running and zero otherwise, instead of raising
        ZeroDivisionError.
        """
        if numCompletions != 0:
            return (threadCount - 1.0) / numCompletions
        return float("inf") if threadCount > 1 else 0.0

    @staticmethod
    def _WavePhase(totalSamples, period):
        """Return 0 or 1: which half of the square wave ``totalSamples`` is in."""
        # Floor division keeps the phase integral on Python 3 as well.
        return (totalSamples // (period // 2)) % 2

    @classmethod
    def GetWaveComponent(cls, samples, period):
        """Return the complex amplitude of the sinusoid of ``period`` in ``samples``.

        Uses the Goertzel algorithm (see
        http://en.wikipedia.org/wiki/Goertzel_algorithm) and normalizes the
        result by the number of samples.

        Args:
            samples: sequence of numeric samples.
            period: wave period in samples; may be fractional.

        Raises:
            AssertionError: if the wave does not fit in ``samples`` or the
                period is shorter than 2 samples.
        """
        # Can't measure a wave that doesn't fit.
        assert len(samples) >= period
        # Can't measure above the Nyquist frequency.
        assert period >= 2

        w = 2.0 * math.pi / period
        cosine = math.cos(w)
        sine = math.sin(w)
        coeff = 2.0 * cosine
        q1, q2 = 0.0, 0.0
        for sample in samples:
            q0 = sample + coeff * q1 - q2
            q2 = q1
            q1 = q0
        return complex(q1 - q2 * cosine, q2 * sine) / len(samples)

    def Update(self, currentThreadCount, sampleDuration, numCompletions):
        """Record one sample and return ``(newThreadCount, sampleInterval)``.

        Args:
            currentThreadCount: number of threads active during the sample.
            sampleDuration: length of the sample; multiplied by
                MAX_CHANGE_PER_SECOND, so expressed in seconds.
            numCompletions: work items completed during the sample.

        Returns:
            A tuple of the thread count to use next and the interval to wait
            before the next sample.  When the sample is not yet accurate
            enough, the thread count is returned unchanged together with
            SAMPLE_INTERVAL_LOW and the data is accumulated for the next call.

        Raises:
            ZeroDivisionError: if the accumulated sample duration is zero.
        """
        # If someone changed the thread count without telling us, update our
        # records accordingly.
        if currentThreadCount != self.lastThreadCount:
            self.ForceChange(currentThreadCount,
                             HillClimbingStateTransition.Initializing)

        # Update the cumulative stats for this thread count.
        self.elapsedSinceLastChange += sampleDuration
        self.completionsSinceLastChange += numCompletions

        # Add in any data we've already collected about this sample.
        sampleDuration += self.accumulatedSampleDuration
        numCompletions += self.accumulatedCompletionCount

        # We need to make sure we're collecting reasonably accurate data.
        # Since we're just counting the end of each work item, we are going to
        # be missing some data about what really happened during the sample
        # interval.  The count produced by each thread includes an initial
        # work item that may have started well before the start of the
        # interval, and each thread may have been running some new work item
        # for some time before the end of the interval, which did not yet get
        # counted.  So our count is going to be off by +/- threadCount
        # workitems.
        #
        # The exception is that the thread that reported to us last time
        # definitely wasn't running any work at that time, and the thread
        # that's reporting now definitely isn't running a work item now.  So
        # we really only need to consider threadCount-1 threads.
        #
        # Thus the percent error in our count is +/-
        # (threadCount-1)/numCompletions.  We cannot rely on the
        # frequency-domain analysis we'll be doing later to filter out this
        # error, because of the way it accumulates over time.  If this sample
        # is off by, say, 33% in the negative direction, then the next one
        # likely will be too.  The one after that will include the sum of the
        # completions we missed in the previous samples, and so will be 33%
        # positive.  So every three samples we'll have two "low" samples and
        # one "high" sample.  This will appear as periodic variation right in
        # the frequency range we're targeting, which will not be filtered by
        # the frequency-domain translation.
        if (self.totalSamples > 0 and
                self._SampleError(currentThreadCount, numCompletions)
                >= MAX_SAMPLE_ERROR):
            # Not accurate enough yet.  Accumulate the data so far, and tell
            # the ThreadPool to collect a little more.
            self.accumulatedSampleDuration = sampleDuration
            self.accumulatedCompletionCount = numCompletions
            return currentThreadCount, SAMPLE_INTERVAL_LOW

        # We've got enough data for our sample; reset our accumulators for
        # next time.
        self.accumulatedSampleDuration = 0
        self.accumulatedCompletionCount = 0

        # Add the current thread count and throughput sample to our history.
        throughput = float(numCompletions) / sampleDuration
        self.samples.append(throughput)
        self.threadCounts.append(currentThreadCount)
        self.totalSamples += 1

        # Defaults used while there is not enough history to analyse.
        ratio = complex(0.0)
        confidence = 0.0
        transition = HillClimbingStateTransition.Warmup

        # How many samples will we use?  It must be at least the three wave
        # periods we're looking for, and it must also be a whole multiple of
        # the primary wave's period; otherwise the frequency we're looking for
        # will fall between two frequency bands in the Fourier analysis, and
        # we won't be able to measure it accurately.
        sampleCount = (min(self.totalSamples - 1, SAMPLES_TO_MEASURE)
                       // WAVE_PERIOD * WAVE_PERIOD)

        if sampleCount > WAVE_PERIOD:
            recentSamples = self._LastN(self.samples, sampleCount)
            recentThreadCounts = self._LastN(self.threadCounts, sampleCount)

            # Average the throughput and thread count samples, so we can
            # scale the wave magnitudes later.
            averageThroughput = sum(recentSamples) / sampleCount
            averageThreadCount = sum(recentThreadCounts) / float(sampleCount)

            if averageThroughput > 0 and averageThreadCount > 0:
                # Calculate the periods of the adjacent frequency bands we'll
                # be using to measure noise levels.  We want the two adjacent
                # Fourier frequency bands.
                bands = float(sampleCount) / WAVE_PERIOD
                adjacentPeriod1 = sampleCount / (bands + 1)
                adjacentPeriod2 = sampleCount / (bands - 1)

                # Get the three different frequency components of the
                # throughput (scaled by average throughput).  Our "error"
                # estimate (the amount of noise that might be present in the
                # frequency band we're really interested in) is the largest
                # of the adjacent bands.
                throughputWaveComponent = (
                    self.GetWaveComponent(recentSamples, WAVE_PERIOD) /
                    averageThroughput)
                throughputErrorEstimate = abs(
                    self.GetWaveComponent(recentSamples, adjacentPeriod1) /
                    averageThroughput)
                if adjacentPeriod2 <= sampleCount:
                    throughputErrorEstimate = max(
                        throughputErrorEstimate,
                        abs(self.GetWaveComponent(recentSamples,
                                                  adjacentPeriod2) /
                            averageThroughput))

                # Do the same for the thread counts, so we have something to
                # compare to.  We don't measure thread count noise, because
                # there is none; these are exact measurements.
                threadWaveComponent = (
                    self.GetWaveComponent(recentThreadCounts, WAVE_PERIOD) /
                    averageThreadCount)

                # Update our moving average of the throughput noise.  We'll
                # use this later as feedback to determine the new size of the
                # thread wave.
                if self.averageThroughputNoise == 0:
                    self.averageThroughputNoise = throughputErrorEstimate
                else:
                    self.averageThroughputNoise = (
                        THROUGHPUT_ERROR_SMOOTHING_FACTOR *
                        throughputErrorEstimate +
                        (1.0 - THROUGHPUT_ERROR_SMOOTHING_FACTOR) *
                        self.averageThroughputNoise)

                if abs(threadWaveComponent):
                    # Adjust the throughput wave so it's centered around the
                    # target wave, and then calculate the adjusted
                    # throughput/thread ratio.
                    ratio = ((throughputWaveComponent -
                              TARGET_THROUGHPUT_RATIO * threadWaveComponent) /
                             threadWaveComponent)
                    transition = HillClimbingStateTransition.ClimbingMove
                else:
                    ratio = complex(0.0)
                    transition = HillClimbingStateTransition.Stabilizing

                # Calculate how confident we are in the ratio.  More noise ==
                # less confident.  This has the effect of slowing down
                # movements that might be affected by random noise.
                noiseForConfidence = max(self.averageThroughputNoise,
                                         throughputErrorEstimate)
                if noiseForConfidence:
                    confidence = ((abs(threadWaveComponent) /
                                   noiseForConfidence) /
                                  TARGET_SIGNAL_TO_NOISE_RATIO)
                else:
                    # There is no noise!
                    confidence = 1.0

        # We use just the real part of the complex ratio we just calculated.
        # If the throughput signal is exactly in phase with the thread signal,
        # this will be the same as taking the magnitude of the complex move
        # and moving that far up.  If they're 180 degrees out of phase, we'll
        # move backward (because this indicates that our changes are having
        # the opposite of the intended effect).  If they're 90 degrees out of
        # phase, we won't move at all, because we can't tell whether we're
        # having a negative or positive effect on throughput.
        move = min(1.0, max(-1.0, ratio.real))

        # Apply our confidence multiplier.
        move *= min(1.0, max(0.0, confidence))

        # Now apply non-linear gain, such that values around zero are
        # attenuated, while higher values are enhanced.  This allows us to
        # move quickly if we're far away from the target, but more slowly if
        # we're getting close, giving us rapid ramp-up without wild
        # oscillations around the target.
        gain = MAX_CHANGE_PER_SECOND * sampleDuration
        move = pow(abs(move), GAIN_EXPONENT) * (gain if move >= 0.0 else -gain)
        move = min(move, MAX_CHANGE_PER_SAMPLE)

        # If the result was positive, and CPU is > 95%, refuse the move.
        # psutil.cpu_percent() reports a percentage (0-100) while
        # CPU_UTILIZATION_HIGH is a fraction, so scale before comparing.
        if (move > 0.0 and
                psutil.cpu_percent() > CPU_UTILIZATION_HIGH * 100.0):
            move = 0.0

        # Apply the move to our control setting.
        self.currentControlSetting += move

        # Calculate the new thread wave magnitude, which is based on the
        # moving average we've been keeping of the throughput error.  This
        # average starts at zero, so we'll start with a nice safe little wave
        # at first.
        newThreadWaveMagnitude = int(0.5 + (
            self.currentControlSetting * self.averageThroughputNoise *
            TARGET_SIGNAL_TO_NOISE_RATIO * THREAD_MAGNITUDE_MULTIPLIER * 2.0))
        newThreadWaveMagnitude = min(newThreadWaveMagnitude,
                                     MAX_THREAD_WAVE_MAGNITUDE)
        newThreadWaveMagnitude = max(newThreadWaveMagnitude, 1)

        # Make sure our control setting is within the ThreadPool's limits.
        self.currentControlSetting = min(
            MAX_LIMIT_TOTAL_WORKER_THREADS - newThreadWaveMagnitude,
            self.currentControlSetting)
        self.currentControlSetting = max(MIN_LIMIT_TOTAL_WORKER_THREADS,
                                         self.currentControlSetting)

        # Calculate the new thread count (control setting + square wave).
        newThreadCount = int(
            self.currentControlSetting +
            newThreadWaveMagnitude *
            self._WavePhase(self.totalSamples, WAVE_PERIOD))

        # Make sure the new thread count doesn't exceed the ThreadPool's
        # limits.
        newThreadCount = min(MAX_LIMIT_TOTAL_WORKER_THREADS, newThreadCount)
        newThreadCount = max(MIN_LIMIT_TOTAL_WORKER_THREADS, newThreadCount)

        # If all of this caused an actual change in thread count, record it.
        if newThreadCount != currentThreadCount:
            self.ChangeThreadCount(newThreadCount, transition)

        # Return the new thread count and sample interval.  This is
        # randomized to prevent correlations with other periodic changes in
        # throughput.  Among other things, this prevents us from getting
        # confused by Hill Climbing instances running in other processes.
        #
        # If we're at minThreads, and we seem to be hurting performance by
        # going higher, we can't go any lower to fix this.  So we'll simply
        # stay at minThreads much longer, and only occasionally try a higher
        # value.
        if (ratio.real < 0.0 and
                newThreadCount == MIN_LIMIT_TOTAL_WORKER_THREADS):
            return newThreadCount, int(
                0.5 + self.currentSampleInterval *
                (10.0 * max(-ratio.real, 1.0)))
        return newThreadCount, self.currentSampleInterval

    def ForceChange(self, newThreadCount, transition):
        """Adopt a thread count that was changed outside of :meth:`Update`.

        Shifts the control setting by the same delta as the thread count and
        records the change; does nothing if the count is already current.
        """
        if newThreadCount != self.lastThreadCount:
            self.currentControlSetting += newThreadCount - self.lastThreadCount
            self.ChangeThreadCount(newThreadCount, transition)

    def ChangeThreadCount(self, newThreadCount, transition):
        """Record a new thread count and restart the per-change statistics.

        ``transition`` is accepted for interface compatibility and is not
        used here.
        """
        self.lastThreadCount = newThreadCount
        self.currentSampleInterval = self._RandomSampleInterval()
        self.elapsedSinceLastChange = 0
        self.completionsSinceLastChange = 0
# End of gist.