Created
October 10, 2021 20:59
-
-
Save Jguer/cd632dfbff0a4298730d5e60aab6e4d7 to your computer and use it in GitHub Desktop.
Ordered Fan-Out and collect in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"os" | |
"runtime" | |
"strings" | |
"sync" | |
"time" | |
) | |
// Mappable is an item that can be processed while remembering where it sat in
// the original input sequence.
type Mappable interface {
	// Index reports the item's position in the original input.
	Index() int
	// Process performs the item's work and returns the result as a Mappable.
	Process() Mappable
}
// value pairs a string payload with the position it held in the input.
type value struct {
	name  string // payload carried through processing
	index int    // position of the payload in the original input
}
// Index returns the original position of the object | |
func (v *value) Index() int { | |
return v.index | |
} | |
// Process the value and return the result as the same type but copied | |
func (v *value) Process() Mappable { | |
// Random sleep between 1-5s handled. | |
time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second) | |
return &value{name: v.name, index: v.index} | |
} | |
func (v *value) String() string { | |
return fmt.Sprintf("{index: %d, value: %s}", v.index, v.name) | |
} | |
func main() { | |
numOfWorkers := runtime.NumCPU() | |
fmt.Fprintln(os.Stdout, "Running on", numOfWorkers, "goroutines.") | |
var ( | |
inChan = make(chan Mappable) // Input values | |
outChanValues = make(chan Mappable, 10) // Output values | |
) | |
// Create consumers | |
wg := &sync.WaitGroup{} // Waitgroup for workers | |
wg.Add(numOfWorkers) | |
for s := 0; s < numOfWorkers; s++ { | |
go fanoutWorker(wg, inChan, s, outChanValues) | |
} | |
// Input data | |
inputValues := strings.Fields(Names) | |
go inputData(inChan, inputValues) | |
go func() { | |
// Once input data is treated and all workers have returned close the output channel | |
wg.Wait() | |
close(outChanValues) | |
}() | |
outputValues := make([]string, len(inputValues)) // Collected Output values | |
for v := range outChanValues { | |
fmt.Fprintf(os.Stdout, "Success: %v\n", v) | |
realValue := v.(*value) | |
outputValues[v.Index()] = realValue.name | |
} | |
if Names == strings.Join(outputValues, " ") { | |
fmt.Fprintln(os.Stdout, "Order was respected, input order is the same as output.") | |
} else { | |
fmt.Fprintln(os.Stdout, "Order was not respected, input order is not the same as output.") | |
} | |
} | |
// Insert data into the input channel and signal it's done | |
func inputData(inChan chan<- Mappable, inputValues []string) { | |
for i, v := range inputValues { | |
inChan <- &value{name: v, index: i} | |
} | |
close(inChan) | |
} | |
func fanoutWorker(wg *sync.WaitGroup, inChan <-chan Mappable, | |
routineName int, valOut chan<- Mappable) { | |
defer wg.Done() | |
for name := range inChan { | |
valOut <- name.Process() | |
} | |
} | |
// Names is the whitespace-separated list of sample words used as input; main
// splits it with strings.Fields.
const Names = "dynasty regret appalling creative accessories forlornness bazooka pattern first glow crackdown daughter addictive goon beautiful grave amusement pitch peepshow accountable bloat cyanide fork fight axiom biggest enjoy disfigurement teen foreign company lavender owl hooligan blabbermouth blockade frying melody empire apocalyptic hooves terror believable vibrator sentinel famous convulsion flirtation system heavyhearted"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment