Skip to content

Instantly share code, notes, and snippets.

@Jguer
Created October 10, 2021 20:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Jguer/cd632dfbff0a4298730d5e60aab6e4d7 to your computer and use it in GitHub Desktop.
Save Jguer/cd632dfbff0a4298730d5e60aab6e4d7 to your computer and use it in GitHub Desktop.
Ordered Fan-Out and collect in Go
package main
import (
"fmt"
"math/rand"
"os"
"runtime"
"strings"
"sync"
"time"
)
type Mappable interface {
Index() int
Process() Mappable
}
type value struct {
name string
index int
}
// Index returns the original position of the object
func (v *value) Index() int {
return v.index
}
// Process the value and return the result as the same type but copied
func (v *value) Process() Mappable {
// Random sleep between 1-5s handled.
time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
return &value{name: v.name, index: v.index}
}
func (v *value) String() string {
return fmt.Sprintf("{index: %d, value: %s}", v.index, v.name)
}
func main() {
numOfWorkers := runtime.NumCPU()
fmt.Fprintln(os.Stdout, "Running on", numOfWorkers, "goroutines.")
var (
inChan = make(chan Mappable) // Input values
outChanValues = make(chan Mappable, 10) // Output values
)
// Create consumers
wg := &sync.WaitGroup{} // Waitgroup for workers
wg.Add(numOfWorkers)
for s := 0; s < numOfWorkers; s++ {
go fanoutWorker(wg, inChan, s, outChanValues)
}
// Input data
inputValues := strings.Fields(Names)
go inputData(inChan, inputValues)
go func() {
// Once input data is treated and all workers have returned close the output channel
wg.Wait()
close(outChanValues)
}()
outputValues := make([]string, len(inputValues)) // Collected Output values
for v := range outChanValues {
fmt.Fprintf(os.Stdout, "Success: %v\n", v)
realValue := v.(*value)
outputValues[v.Index()] = realValue.name
}
if Names == strings.Join(outputValues, " ") {
fmt.Fprintln(os.Stdout, "Order was respected, input order is the same as output.")
} else {
fmt.Fprintln(os.Stdout, "Order was not respected, input order is not the same as output.")
}
}
// Insert data into the input channel and signal it's done
func inputData(inChan chan<- Mappable, inputValues []string) {
for i, v := range inputValues {
inChan <- &value{name: v, index: i}
}
close(inChan)
}
func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable,
routineName int, valOut chan<- Mappable) {
defer wg.Done()
for name := range inChan {
valOut <- name.Process()
}
}
const Names = "dynasty regret appalling creative accessories forlornness bazooka pattern first glow crackdown daughter addictive goon beautiful grave amusement pitch peepshow accountable bloat cyanide fork fight axiom biggest enjoy disfigurement teen foreign company lavender owl hooligan blabbermouth blockade frying melody empire apocalyptic hooves terror believable vibrator sentinel famous convulsion flirtation system heavyhearted"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment