Last active
May 30, 2024 13:32
-
-
Save calam1/469f0a8dcde552547281e6cb940067d5 to your computer and use it in GitHub Desktop.
An example from the book "Concurrency in Go": it generates random numbers and tests whether they are prime, using fan-out and fan-in. It has been updated to use generics instead of interface{}, and to use a context instead of a done channel; the old code is left in, commented out, for comparison.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"math/rand" | |
"runtime" | |
"sync" | |
"time" | |
) | |
func main() { | |
// done := make(chan interface{}) | |
// defer close(done) | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
// for num := range take(done, repeat(done, 1), 10) { | |
// // for num := range repeat(done, 1) { // will infinitely print 1 | |
// fmt.Printf("%v ", num) | |
// } | |
// context usage instead of done channel | |
// for num := range take(ctx, repeat(ctx, 1), 10) { | |
// fmt.Printf("%v ", num) | |
// } | |
randFunc := func() interface{} { return rand.Int() } | |
// for num := range take(done, repeatFn(done, randFunc), 10) { | |
// // for num := range repeat(done, 1) { // will infinitely print 1 | |
// fmt.Printf("%v\n", num) | |
// } | |
// context usage instead of done channel | |
// for num := range take(ctx, repeatFn(ctx, randFunc), 10) { | |
// // for num := range repeat(done, 1) { // will infinitely print 1 | |
// fmt.Printf("%v\n", num) | |
// } | |
start := time.Now() | |
// finding primes is slow | |
// randIntStream := repeatFn(done, randFunc) | |
// for prime := range take(done, primeFinder(done, randIntStream, isPrime), 10) { | |
// fmt.Printf("prime: %v\n", prime) | |
// } | |
// finding primes is slow - context usage instead of done channel | |
// randIntStream := repeatFn(ctx, randFunc) | |
// for prime := range take(ctx, primeFinder(ctx, randIntStream, isPrime), 10) { | |
// fmt.Printf("prime: %v\n", prime) | |
// } | |
// fan-out - primeFinder - about 8 times faster than above | |
// randIntStream := repeatFn(done, randFunc) | |
// numFinders := runtime.NumCPU() // get number of cores | |
// fmt.Printf("Spinning up %d prime finders\n", numFinders) | |
// finders := make([]<-chan interface{}, numFinders) | |
// fmt.Println("Spinning up finders for Prime") | |
// for i := 0; i < numFinders; i++ { | |
// finders[i] = primeFinder(done, randIntStream, isPrime) | |
// } | |
// | |
// for prime := range take(done, fanIn(done, finders...), 10) { | |
// fmt.Printf("prime: %v\n", prime) | |
// } | |
// fan-out - primeFinder - about 8 times faster than above - context usage instead of done channel | |
randIntStream := repeatFn(ctx, randFunc) | |
numFinders := runtime.NumCPU() // get number of cores | |
fmt.Printf("Spinning up %d prime finders\n", numFinders) | |
finders := make([]<-chan interface{}, numFinders) | |
fmt.Println("Spinning up finders for Prime") | |
for i := 0; i < numFinders; i++ { | |
finders[i] = primeFinder(ctx, randIntStream, isPrime) | |
} | |
for prime := range take(ctx, fanIn(ctx, finders...), 10) { | |
fmt.Printf("prime: %v\n", prime) | |
} | |
fmt.Printf("prime search took %v\n", time.Since(start)) | |
} | |
// this is just a generator that will repeat the values passed to it indefinitely | |
// func repeat[T any](done <-chan T, values ...T) <-chan T { | |
// valueStream := make(chan T) | |
// go func() { | |
// defer close(valueStream) | |
// for { // infinite loop | |
// for _, v := range values { | |
// select { | |
// case <-done: | |
// return | |
// case valueStream <- v: | |
// } | |
// } | |
// } | |
// }() | |
// return valueStream | |
// } | |
// repeat returns a stream that cycles through values in order, over and over,
// until ctx is cancelled. The returned channel is closed when the generator
// goroutine exits.
//
// When no values are supplied there is nothing to emit, so the stream is
// closed immediately. Without that guard the goroutine would spin in an empty
// loop that never reaches the select and therefore never sees cancellation.
func repeat[T any](ctx context.Context, values ...T) <-chan T {
	valueStream := make(chan T)
	go func() {
		defer close(valueStream)
		if len(values) == 0 {
			return
		}
		for {
			for _, v := range values {
				select {
				case <-ctx.Done(): // stop as soon as the caller cancels
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}
// func repeatFn[T any](done <-chan T, fn func() T) <-chan T { | |
// valueStream := make(chan T) | |
// go func() { | |
// defer close(valueStream) | |
// for { | |
// select { | |
// case <-done: | |
// return | |
// case valueStream <- fn(): | |
// } | |
// } | |
// }() | |
// return valueStream | |
// } | |
// repeatFn returns a stream fed by calling fn again and again. Production
// stops, and the channel is closed, once ctx is cancelled.
func repeatFn[T any](ctx context.Context, fn func() T) <-chan T {
	out := make(chan T)

	produce := func() {
		defer close(out)
		for {
			// fn is invoked once per iteration before the select blocks,
			// exactly as it is when written inline as the send operand.
			next := fn()
			select {
			case out <- next:
			case <-ctx.Done():
				return
			}
		}
	}

	go produce()
	return out
}
// func take[T any](done <-chan T, valueStream <-chan T, num int) <-chan T { | |
// takeStream := make(chan T) | |
// go func() { | |
// defer close(takeStream) | |
// for i := 0; i < num; i++ { | |
// select { | |
// case <-done: | |
// return | |
// case takeStream <- <-valueStream: | |
// } | |
// } | |
// }() | |
// return takeStream | |
// } | |
// take forwards at most num values from valueStream onto the returned channel
// and then closes it. It stops early when ctx is cancelled or when valueStream
// is closed.
//
// Receiving and sending are done in two separate selects. Writing the send as
// `takeStream <- <-valueStream` would evaluate the receive before the select
// could look at ctx.Done(), so a stalled source would pin the goroutine even
// after cancellation, and a closed source would be forwarded as zero values.
func take[T any](ctx context.Context, valueStream <-chan T, num int) <-chan T {
	takeStream := make(chan T)
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			var v T
			select {
			case <-ctx.Done():
				return
			case next, ok := <-valueStream:
				if !ok {
					// Source exhausted: nothing more to take.
					return
				}
				v = next
			}

			select {
			case <-ctx.Done():
				return
			case takeStream <- v:
			}
		}
	}()
	return takeStream
}
// func primeFinder[T any](done <-chan T, numbers <-chan T, isPrime func(T) bool) <-chan T { | |
// primes := make(chan T) | |
// go func() { | |
// defer close(primes) | |
// for num := range numbers { | |
// if isPrime(num) { // Use type assertion with concrete type int | |
// select { | |
// case <-done: | |
// return | |
// case primes <- num: | |
// } | |
// } | |
// } | |
// }() | |
// return primes | |
// } | |
// primeFinder reads values from numbers and forwards those for which isPrime
// reports true. The returned channel is closed when numbers is closed or ctx
// is cancelled, whichever happens first.
//
// The receive is done inside a select that also watches ctx.Done(), so the
// goroutine exits on cancellation even if the upstream channel never closes.
func primeFinder[T any](ctx context.Context, numbers <-chan T, isPrime func(T) bool) <-chan T {
	primes := make(chan T)
	go func() {
		defer close(primes)
		for {
			var num T
			select {
			case <-ctx.Done():
				return
			case next, ok := <-numbers:
				if !ok {
					return
				}
				num = next
			}

			if !isPrime(num) {
				continue
			}

			select {
			case <-ctx.Done():
				return
			case primes <- num:
			}
		}
	}()
	return primes
}
// isPrime reports whether number holds an int that is prime. Any value whose
// dynamic type is not int (including other integer types) yields false.
//
// After ruling out multiples of 2 and 3, only candidates of the form 6k±1 are
// tried as divisors, up to the square root of the number.
func isPrime(number any) bool {
	num, ok := number.(int)
	if !ok {
		return false
	}

	switch {
	case num <= 1:
		return false
	case num <= 3:
		return true
	case num%2 == 0 || num%3 == 0:
		return false
	}

	// i <= num/i is the overflow-safe form of i*i <= num: for values close to
	// the maximum int, i*i would wrap around and keep the loop running.
	for i := 5; i <= num/i; i += 6 {
		if num%i == 0 || num%(i+2) == 0 {
			return false
		}
	}
	return true
}
// func fanIn[T any](done <-chan T, channels ...<-chan T) <-chan T { | |
// var wg sync.WaitGroup | |
// multiplexedStream := make(chan T) | |
// | |
// multiplex := func(c <-chan T) { | |
// defer wg.Done() | |
// for i := range c { | |
// select { | |
// case <-done: | |
// return | |
// case multiplexedStream <- i: | |
// } | |
// } | |
// } | |
// | |
// // select from all the channels | |
// wg.Add(len(channels)) | |
// for _, c := range channels { | |
// go multiplex(c) | |
// } | |
// | |
// // wait for all the reads to complete | |
// go func() { | |
// wg.Wait() | |
// close(multiplexedStream) | |
// }() | |
// | |
// return multiplexedStream | |
// } | |
// fanIn merges any number of input channels into one output channel. One
// goroutine per input forwards its values; the output is closed once every
// forwarder has finished, which happens when its input is closed or when ctx
// is cancelled. Values from different inputs arrive in no particular order.
//
// Each forwarder receives inside a select that also watches ctx.Done(), so
// cancellation ends it even when its input channel is never closed.
func fanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	multiplexedStream := make(chan T)

	multiplex := func(c <-chan T) {
		defer wg.Done()
		for {
			var v T
			select {
			case <-ctx.Done():
				return
			case next, ok := <-c:
				if !ok {
					return
				}
				v = next
			}

			select {
			case <-ctx.Done():
				return
			case multiplexedStream <- v:
			}
		}
	}

	// Start one forwarder per input channel.
	wg.Add(len(channels))
	for _, c := range channels {
		go multiplex(c)
	}

	// Close the output only after every forwarder has returned, so nothing
	// ever sends on a closed channel.
	go func() {
		wg.Wait()
		close(multiplexedStream)
	}()

	return multiplexedStream
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment