Skip to content

Instantly share code, notes, and snippets.

@macalinao
Last active January 6, 2017 19:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save macalinao/bc08b9a6ec31cb496feaf058dccd6fff to your computer and use it in GitHub Desktop.
Save macalinao/bc08b9a6ec31cb496feaf058dccd6fff to your computer and use it in GitHub Desktop.
package processor
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// Metrics records processed summoners and matches and logs progress.
type Metrics struct {
Logger *logrus.Logger `inject:"t"`
reqCt map[string]int
reqs chan string
reqCtMu sync.Mutex
}
// Start starts the metrics.
func (m *Metrics) Start() {
m.reqCt = map[string]int{}
m.reqs = make(chan string)
// Show rate
go func() {
for range time.Tick(5 * time.Second) {
m.Logger.Infof("===")
m.reqCtMu.Lock()
total := 0
for reqType, ct := range m.reqCt {
m.Logger.Infof("- %s: %d (%.2f/sec)", reqType, ct, float64(ct)/5.0)
total += ct
}
m.Logger.Infof("TOTAL: %d (%.2f/sec)", total, float64(total)/5.0)
m.reqCt = map[string]int{}
m.reqCtMu.Unlock()
}
}()
// Process channels
for endpoint := range m.reqs {
m.reqCtMu.Lock()
m.reqCt[endpoint] += 1
m.reqCtMu.Unlock()
}
}
// RecordSummoner records a summoner.
func (m *Metrics) Record(endpoint string) {
m.reqs <- endpoint
}
// Metrics stream
val metrics = PublishToOneSubject[String]()
// Metrics reporter
metrics.bufferTimed(10.seconds).foreach { vals =>
println("=== METRICS 10S ==")
vals.groupBy(identity).mapValues(_.length).foreach { case (str, ct) =>
println(s"${str}: ${ct} vals")
}
println("=== END METRICS ===")
}
def measuredEndpoint[T](fut: Future[T])(implicit tag: ClassTag[T]): Future[T] = {
val name = tag.runtimeClass.getName
for {
_ <- metrics.onNext(s"start: $name")
res <- endpoint(fut)
_ <- metrics.onNext(s"end: $name")
} yield res
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment