@petermattis
Created June 22, 2020 17:49
diff --git a/github.com/cockroachdb/pebble/compaction_picker.go b/github.com/cockroachdb/pebble/compaction_picker.go
index 186e16ca5..20012461e 100644
--- a/github.com/cockroachdb/pebble/compaction_picker.go
+++ b/github.com/cockroachdb/pebble/compaction_picker.go
@@ -119,6 +119,11 @@ type compactionPickerByScore struct {
// levelMaxBytes holds the dynamically adjusted max bytes setting for each
// level.
levelMaxBytes [numLevels]int64
+
+ // The ratio of the bytes in a level to the bytes in the nearest
+ // non-empty level above it. For Lbase, this will be the ratio
+ // bytes(Lbase)/bytes(L0).
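+ // For example (illustrative numbers): with 64MB in L0 and 512MB in
+ // Lbase, the entry for Lbase is 8.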
+ // See the comment in pickAuto().
+ currentByteRatios [numLevels]float64
}
var _ compactionPicker = &compactionPickerByScore{}
@@ -209,13 +214,21 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
// Determine the first non-empty level and the bottom level size.
firstNonEmptyLevel := -1
var bottomLevelSize int64
+ l0Size := int64(totalSize(p.vers.Levels[0]))
+ dbSize := l0Size
+ prevLevelSize := l0Size
for level := 1; level < numLevels; level++ {
levelSize := int64(totalSize(p.vers.Levels[level]))
+ dbSize += levelSize
if levelSize > 0 {
if firstNonEmptyLevel == -1 {
firstNonEmptyLevel = level
}
bottomLevelSize = levelSize
+ if prevLevelSize > 0 {
+ p.currentByteRatios[level] = float64(levelSize) / float64(prevLevelSize)
+ }
+ prevLevelSize = levelSize
}
}
for _, c := range inProgressCompactions {
@@ -244,6 +257,13 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
return
}
+ // Handle the case where the LSM is not in the right shape (see the
+ // comment in pickAuto()): we don't want to pick an Lbase that is too
+ // low based solely on the bytes in the bottom level.
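+ // For example (illustrative numbers): if the LSM holds 100GB in total
+ // but only 40GB of that is in the bottom level, level sizes are
+ // computed as if the bottom level held 80GB.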
+ if float64(bottomLevelSize) < 0.8*float64(dbSize) {
+ bottomLevelSize = int64(0.8 * float64(dbSize))
+ }
+
levelMultiplier := 10.0
baseBytesMax := p.opts.LBaseMaxBytes
@@ -487,6 +507,18 @@ func (p *compactionPickerByScore) pickFile(level, outputLevel int) int {
return file
}
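+// sortCompactionLevels implements sort.Interface, ordering
+// pickedCompactionInfos by increasing level.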
+type sortCompactionLevels []*pickedCompactionInfo
+
+func (s sortCompactionLevels) Len() int {
+ return len(s)
+}
+func (s sortCompactionLevels) Less(i, j int) bool {
+ return s[i].level < s[j].level
+}
+func (s sortCompactionLevels) Swap(i, j int) {
+ s[i], s[j] = s[j], s[i]
+}
+
// pickAuto picks the best compaction, if any.
//
// On each call, pickAuto computes per-level size adjustments based on
@@ -497,6 +529,19 @@ func (p *compactionPickerByScore) pickFile(level, outputLevel int) int {
// If a score-based compaction cannot be found, pickAuto falls back to looking
// for a forced compaction (identified by FileMetadata.MarkedForCompaction).
func (p *compactionPickerByScore) pickAuto(env compactionEnv) (c *compaction) {
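+ // Count the in-progress compactions out of L0 (L0 => Lbase) and
+ // within L0 (intra-L0).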
+ countBaseCompactions := 0
+ countIntraL0Compactions := 0
+ for i := range env.inProgressCompactions {
+ if env.inProgressCompactions[i].startLevel == 0 {
+ if env.inProgressCompactions[i].outputLevel == 0 {
+ countIntraL0Compactions++
+ } else {
+ countBaseCompactions++
+ }
+ }
+ }
+ const highPriorityThreshold = 1.5
+
// highPriorityThreshold controls compaction concurrency. If there is already
// a compaction in progress, highPriorityThreshold is set to the minimum
// score needed for a concurrent compaction to be initiated. Since all level
@@ -521,49 +566,75 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (c *compaction) {
// large or the number of L0 sstables has reached 2x the L0 compaction
// threshold. In testing, it is usually the latter condition that triggers
// concurrent compactions in RocksDB.
- var highPriorityThreshold float64
- if len(env.inProgressCompactions) > 0 {
- // Exponential high priority threshold: 2, 4, 8, ...
- highPriorityThreshold = float64(int(1) << len(env.inProgressCompactions))
- }
scores := p.calculateScores(env.inProgressCompactions)
- // Check for a score-based compaction. "scores" has been sorted in order of
- // decreasing score. For each level with a score >= 1, we attempt to find a
- // compaction anchored at at that level.
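+ // Bucket the levels whose score warrants a compaction into three
+ // priority tiers: levels that have fallen badly behind (see the
+ // heuristic below), the remaining levels with score > 4, and levels
+ // with a score of at least 1.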
+ var highestPriority []*pickedCompactionInfo
+ var highPriority []*pickedCompactionInfo
+ var lowPriority []*pickedCompactionInfo
for i := range scores {
info := &scores[i]
- if info.score < highPriorityThreshold {
- // Don't start a low priority compaction if there is already a compaction
- // running.
- return nil
- }
- if info.score < 1 {
+ if info.score > 4 {
+ // The info.score > 50 heuristic is based on noticing that the
+ // byte ratios are only an aggregate across the whole key space,
+ // so they can look healthy even when Lbase has fallen too far
+ // behind. When that happens, L0 => Lbase compactions become
+ // unproductive because they read far more bytes from Lbase than
+ // from L0. For example: 12MB from L0 + 1GB from Lbase => Lbase.
+ if p.currentByteRatios[info.level] > 5 || info.score > 50 {
+ highestPriority = append(highestPriority, info)
+ } else {
+ highPriority = append(highPriority, info)
+ }
+ } else if info.score >= 1 {
+ lowPriority = append(lowPriority, info)
+ } else {
break
}
+ }
- if info.level == 0 && p.opts.Experimental.L0SublevelCompactions {
- c = pickL0(env, p.opts, p.vers, p.baseLevel)
- // Fail-safe to protect against compacting the same sstable
- // concurrently.
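+ // Within the high priority tier, attempt compactions out of
+ // lower-numbered levels first.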
+ sort.Sort(sortCompactionLevels(highPriority))
+
+ // Check for a score-based compaction, visiting the priority tiers in
+ // order. Within each tier, for each level with a score >= 1, we
+ // attempt to find a compaction anchored at that level.
+ for _, comList := range [3][]*pickedCompactionInfo{highestPriority, highPriority, lowPriority} {
+ for _, info := range comList {
+ if info.score < highPriorityThreshold {
+ // Don't start a compaction for a level whose score is below the
+ // high priority threshold.
+ return nil
+ }
+ if info.score < 1 {
+ break
+ }
+
+ if info.level == 0 && p.opts.Experimental.L0SublevelCompactions {
+ c = pickL0(env, p.opts, p.vers, p.baseLevel)
+ // Fail-safe to protect against compacting the same sstable
+ // concurrently.
+ if c != nil && !inputAlreadyCompacting(c) {
+ c.score = info.score
+ return c
+ }
+ continue
+ }
+
+ info.file = p.pickFile(info.level, info.outputLevel)
+ if info.file == -1 {
+ continue
+ }
+
+ c := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel)
+ // Fail-safe to protect against compacting the same sstable concurrently.
if c != nil && !inputAlreadyCompacting(c) {
c.score = info.score
return c
}
- continue
- }
-
- info.file = p.pickFile(info.level, info.outputLevel)
- if info.file == -1 {
- continue
- }
-
- c := pickAutoHelper(env, p.opts, p.vers, *info, p.baseLevel)
- // Fail-safe to protect against compacting the same sstable concurrently.
- if c != nil && !inputAlreadyCompacting(c) {
- c.score = info.score
- return c
}
}
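
To make the new selection flow easier to follow outside the diff, here is a minimal, self-contained Go sketch of the priority bucketing that the patched pickAuto performs. It is an illustration under stated assumptions, not Pebble's actual implementation: pickedCompactionInfo below is a simplified stand-in for the internal type of the same name, the byteRatio field stands in for p.currentByteRatios[info.level], and sort.Slice is used for brevity where the patch defines the sortCompactionLevels sort.Interface helper.

package main

import (
	"fmt"
	"sort"
)

// pickedCompactionInfo is a simplified stand-in for Pebble's internal type.
type pickedCompactionInfo struct {
	level     int
	score     float64
	byteRatio float64 // stand-in for p.currentByteRatios[level]
}

// bucketByPriority mirrors the classification in the patched pickAuto. The
// input is assumed sorted by decreasing score: a level with score > 4 and a
// byte ratio > 5 (or a score > 50) lands in the highest tier, the remaining
// levels with score > 4 land in the high tier, and levels with a score of
// at least 1 land in the low tier.
func bucketByPriority(scores []pickedCompactionInfo) (highest, high, low []*pickedCompactionInfo) {
	for i := range scores {
		info := &scores[i]
		switch {
		case info.score > 4 && (info.byteRatio > 5 || info.score > 50):
			highest = append(highest, info)
		case info.score > 4:
			high = append(high, info)
		case info.score >= 1:
			low = append(low, info)
		default:
			return highest, high, low // scores are sorted; the rest are < 1
		}
	}
	return highest, high, low
}

func main() {
	scores := []pickedCompactionInfo{
		{level: 4, score: 60, byteRatio: 2},  // score > 50 => highest
		{level: 3, score: 6, byteRatio: 12},  // byte ratio > 5 => highest
		{level: 5, score: 5, byteRatio: 3},   // high
		{level: 0, score: 4.5, byteRatio: 2}, // high
		{level: 2, score: 1.2, byteRatio: 1}, // low
		{level: 6, score: 0.4, byteRatio: 1}, // score < 1: ignored
	}
	highest, high, low := bucketByPriority(scores)
	// The high tier is further sorted by level so that compactions out of
	// lower-numbered levels are attempted first.
	sort.Slice(high, func(i, j int) bool { return high[i].level < high[j].level })
	for _, tier := range [][]*pickedCompactionInfo{highest, high, low} {
		for _, info := range tier {
			fmt.Printf("L%d score=%.1f\n", info.level, info.score)
		}
	}
	// Prints: L4 score=60.0, L3 score=6.0, L0 score=4.5, L5 score=5.0,
	// L2 score=1.2
}

Compactions are then attempted tier by tier, so a badly backed-up level (a very high score or byte ratio) is always considered before the ordinary score >= 1 candidates.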