Skip to content

Instantly share code, notes, and snippets.

@liggitt
Created January 6, 2021 16:57
Show Gist options
  • Save liggitt/7cf544f833b6baec029bb21dc247ecfd to your computer and use it in GitHub Desktop.
Save liggitt/7cf544f833b6baec029bb21dc247ecfd to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
fmt.Println("ctrl+c to stop...")
swh := &sampleAndWaterMarkHistograms{
sampleAndWaterMarkObserverGenerator: &sampleAndWaterMarkObserverGenerator{
t0: time.Now(),
samplePeriod: time.Millisecond,
},
}
// set up a few goroutines per CPU
for i := 0; i < 4*runtime.NumCPU(); i++ {
go func() {
for {
swh.Set(1)
time.Sleep(time.Millisecond)
}
}()
}
select {}
}
// sampleAndWaterMarkObserverGenerator carries the time base used to turn
// timestamps into sample-period indices.
type sampleAndWaterMarkObserverGenerator struct {
	// t0 is the origin from which elapsed time is measured.
	t0 time.Time
	// samplePeriod is the length of one quantization step.
	samplePeriod time.Duration
}

// quantize returns how many whole sample periods lie between t0 and when.
// The result is 0 for when == t0 and grows by one per full samplePeriod.
//
// It panics with an error value if when is earlier than t0.
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 {
	if swg.t0.After(when) {
		panic(fmt.Errorf("Time went backwards: t0=%v, when=%v", swg.t0, when))
	}
	elapsed := when.Sub(swg.t0)
	periods := elapsed / swg.samplePeriod // integer division drops any partial period
	return int64(periods)
}
// sampleAndWaterMarkHistograms tracks a value x relative to a denominator
// x1, together with the per-sample-period bookkeeping held in the embedded
// accumulator. All mutation in this file happens inside innerSet while the
// embedded mutex is held.
type sampleAndWaterMarkHistograms struct {
	// Time base; supplies quantize for mapping timestamps to sample periods.
	*sampleAndWaterMarkObserverGenerator

	// Mutex is locked by innerSet around every read and write of the
	// fields below.
	sync.Mutex

	// x1 is the denominator used when innerSet computes relX = x / x1.
	x1 float64

	// Current value, ratio and water marks, plus the time of the last update.
	sampleAndWaterMarkAccumulator
}
// sampleAndWaterMarkAccumulator is the mutable state that innerSet updates
// on every call and snapshots (by value) before updating.
type sampleAndWaterMarkAccumulator struct {
	// lastSet is the clock reading stored by the most recent update whose
	// timestamp did not precede the previously stored one.
	lastSet time.Time

	// lastSetInt is the sample-period index (as produced by quantize, i.e.
	// time since t0 divided by samplePeriod) recorded the last time an
	// update landed in a later period than the stored one.
	lastSetInt int64

	// x is the most recently set value.
	x float64

	// relX is x / x1.
	relX float64

	// loRelX and hiRelX are the low and high water marks of relX; innerSet
	// resets both to the previous relX when a new sample period begins and
	// then widens them to include the current relX.
	loRelX float64
	hiRelX float64
}
// Set stores x as the current value. The assignment is handed to innerSet
// as a callback, so it runs inside innerSet's critical section and the
// derived ratio and water marks are refreshed in the same step.
func (saw *sampleAndWaterMarkHistograms) Set(x float64) {
	assignX := func() {
		saw.x = x
	}
	saw.innerSet(assignX)
}
// innerSet applies updateXOrX1 under the lock and brings the derived state
// (relX, the water marks, and the last-update time) up to date.
//
// updateXOrX1 is called exactly once, with the mutex held, after the current
// time has been read and the previous accumulator state has been snapshotted.
//
// If the clock reading taken here precedes the stored lastSet, a message is
// printed to standard output after the lock has been released.
func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
	var (
		now     time.Time
		nowSlot int64
		prev    sampleAndWaterMarkAccumulator // state as it was before this update
		inOrder bool
	)

	func() {
		saw.Lock()
		defer saw.Unlock()

		// Read the clock inside the lock so readings are taken in lock order.
		now = time.Now()
		nowSlot = saw.quantize(now)
		prev = saw.sampleAndWaterMarkAccumulator
		inOrder = !now.Before(prev.lastSet)

		updateXOrX1()
		saw.relX = saw.x / saw.x1

		// `inOrder` should always be true because we are using
		// monotonic clock readings and they never go backwards. Yet
		// very small backwards steps (under 1 microsecond) have been
		// observed
		// (https://github.com/kubernetes/kubernetes/issues/96459).
		// In the backwards case, treat the current reading as if it
		// had occurred at time `saw.lastSet` and report it below. It
		// would be wrong to update `saw.lastSet` in this case because
		// that plants a time bomb for future updates to
		// `saw.lastSetInt`.
		if inOrder {
			if nowSlot > prev.lastSetInt {
				// A new sample period has begun: restart both water
				// marks from the ratio that was in effect before this call.
				saw.loRelX = prev.relX
				saw.hiRelX = prev.relX
				saw.lastSetInt = nowSlot
			}
			saw.lastSet = now
		}

		// Widen the water marks to cover the new ratio.
		switch {
		case saw.relX < saw.loRelX:
			saw.loRelX = saw.relX
		case saw.relX > saw.hiRelX:
			saw.hiRelX = saw.relX
		}
	}()

	if !inOrder {
		fmt.Printf("Time went backwards from %s to %s\n", prev.lastSet.String(), now.String())
	}

	// Step the local snapshot forward one sample period at a time until it
	// reaches the current period. This only touches the local copy `prev`,
	// so nothing outside this function observes it.
	// NOTE(review): presumably a stand-in for per-period reporting — confirm.
	for ; prev.lastSetInt < nowSlot; prev.lastSetInt++ {
		prev.loRelX, prev.hiRelX = prev.relX, prev.relX
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment