Skip to content

Instantly share code, notes, and snippets.

@kulti
Created May 5, 2021 05:12
Show Gist options
  • Save kulti/e3514db299f48b9f859b73a9a06ec1fc to your computer and use it in GitHub Desktop.
Save kulti/e3514db299f48b9f859b73a9a06ec1fc to your computer and use it in GitHub Desktop.
package hw06pipelineexecution
import (
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestPipeline(t *testing.T) {
g := func(_ string, f func(v interface{}) interface{}) Stage {
return func(in In) Out {
out := make(Bi)
go func() {
defer close(out)
for v := range in {
out <- f(v)
}
}()
return out
}
}
stages := []Stage{
g("Dummy", func(v interface{}) interface{} { return v }),
g("Multiplier (* 2)", func(v interface{}) interface{} { return v.(int) * 2 }),
g("Adder (+ 100)", func(v interface{}) interface{} { return v.(int) + 100 }),
g("Stringifier", func(v interface{}) interface{} { return strconv.Itoa(v.(int)) }),
}
in := make(Bi)
data := []int{1, 2, 3, 4, 5}
go func() {
for _, v := range data {
in <- v
}
close(in)
}()
result := make([]string, 0, 10)
for s := range ExecutePipeline(in, nil, stages...) {
result = append(result, s.(string))
}
require.Equal(t, []string{"102", "104", "106", "108", "110"}, result)
}
func TestPipelineConcurrency(t *testing.T) {
waitCh := make(chan struct{})
defer close(waitCh)
stageFn := func(in In) Out {
out := make(Bi)
go func() {
defer close(out)
for v := range in {
out <- v
<-waitCh
}
}()
return out
}
lastStageFn := func(in In) Out {
out := make(Bi)
go func() {
defer close(out)
for v := range in {
out <- v
<-waitCh
}
}()
return out
}
in := make(Bi)
const testValue = "test"
go func() {
in <- testValue
close(in)
}()
var resValue interface{}
out := ExecutePipeline(in, nil, stageFn, stageFn, lastStageFn)
require.Eventually(t, func() bool {
select {
case resValue = <-out:
return true
default:
return false
}
}, time.Second, time.Millisecond)
require.EqualValues(t, testValue, resValue)
}
func TestPipelineDone(t *testing.T) {
waitCh := make(chan struct{})
defer close(waitCh)
stageFn := func(in In) Out {
out := make(Bi)
go func() {
defer close(out)
for v := range in {
<-waitCh
out <- v
}
}()
return out
}
in := make(Bi)
const testValue = "test"
go func() {
in <- testValue
close(in)
}()
doneCh := make(Bi)
var resValue interface{}
out := ExecutePipeline(in, doneCh, stageFn, stageFn, stageFn)
close(doneCh)
require.Eventually(t, func() bool {
select {
case resValue = <-out:
return true
default:
return false
}
}, time.Second, time.Millisecond)
require.Nil(t, resValue)
}
@Antonboom
Copy link

а тут вообще в принципе нужен waitCh?

кажется, что его можно убрать и ничего не изменится. по сути ты убрал слипы и мы получили тест на неблокированную работу связки каналов (ждём теперь вместо времени слипа в стейдже время на оверхед по этой работе).

плюс если его убрать, сразу покроется случай с

Надо еще один тест написать, который покажет, что первый стейдж не ждет второй.

ну в общем, те же яйца только в профиль, не?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment