Created
June 22, 2020 17:49
-
-
Save petermattis/590b45e21774600275b0f6a61ab0d8f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/github.com/cockroachdb/pebble/compaction_picker.go b/github.com/cockroachdb/pebble/compaction_picker.go | |
index 186e16ca5..20012461e 100644 | |
--- a/github.com/cockroachdb/pebble/compaction_picker.go | |
+++ b/github.com/cockroachdb/pebble/compaction_picker.go | |
@@ -119,6 +119,11 @@ type compactionPickerByScore struct { | |
// levelMaxBytes holds the dynamically adjusted max bytes setting for each | |
// level. | |
levelMaxBytes [numLevels]int64 | |
+ | |
+ // The ratio of the bytes in level to the previous higher level. For Lbase, | |
+ // this will be the ratio of bytes(Lbase)/bytes(L0). | |
+ // See the comment in pickAuto(). | |
+ currentByteRatios [numLevels]float64 | |
} | |
var _ compactionPicker = &compactionPickerByScore{} | |
@@ -209,13 +214,21 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp | |
// Determine the first non-empty level and the bottom level size. | |
firstNonEmptyLevel := -1 | |
var bottomLevelSize int64 | |
+ l0Size := int64(totalSize(p.vers.Levels[0])) | |
+ dbSize := l0Size | |
+ prevLevelSize := l0Size | |
for level := 1; level < numLevels; level++ { | |
levelSize := int64(totalSize(p.vers.Levels[level])) | |
+ dbSize += levelSize | |
if levelSize > 0 { | |
if firstNonEmptyLevel == -1 { | |
firstNonEmptyLevel = level | |
} | |
bottomLevelSize = levelSize | |
+ if prevLevelSize > 0 { | |
+ p.currentByteRatios[level] = float64(levelSize) / float64(prevLevelSize) | |
+ } | |
+ prevLevelSize = levelSize | |
} | |
} | |
for _, c := range inProgressCompactions { | |
@@ -244,6 +257,13 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp | |
return | |
} | |
+ // To handle the case where the LSM is not in the right shape (see the | |
+ // comment in pickAuto()) -- we don't want to pick an Lbase that is too | |
+ // low based on just looking at the bytes in the bottom level. | |
+ if float64(bottomLevelSize) < 0.8*float64(dbSize) { | |
+ bottomLevelSize = int64(0.8 * float64(dbSize)) | |
+ } | |
+ | |
levelMultiplier := 10.0 | |
baseBytesMax := p.opts.LBaseMaxBytes | |
@@ -487,6 +507,18 @@ func (p *compactionPickerByScore) pickFile(level, outputLevel int) int { | |
return file | |
} | |
+type sortCompactionLevels []*pickedCompactionInfo | |
+ | |
+func (s sortCompactionLevels) Len() int { | |
+ return len(s) | |
+} | |
+func (s sortCompactionLevels) Less(i, j int) bool { | |
+ return s[i].level < s[j].level | |
+} | |
+func (s sortCompactionLevels) Swap(i, j int) { | |
+ s[i], s[j] = s[j], s[i] | |
+} | |
+ | |
// pickAuto picks the best compaction, if any. | |
// | |
// On each call, pickAuto computes per-level size adjustments based on | |
@@ -497,6 +529,19 @@ func (p *compactionPickerByScore) pickFile(level, outputLevel int) int { | |
// If a score-based compaction cannot be found, pickAuto falls back to looking | |
// for a forced compaction (identified by FileMetadata.MarkedForCompaction). | |
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (c *compaction) { | |
+ countBaseCompactions := 0 | |
+ countIntraL0Compactions := 0 | |
+ for i := range env.inProgressCompactions { | |
+ if env.inProgressCompactions[i].startLevel == 0 { | |
+ if env.inProgressCompactions[i].outputLevel == 0 { | |
+ countIntraL0Compactions++ | |
+ } else { | |
+ countBaseCompactions++ | |
+ } | |
+ } | |
+ } | |
+ const highPriorityThreshold = 1.5 | |
+ | |
// highPriorityThreshold controls compaction concurrency. If there is already | |
// a compaction in progress, highPriorityThreshold is set to the minimum | |
// score needed for a concurrent compaction to be initiated. Since all level | |
@@ -521,49 +566,75 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (c *compaction) { | |
// large or the number of L0 sstables has reached 2x the L0 compaction | |
// threshold. In testing, it is usually the latter condition that triggers | |
// concurrent compactions in RocksDB. | |
- var highPriorityThreshold float64 | |
- if len(env.inProgressCompactions) > 0 { | |
- // Exponential high priority threshold: 2, 4, 8, ... | |
- highPriorityThreshold = float64(int(1) << len(env.inProgressCompactions)) | |
- } | |
+ // var highPriorityThreshold float64 | |
+ // if len(env.inProgressCompactions) > 0 { | |
+ // // Exponential high priority threshold: 2, 4, 8, ... | |
+ // highPriorityThreshold = float64(int(1) << len(env.inProgressCompactions)) | |
+ // } | |
scores := p.calculateScores(env.inProgressCompactions) | |
- // Check for a score-based compaction. "scores" has been sorted in order of | |
- // decreasing score. For each level with a score >= 1, we attempt to find a | |
- // compaction anchored at at that level. | |
+ var highestPriority []*pickedCompactionInfo | |
+ var highPriority []*pickedCompactionInfo | |
+ var lowPriority []*pickedCompactionInfo | |
for i := range scores { | |
info := &scores[i] | |
- if info.score < highPriorityThreshold { | |
- // Don't start a low priority compaction if there is already a compaction | |
- // running. | |
- return nil | |
- } | |
- if info.score < 1 { | |
+ if info.score > 4 { | |
+ // The info.score > 50 heuristic is based on noticing that the | |
+ // byte ratios are only an aggregate across the whole key space. | |
+ // And when Lbase falls too far behind then the number of observed | |
+ // input bytes from Lbase for L0 => Lbase becomes unproductive. | |
+ // For example 12MB from L0 + 1GB from Lbase => Lbase. | |
+ if p.currentByteRatios[info.level] > 5 || info.score > 50 { | |
+ highestPriority = append(highestPriority, info) | |
+ } else { | |
+ highPriority = append(highPriority, info) | |
+ } | |
+ } else if info.score >= 1 { | |
+ lowPriority = append(lowPriority, info) | |
+ } else { | |
break | |
} | |
+ } | |
- if info.level == 0 && p.opts.Experimental.L0SublevelCompactions { | |
- c = pickL0(env, p.opts, p.vers, p.baseLevel) | |
- // Fail-safe to protect against compacting the same sstable | |
- // concurrently. | |
+ sort.Sort(sortCompactionLevels(highPriority)) | |
+ | |
+ // Check for a score-based compaction. "scores" has been sorted in order of | |
+ // decreasing score. For each level with a score >= 1, we attempt to find a | |
+ // compaction anchored at that level. | |
+ for _, comList := range [3][]*pickedCompactionInfo{highestPriority, highPriority, lowPriority} { | |
+ for _, info := range comList { | |
+ if info.score < highPriorityThreshold { | |
+ // Don't start a low priority compaction if there is already a compaction | |
+ // running. | |
+ return nil | |
+ } | |
+ if info.score < 1 { | |
+ break | |
+ } | |
+ | |
+ if info.level == 0 && p.opts.Experimental.L0SublevelCompactions { | |
+ c = pickL0(env, p.opts, p.vers, p.baseLevel) | |
+ // Fail-safe to protect against compacting the same sstable | |
+ // concurrently. | |
+ if c != nil && !inputAlreadyCompacting(c) { | |
+ c.score = info.score | |
+ return c | |
+ } | |
+ continue | |
+ } | |
+ | |
+ info.file = p.pickFile(info.level, info.outputLevel) | |
+ if info.file == -1 { | |
+ continue | |
+ } | |
+ | |
+ c := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel) | |
+ // Fail-safe to protect against compacting the same sstable concurrently. | |
if c != nil && !inputAlreadyCompacting(c) { | |
c.score = info.score | |
return c | |
} | |
- continue | |
- } | |
- | |
- info.file = p.pickFile(info.level, info.outputLevel) | |
- if info.file == -1 { | |
- continue | |
- } | |
- | |
- c := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel) | |
- // Fail-safe to protect against compacting the same sstable concurrently. | |
- if c != nil && !inputAlreadyCompacting(c) { | |
- c.score = info.score | |
- return c | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment