Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package main
import (
"fmt"
"strings"
"sync"
"time"
)
type ChannelPerf struct {
ch chan interface{}
sent, received int64
sendBlockingDuration time.Duration
receiveBlockingDuration time.Duration
start time.Time
timeLock sync.Mutex
name string
}
func NewChannelPerf(size int, name string) *ChannelPerf {
return &ChannelPerf{
ch: make(chan interface{}, size),
start: time.Now(),
name: name,
}
}
func (cf *ChannelPerf) Receive() (obj interface{}, received bool) {
start := time.Now()
obj, received = <-cf.ch
cf.timeLock.Lock()
defer cf.timeLock.Unlock()
cf.receiveBlockingDuration += time.Now().Sub(start)
if received {
cf.received++
}
return
}
func (cf *ChannelPerf) Send(obj interface{}) {
start := time.Now()
cf.ch <- obj
cf.timeLock.Lock()
defer cf.timeLock.Unlock()
cf.sendBlockingDuration += time.Now().Sub(start)
cf.sent++
}
func (cf *ChannelPerf) Close() {
close(cf.ch)
}
func (cf *ChannelPerf) Reset() {
cf.timeLock.Lock()
defer cf.timeLock.Unlock()
cf.sendBlockingDuration = 0
cf.receiveBlockingDuration = 0
cf.start = time.Now()
cf.sent = 0
cf.received = 0
}
func (cf *ChannelPerf) String() string {
cf.timeLock.Lock()
defer cf.timeLock.Unlock()
return fmt.Sprintf("%s (clock: %v, sent: %d in %v, received: %d in %v)",
cf.name, time.Now().Sub(cf.start),
cf.sent, cf.sendBlockingDuration,
cf.received, cf.receiveBlockingDuration)
}
func (cf *ChannelPerf) ReceiveFor(expire time.Duration) (obj interface{}, received bool, timedOut bool) {
start := time.Now()
select {
case obj, received = <-cf.ch:
case <-time.After(expire):
timedOut = true
}
cf.timeLock.Lock()
defer cf.timeLock.Unlock()
cf.receiveBlockingDuration += time.Now().Sub(start)
if received {
cf.received++
}
return
}
func (cf *ChannelPerf) Channel() chan interface{} {
return cf.ch
}
type ChannelPerfGroup struct {
channels []*ChannelPerf
channelsLock sync.Mutex
}
func NewChannelPerfGroup() *ChannelPerfGroup {
return &ChannelPerfGroup{}
}
func (g *ChannelPerfGroup) NewChannelPerf(size int, name string) *ChannelPerf {
channel := NewChannelPerf(size, name)
g.channelsLock.Lock()
defer g.channelsLock.Unlock()
g.channels = append(g.channels, channel)
return channel
}
func (g *ChannelPerfGroup) String() string {
g.channelsLock.Lock()
defer g.channelsLock.Unlock()
var lines []string
for _, channel := range g.channels {
lines = append(lines, channel.String())
}
return strings.Join(lines, "; ")
}
func (g *ChannelPerfGroup) Reset() {
g.channelsLock.Lock()
defer g.channelsLock.Unlock()
for _, channel := range g.channels {
channel.Reset()
}
}
func (g *ChannelPerfGroup) StartTicker(every time.Duration, reset bool, fn func()) {
go func() {
for range time.NewTicker(every).C {
fn()
if reset {
g.Reset()
}
}
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.