Skip to content

Instantly share code, notes, and snippets.

@matheusd
Created September 23, 2020 18:26
Show Gist options
  • Save matheusd/88f172df827629b189854be1c4570bfa to your computer and use it in GitHub Desktop.
Save matheusd/88f172df827629b189854be1c4570bfa to your computer and use it in GitHub Desktop.
package main
import (
"sync"
)
func Highway(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent {
outputC := make(chan HydratedEvent, 0)
type worker struct {
in chan Event
out chan HydratedEvent
}
nextWorker := make(chan *worker, numWorkers)
nextResponse := make(chan *worker, numWorkers)
quit := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
w := &worker{
in: make(chan Event, 1),
out: make(chan HydratedEvent, 1),
}
wg.Add(1)
go func() {
defer wg.Done()
for {
var e Event
select {
case <-quit:
return
case e = <-w.in:
}
he := asyncProcessor(e)
w.out <- he
}
}()
nextWorker <- w
}
go func() {
defer close(quit)
defer close(nextResponse)
for in := range inputC {
w := <-nextWorker
w.in <- in
nextResponse <- w
nextWorker <- w
}
}()
go func() {
defer func() {
wg.Wait()
close(outputC)
}()
for w := range nextResponse {
he := <-w.out
outputC <- he
}
}()
return outputC
}
package main
import (
"sync"
)
func MyWay(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent {
outputC := make(chan HydratedEvent, 0)
workers := make([]chan Event, numWorkers)
responses := make([]chan HydratedEvent, numWorkers)
quit := make(chan struct{})
var wg sync.WaitGroup
for i := range workers {
workers[i] = make(chan Event, 1)
responses[i] = make(chan HydratedEvent, 1)
wg.Add(1)
go func(w chan Event, r chan HydratedEvent) {
defer wg.Done()
for {
var e Event
select {
case <-quit:
return
case e = <-w:
}
he := asyncProcessor(e)
select {
case <-quit:
return
case r <- he:
}
}
}(workers[i], responses[i])
}
go func() {
defer close(quit)
var i int
for in := range inputC {
workers[i] <- in
i = (i + 1) % numWorkers
}
}()
go func() {
defer func() {
wg.Wait()
close(outputC)
}()
var i int
for {
var he HydratedEvent
select {
case he = <-responses[i]:
case <-quit:
return
}
select {
case outputC <- he:
case <-quit:
return
}
i = (i + 1) % numWorkers
}
}()
return outputC
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment