Skip to content

Instantly share code, notes, and snippets.

@calam1
Last active May 30, 2024 13:32
Show Gist options
  • Save calam1/469f0a8dcde552547281e6cb940067d5 to your computer and use it in GitHub Desktop.
Save calam1/469f0a8dcde552547281e6cb940067d5 to your computer and use it in GitHub Desktop.
An example from the book Concurrency in Go generating random numbers and testing if they are prime, using fan in and fan out, plus updated to use generics vs interface{} also updated to show using context instead of done channel, but left old code as an example
package main
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
func main() {
// done := make(chan interface{})
// defer close(done)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// for num := range take(done, repeat(done, 1), 10) {
// // for num := range repeat(done, 1) { // will infinitely print 1
// fmt.Printf("%v ", num)
// }
// context usage instead of done channel
// for num := range take(ctx, repeat(ctx, 1), 10) {
// fmt.Printf("%v ", num)
// }
randFunc := func() interface{} { return rand.Int() }
// for num := range take(done, repeatFn(done, randFunc), 10) {
// // for num := range repeat(done, 1) { // will infinitely print 1
// fmt.Printf("%v\n", num)
// }
// context usage instead of done channel
// for num := range take(ctx, repeatFn(ctx, randFunc), 10) {
// // for num := range repeat(done, 1) { // will infinitely print 1
// fmt.Printf("%v\n", num)
// }
start := time.Now()
// finding primes is slow
// randIntStream := repeatFn(done, randFunc)
// for prime := range take(done, primeFinder(done, randIntStream, isPrime), 10) {
// fmt.Printf("prime: %v\n", prime)
// }
// finding primes is slow - context usage instead of done channel
// randIntStream := repeatFn(ctx, randFunc)
// for prime := range take(ctx, primeFinder(ctx, randIntStream, isPrime), 10) {
// fmt.Printf("prime: %v\n", prime)
// }
// fan-out - primeFinder - about 8 times faster than above
// randIntStream := repeatFn(done, randFunc)
// numFinders := runtime.NumCPU() // get number of cores
// fmt.Printf("Spinning up %d prime finders\n", numFinders)
// finders := make([]<-chan interface{}, numFinders)
// fmt.Println("Spinning up finders for Prime")
// for i := 0; i < numFinders; i++ {
// finders[i] = primeFinder(done, randIntStream, isPrime)
// }
//
// for prime := range take(done, fanIn(done, finders...), 10) {
// fmt.Printf("prime: %v\n", prime)
// }
// fan-out - primeFinder - about 8 times faster than above - context usage instead of done channel
randIntStream := repeatFn(ctx, randFunc)
numFinders := runtime.NumCPU() // get number of cores
fmt.Printf("Spinning up %d prime finders\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
fmt.Println("Spinning up finders for Prime")
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(ctx, randIntStream, isPrime)
}
for prime := range take(ctx, fanIn(ctx, finders...), 10) {
fmt.Printf("prime: %v\n", prime)
}
fmt.Printf("prime search took %v\n", time.Since(start))
}
// this is just a generator that will repeat the values passed to it indefinitely
// func repeat[T any](done <-chan T, values ...T) <-chan T {
// valueStream := make(chan T)
// go func() {
// defer close(valueStream)
// for { // infinite loop
// for _, v := range values {
// select {
// case <-done:
// return
// case valueStream <- v:
// }
// }
// }
// }()
// return valueStream
// }
func repeat[T any](ctx context.Context, values ...T) <-chan T {
valueStream := make(chan T)
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-ctx.Done(): // Check for cancellation
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
// func repeatFn[T any](done <-chan T, fn func() T) <-chan T {
// valueStream := make(chan T)
// go func() {
// defer close(valueStream)
// for {
// select {
// case <-done:
// return
// case valueStream <- fn():
// }
// }
// }()
// return valueStream
// }
func repeatFn[T any](ctx context.Context, fn func() T) <-chan T {
valueStream := make(chan T)
go func() {
defer close(valueStream)
for {
select {
case <-ctx.Done():
return
case valueStream <- fn():
}
}
}()
return valueStream
}
// func take[T any](done <-chan T, valueStream <-chan T, num int) <-chan T {
// takeStream := make(chan T)
// go func() {
// defer close(takeStream)
// for i := 0; i < num; i++ {
// select {
// case <-done:
// return
// case takeStream <- <-valueStream:
// }
// }
// }()
// return takeStream
// }
func take[T any](ctx context.Context, valueStream <-chan T, num int) <-chan T {
takeStream := make(chan T)
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-ctx.Done():
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
// func primeFinder[T any](done <-chan T, numbers <-chan T, isPrime func(T) bool) <-chan T {
// primes := make(chan T)
// go func() {
// defer close(primes)
// for num := range numbers {
// if isPrime(num) { // Use type assertion with concrete type int
// select {
// case <-done:
// return
// case primes <- num:
// }
// }
// }
// }()
// return primes
// }
func primeFinder[T any](ctx context.Context, numbers <-chan T, isPrime func(T) bool) <-chan T {
primes := make(chan T)
go func() {
defer close(primes)
for num := range numbers {
if isPrime(num) { // Use type assertion with concrete type int
select {
case <-ctx.Done():
return
case primes <- num:
}
}
}
}()
return primes
}
func isPrime(number interface{}) bool {
if num, ok := number.(int); ok {
if num <= 1 {
return false
}
if num <= 3 {
return true
}
if num%2 == 0 || num%3 == 0 {
return false
}
i := 5
for i*i <= num {
if num%i == 0 || num%(i+2) == 0 {
return false
}
i += 6
}
return true
}
return false
}
// func fanIn[T any](done <-chan T, channels ...<-chan T) <-chan T {
// var wg sync.WaitGroup
// multiplexedStream := make(chan T)
//
// multiplex := func(c <-chan T) {
// defer wg.Done()
// for i := range c {
// select {
// case <-done:
// return
// case multiplexedStream <- i:
// }
// }
// }
//
// // select from all the channels
// wg.Add(len(channels))
// for _, c := range channels {
// go multiplex(c)
// }
//
// // wait for all the reads to complete
// go func() {
// wg.Wait()
// close(multiplexedStream)
// }()
//
// return multiplexedStream
// }
func fanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
var wg sync.WaitGroup
multiplexedStream := make(chan T)
multiplex := func(c <-chan T) {
defer wg.Done()
for i := range c {
select {
case <-ctx.Done():
return
case multiplexedStream <- i:
}
}
}
// select from all the channels
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
// wait for all the reads to complete
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment