Skip to content

Instantly share code, notes, and snippets.

@andrask
Last active January 9, 2021 10:39
Show Gist options
  • Save andrask/a44427900c660b86335a7084c2dc5277 to your computer and use it in GitHub Desktop.
Save andrask/a44427900c660b86335a7084c2dc5277 to your computer and use it in GitHub Desktop.
Eventpump with channels
type (
	// Event is any value that can travel through an EventPump.
	Event interface{}

	// EventChannel is a channel whose elements are Events.
	EventChannel chan Event

	// EventPump bundles one input channel with a slice of output channels.
	// A pump returned by CreateEventPump forwards each event sent on Input
	// to every channel in Outputs.
	EventPump struct {
		// Input is the channel producers send events on.
		Input EventChannel
		// Outputs are the channels consumers receive events from.
		Outputs []EventChannel
	}
)
// BufferSize is the buffer capacity of each output channel that
// CreateEventPump allocates.
const BufferSize = 20
// CreateEventPump builds an EventPump with one unbuffered input channel and
// numOutputs output channels, each buffered to BufferSize events. It starts a
// goroutine that forwards every event received on Input to each output, in
// slice order.
//
// The goroutine runs until Input is closed. It then closes every output and
// exits, so callers must close Input to release it. A send to an output whose
// buffer is full blocks the goroutine, and with it delivery to the remaining
// outputs, until that output is drained.
//
// numOutputs must not be negative; the slice allocation panics otherwise.
func CreateEventPump(numOutputs int) EventPump {
	input := make(EventChannel)
	outputs := make([]EventChannel, numOutputs)
	for i := range outputs {
		outputs[i] = make(EventChannel, BufferSize)
	}

	go func() {
		// Ranging over the channel ends exactly when input has been closed,
		// which is the signal to propagate the close downstream.
		for event := range input {
			for _, out := range outputs {
				out <- event
			}
		}
		for _, out := range outputs {
			close(out)
		}
	}()

	return EventPump{
		Input:   input,
		Outputs: outputs,
	}
}
import (
"fmt"
"testing"
)
// TestEventPumpSend checks, for pumps with 0, 1 and 2 outputs, that a single
// BeginEvent sent on Input is delivered to every output channel.
func TestEventPumpSend(t *testing.T) {
	for numOutputs := 0; numOutputs < 3; numOutputs++ {
		t.Run(fmt.Sprintf("Outputs=%d", numOutputs), func(t *testing.T) {
			eventPump := CreateEventPump(numOutputs)
			// Closing Input lets the pump goroutine finish instead of staying
			// blocked on its receive after the subtest returns.
			defer close(eventPump.Input)

			if len(eventPump.Outputs) != numOutputs {
				t.Error("Not enough outputs")
			}

			eventPump.Input <- BeginEvent{}
			for _, output := range eventPump.Outputs {
				evt := <-output
				if _, ok := evt.(BeginEvent); !ok {
					t.Errorf("Received something interesting %v", evt)
				}
			}
		})
	}
}
// TestEventPumpClosePropagation verifies that closing a pump's Input results
// in every one of its output channels being closed as well.
func TestEventPumpClosePropagation(t *testing.T) {
	pump := CreateEventPump(2)
	close(pump.Input)

	for i := range pump.Outputs {
		// A receive reports open == false only once the channel is closed.
		if _, open := <-pump.Outputs[i]; open {
			t.Error("All channels should be closed")
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment