Skip to content

Instantly share code, notes, and snippets.

@jwilder
Created March 13, 2017 20:55
Show Gist options
  • Save jwilder/048a081ba31b9b7f68c2b26287733c4b to your computer and use it in GitHub Desktop.
Save jwilder/048a081ba31b9b7f68c2b26287733c4b to your computer and use it in GitHub Desktop.
package merge
import (
"sort"
"testing"
"time"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Values is a slice of tsm1.Value. It is the receiver type for the two merge
// implementations in this file, StdMerge and Merge.
type Values []tsm1.Value
// StdMerge combines a and b into a single slice ordered by timestamp. When
// both inputs hold a value with the same timestamp, only the value from b is
// kept. Both a and b must already be sorted in ascending timestamp order.
//
// If either input is empty the other input is returned as-is; otherwise the
// result is written to a freshly made slice.
func (a Values) StdMerge(b tsm1.Values) tsm1.Values {
	switch {
	case len(a) == 0:
		return b
	case len(b) == 0:
		return tsm1.Values(a)
	}

	merged := make(Values, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		at, bt := a[0].UnixNano(), b[0].UnixNano()
		switch {
		case at < bt:
			merged = append(merged, a[0])
			a = a[1:]
		case at == bt:
			// Same timestamp: discard a's value. b's value stays at the head
			// of b and is emitted by a later pass or by the tail copy below.
			a = a[1:]
		default:
			merged = append(merged, b[0])
			b = b[1:]
		}
	}

	// At most one of the inputs still has values; copy over whatever is left.
	if len(a) > 0 {
		merged = append(merged, a...)
	} else {
		merged = append(merged, b...)
	}
	return tsm1.Values(merged)
}
// Merge overlays b on top of a and returns the combined values in ascending
// timestamp order. If a and b both hold a value with the same timestamp, the
// value from b is used. Both a and b must be sorted in ascending order.
//
// The merge is performed in place: elements of a are overwritten and the
// contents of b are shifted and rewritten while merging, so neither input
// should be relied on after the call. The returned slice is built with append
// on a (or on b when all of b precedes a).
func (a Values) Merge(b Values) Values {
// Nothing to merge when either side is empty; return the other side as-is.
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
// Fast path: every value in a is older than every value in b.
if a[len(a)-1].UnixNano() < b[0].UnixNano() {
return append(a, b...)
}
// Fast path: every value in b is older than every value in a.
if b[len(b)-1].UnixNano() < a[0].UnixNano() {
return append(b, a...)
}
// Walk a from the front, always comparing against the smallest remaining
// value in b (b[0]). When av < bv, a[i] is already in place and is skipped.
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// The value in a is greater than the head of b, so the two must swap sides.
if av > bv {
// Save the value in a
temp := a[i]
// Overwrite a with the head of b
a[i] = b[0]
// Slide all values of b down by one, dropping the old head
copy(b, b[1:])
b = b[:len(b)-1]
// k is the index in b at which the saved value from a belongs.
var k int
if len(b) > 0 && av > b[len(b)-1].UnixNano() {
// Fast path where the saved value is after all of b, so we skip the search
k = len(b)
} else {
// Find where the value saved from a should be inserted in b to keep b sorted
k = sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
}
if k == len(b) {
// The saved value belongs at the end of b.
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b: shift the tail up by one to open a slot at k.
// This branch only runs when the timestamps differ; a duplicate is
// dropped so that b's value wins.
copy(b[k+1:], b[k:])
// Add the last value to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
// The values in a and b have the same timestamp, use b
a[i] = b[0]
b = b[1:]
}
}
// Anything still in b goes after the values now in a.
if len(b) > 0 {
return append(a, b...)
}
return a
}
// BenchmarkValues_Merge benchmarks Values.Merge on two inputs of 1000 values
// each, where every timestamp in the second input is one greater than the
// corresponding timestamp in the first.
func BenchmarkValues_Merge(b *testing.B) {
	const valueCount = 1000

	times := getTimes(valueCount, 60, time.Second)
	left := make([]tsm1.Value, len(times))
	right := make([]tsm1.Value, len(times))
	for i := range times {
		left[i] = tsm1.NewValue(times[i], float64(i))
		right[i] = tsm1.NewValue(times[i]+1, float64(i))
	}

	// Exclude the input construction above from the measurement.
	b.ResetTimer()
	benchmarkMerge(left, right, b)
}
// BenchmarkValues_MergeSame benchmarks Values.Merge on two inputs of 1000
// values each that share exactly the same timestamps.
func BenchmarkValues_MergeSame(b *testing.B) {
	const valueCount = 1000

	times := getTimes(valueCount, 60, time.Second)
	left := make([]tsm1.Value, len(times))
	right := make([]tsm1.Value, len(times))
	for i := range times {
		left[i] = tsm1.NewValue(times[i], float64(i))
		right[i] = tsm1.NewValue(times[i], float64(i))
	}

	// Exclude the input construction above from the measurement.
	b.ResetTimer()
	benchmarkMerge(left, right, b)
}
// BenchmarkValues_StdMerge benchmarks Values.StdMerge on two inputs of 1000
// values each, where every timestamp in the second input is one greater than
// the corresponding timestamp in the first.
func BenchmarkValues_StdMerge(b *testing.B) {
	const valueCount = 1000

	times := getTimes(valueCount, 60, time.Second)
	left := make([]tsm1.Value, len(times))
	right := make([]tsm1.Value, len(times))
	for i := range times {
		left[i] = tsm1.NewValue(times[i], float64(i))
		right[i] = tsm1.NewValue(times[i]+1, float64(i))
	}

	// Exclude the input construction above from the measurement.
	b.ResetTimer()
	benchmarkStdMerge(left, right, b)
}
// BenchmarkValues_StdMergeSame benchmarks Values.StdMerge on two inputs of
// 1000 values each that share exactly the same timestamps.
func BenchmarkValues_StdMergeSame(b *testing.B) {
	const valueCount = 1000

	times := getTimes(valueCount, 60, time.Second)
	left := make([]tsm1.Value, len(times))
	right := make([]tsm1.Value, len(times))
	for i := range times {
		left[i] = tsm1.NewValue(times[i], float64(i))
		right[i] = tsm1.NewValue(times[i], float64(i))
	}

	// Exclude the input construction above from the measurement.
	b.ResetTimer()
	benchmarkStdMerge(left, right, b)
}
// benchmarkMerge times Values.Merge on a and c over b.N iterations.
//
// Merge rewrites both of its inputs in place, so merging the same two slices
// repeatedly would only exercise the intended input on the first iteration.
// Each iteration therefore merges fresh copies of a and c; the copying is done
// with the timer stopped so that only the Merge call is measured.
func benchmarkMerge(a, c tsm1.Values, b *testing.B) {
	// Scratch buffers reused across iterations. Their capacity equals their
	// length, matching the slices the benchmarks build with make.
	left := make(Values, len(a))
	right := make(Values, len(c))

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		copy(left, a)
		copy(right, c)
		b.StartTimer()

		left.Merge(right)
	}
}
// benchmarkStdMerge calls Values.StdMerge on a and c b.N times, discarding
// each result.
func benchmarkStdMerge(a, c tsm1.Values, b *testing.B) {
	left := Values(a)
	for n := 0; n < b.N; n++ {
		left.StdMerge(c)
	}
}
// getTimes returns n timestamps in Unix nanoseconds. The first timestamp is
// the current time rounded to precision, and each following timestamp is
// step*precision later than the one before it.
func getTimes(n, step int, precision time.Duration) []int64 {
	start := time.Now().Round(precision).UnixNano()
	// Spacing between consecutive timestamps, in nanoseconds. This honours
	// step rather than a fixed multiplier, so callers control the spacing.
	interval := (time.Duration(step) * precision).Nanoseconds()

	times := make([]int64, n)
	for i := range times {
		times[i] = start + int64(i)*interval
	}
	return times
}
@jwilder
Copy link
Author

jwilder commented Mar 13, 2017

$ go test -run non -bench Merge -benchmem -v ./merge/
BenchmarkValues_Merge-8          	  100000	     12220 ns/op	   32768 B/op	       1 allocs/op
BenchmarkValues_MergeSame-8      	  200000	     10058 ns/op	       0 B/op	       0 allocs/op
BenchmarkValues_StdMerge-8       	   50000	     33001 ns/op	   32768 B/op	       1 allocs/op
BenchmarkValues_StdMergeSame-8   	   50000	     37960 ns/op	   32768 B/op	       1 allocs/op

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment