package main | |
import ( | |
"fmt" | |
"strings" | |
"sync" | |
"time" | |
) | |
type ChannelPerf struct { | |
ch chan interface{} | |
sent, received int64 | |
sendBlockingDuration time.Duration | |
receiveBlockingDuration time.Duration | |
start time.Time | |
timeLock sync.Mutex | |
name string | |
} | |
func NewChannelPerf(size int, name string) *ChannelPerf { | |
return &ChannelPerf{ | |
ch: make(chan interface{}, size), | |
start: time.Now(), | |
name: name, | |
} | |
} | |
func (cf *ChannelPerf) Receive() (obj interface{}, received bool) { | |
start := time.Now() | |
obj, received = <-cf.ch | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.receiveBlockingDuration += time.Now().Sub(start) | |
if received { | |
cf.received++ | |
} | |
return | |
} | |
func (cf *ChannelPerf) Send(obj interface{}) { | |
start := time.Now() | |
cf.ch <- obj | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.sendBlockingDuration += time.Now().Sub(start) | |
cf.sent++ | |
} | |
func (cf *ChannelPerf) Close() { | |
close(cf.ch) | |
} | |
func (cf *ChannelPerf) Reset() { | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.sendBlockingDuration = 0 | |
cf.receiveBlockingDuration = 0 | |
cf.start = time.Now() | |
cf.sent = 0 | |
cf.received = 0 | |
} | |
func (cf *ChannelPerf) String() string { | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
return fmt.Sprintf("%s (clock: %v, sent: %d in %v, received: %d in %v)", | |
cf.name, time.Now().Sub(cf.start), | |
cf.sent, cf.sendBlockingDuration, | |
cf.received, cf.receiveBlockingDuration) | |
} | |
func (cf *ChannelPerf) ReceiveFor(expire time.Duration) (obj interface{}, received bool, timedOut bool) { | |
start := time.Now() | |
select { | |
case obj, received = <-cf.ch: | |
case <-time.After(expire): | |
timedOut = true | |
} | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.receiveBlockingDuration += time.Now().Sub(start) | |
if received { | |
cf.received++ | |
} | |
return | |
} | |
func (cf *ChannelPerf) Channel() chan interface{} { | |
return cf.ch | |
} | |
type ChannelPerfGroup struct { | |
channels []*ChannelPerf | |
channelsLock sync.Mutex | |
} | |
func NewChannelPerfGroup() *ChannelPerfGroup { | |
return &ChannelPerfGroup{} | |
} | |
func (g *ChannelPerfGroup) NewChannelPerf(size int, name string) *ChannelPerf { | |
channel := NewChannelPerf(size, name) | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
g.channels = append(g.channels, channel) | |
return channel | |
} | |
func (g *ChannelPerfGroup) String() string { | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
var lines []string | |
for _, channel := range g.channels { | |
lines = append(lines, channel.String()) | |
} | |
return strings.Join(lines, "; ") | |
} | |
func (g *ChannelPerfGroup) Reset() { | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
for _, channel := range g.channels { | |
channel.Reset() | |
} | |
} | |
func (g *ChannelPerfGroup) StartTicker(every time.Duration, reset bool, fn func()) { | |
go func() { | |
for range time.NewTicker(every).C { | |
fn() | |
if reset { | |
g.Reset() | |
} | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment