@atedja
Created December 31, 2014 00:34
Fan-in and Fan-out example in Go
package main

import (
	"fmt"
	"time"
)

// Shared counters used to hand out IDs. They are incremented from several
// goroutines without any synchronization.
var workerId int = 0
var publisherId int = 0

// Publishers push data into a channel
func Publisher(out chan<- string) {
	publisherId += 1
	thisId := publisherId
	dataId := 0
	for {
		dataId += 1
		fmt.Printf("Publisher %d is pushing data\n", thisId)
		data := fmt.Sprintf("Data from publisher %d. Data %d", thisId, dataId)
		out <- data
	}
}

// Workers pull data from the shared channel
func WorkerProcess(in <-chan string) {
	workerId += 1
	thisId := workerId
	for {
		fmt.Printf("%d: waiting for input...\n", thisId)
		input := <-in
		fmt.Printf("%d: input is: %s\n", thisId, input)
	}
}

func main() {
	input := make(chan string)

	// Fan-out: three workers receive from the same channel
	go WorkerProcess(input)
	go WorkerProcess(input)
	go WorkerProcess(input)

	// Fan-in: four publishers send into the same channel
	go Publisher(input)
	go Publisher(input)
	go Publisher(input)
	go Publisher(input)

	// Let the goroutines run briefly; the program exits when main returns
	time.Sleep(1 * time.Millisecond)
}
phecht commented Feb 6, 2018

Hello,

It seems you have a race condition when incrementing workerId and publisherId. It goes away if I replace the += 1 with atomic.AddInt64 and atomic.LoadInt64:

	//publisherId += 1
	atomic.AddInt64(&publisherID, 1)
	thisID := atomic.LoadInt64(&publisherID)

and

	atomic.AddInt64(&workerID, 1)
	thisID := atomic.LoadInt64(&workerID)
