Skip to content

Instantly share code, notes, and snippets.

@kenenbek
Last active December 27, 2017 16:36
Show Gist options
  • Save kenenbek/65121d03512d9756c1d6f88400be7209 to your computer and use it in GitHub Desktop.
Save kenenbek/65121d03512d9756c1d6f88400be7209 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"math"
"sync"
)
// Environment is the state shared by the master and every worker: the
// simulation clock plus the channels used to reach each registered worker.
type Environment struct {
	// currentTime is the time of the event most recently accepted by master.
	currentTime float64
	// managerChannels has one ask/answer pair per worker; NewProcessWorker
	// appends to it.
	managerChannels []pairChannel
	// closeChannels has one shutdown channel per worker; master closes them
	// when it is done.
	closeChannels []chan interface{}
}
// Named channel types that keep the two directions of a master/worker
// conversation apart.
type (
	// askChannel carries requests from master to a worker. Workers treat a
	// bool value as "drop the head of the queue" and any other value as a
	// query for the head of the queue.
	askChannel chan interface{}

	// answerChannel carries a worker's reply to a query: a float64 event
	// time, or the string "E" when the worker's queue is empty.
	answerChannel chan interface{}
)
// pairChannel bundles the two channels master needs to talk to one worker.
// Both types are embedded, so the fields are addressed as p.askChannel and
// p.answerChannel.
type pairChannel struct {
	askChannel    // requests sent to the worker
	answerChannel // replies sent back by the worker
}
// Process ties a simulated process to the Environment it runs in.
// NOTE(review): no code visible in this file constructs or reads a Process.
type Process struct {
	env *Environment // environment this process belongs to
}
// Worker is one simulated participant. It embeds the ask/answer channels it
// serves and carries its own shutdown channel.
type Worker struct {
	// env is the environment the worker was registered with.
	env *Environment

	askChannel    // requests received from master
	answerChannel // replies sent back to master

	// closeChan is closed to tell the worker's responder goroutine to exit.
	closeChan chan interface{}
}
// NewProcessWorker creates a Worker with fresh unbuffered ask, answer and
// close channels and registers it with env: the ask/answer pair is appended
// to env.managerChannels and the close channel to env.closeChannels, so both
// slices gain an entry at the same index.
func NewProcessWorker(env *Environment) *Worker {
	requests := make(chan interface{})
	replies := make(chan interface{})
	stop := make(chan interface{})

	env.managerChannels = append(env.managerChannels, pairChannel{requests, replies})
	env.closeChannels = append(env.closeChannels, stop)

	return &Worker{
		env:           env,
		askChannel:    requests,
		answerChannel: replies,
		closeChan:     stop,
	}
}
// worker1 starts this worker's responder goroutine and then sends the values
// 1, 2, 3 and 4 on link, blocking until each one has been accepted.
//
// The responder owns a private event queue (0.1, 0.3, 0.5) and serves
// w.askChannel until w.closeChan is closed:
//   - a bool request drops the head of the queue and sends no reply;
//   - any other request is answered on w.answerChannel with the head of the
//     queue, or with the string "E" when the queue is empty.
//
// On shutdown the responder prints "end-worker1" and then calls wg.Done.
// env is not used.
func (w *Worker) worker1(env *Environment, link chan float64, wg *sync.WaitGroup) {
	queue := []float64{0.1, 0.3, 0.5}

	go func() {
		defer wg.Done()
		for {
			select {
			case rec := <-w.askChannel:
				if _, pop := rec.(bool); pop {
					// Guard the pop: indexing an empty queue would panic and
					// take the whole program down.
					if len(queue) > 0 {
						queue = queue[1:]
					}
					continue
				}
				if len(queue) > 0 {
					w.answerChannel <- queue[0]
				} else {
					w.answerChannel <- "E"
				}
			case <-w.closeChan:
				// Report shutdown here; a statement placed after the endless
				// loop can never run.
				fmt.Println("end-worker1")
				return
			}
		}
	}()

	for i := float64(1); i < 5; i++ {
		link <- i
	}
}
// worker2 starts this worker's responder goroutine and then receives four
// values from link, appending each to the event queue and printing
// "End receive" with the 1-based count.
//
// The responder serves w.askChannel until w.closeChan is closed:
//   - a bool request drops the head of the queue and sends no reply;
//   - any other request is answered on w.answerChannel with the head of the
//     queue, or with the string "E" when the queue is empty.
//
// The queue starts as (0.2, 0.4, 0.6). It is shared between the responder and
// the receive loop, so every access goes through mu. On shutdown the
// responder prints "end-worker2" and then calls wg.Done. env is not used.
func (w *Worker) worker2(env *Environment, link chan float64, wg *sync.WaitGroup) {
	var mu sync.Mutex // guards queue
	queue := []float64{0.2, 0.4, 0.6}

	go func() {
		defer wg.Done()
		for {
			select {
			case rec := <-w.askChannel:
				if _, pop := rec.(bool); pop {
					mu.Lock()
					// Guard the pop: indexing an empty queue would panic.
					if len(queue) > 0 {
						queue = queue[1:]
					}
					mu.Unlock()
					continue
				}

				// Pick the reply under the lock, but send it after unlocking
				// so the receive loop is never blocked on a channel send.
				var reply interface{} = "E"
				mu.Lock()
				if len(queue) > 0 {
					reply = queue[0]
				}
				mu.Unlock()
				w.answerChannel <- reply
			case <-w.closeChan:
				// Report shutdown here; a statement placed after the endless
				// loop can never run.
				fmt.Println("end-worker2")
				return
			}
		}
	}()

	for i := 1; i < 5; i++ {
		x := <-link
		mu.Lock()
		queue = append(queue, x)
		mu.Unlock()
		fmt.Println("End receive", i)
	}
}
// master drives the simulation clock.
//
// While env.currentTime is below until, it sends a query to every pair in
// env.managerChannels and reads the reply. Replies of type float64 are event
// times; any other reply means that worker has nothing queued. The worker
// with the smallest time (the earliest-registered one on ties) is sent true
// so that it drops that event, env.currentTime is set to the time, and the
// time is printed. The loop also ends as soon as no worker reports an event.
//
// Afterwards every channel in env.closeChannels is closed and "end-master"
// is printed. wg is not used.
func master(env *Environment, until float64, wg *sync.WaitGroup) {
	for env.currentTime < until {
		minTime := math.Inf(1)
		// minChannel stays nil until a worker reports an event. Testing it,
		// rather than comparing against a sentinel time, lets any float64
		// event time be selected, however large.
		var minChannel chan interface{}

		for _, pair := range env.managerChannels {
			pair.askChannel <- struct{}{}
			t, ok := (<-pair.answerChannel).(float64)
			if !ok {
				continue
			}
			if minChannel == nil || t < minTime {
				minTime = t
				minChannel = pair.askChannel
			}
		}

		if minChannel == nil {
			break
		}
		minChannel <- true
		env.currentTime = minTime
		fmt.Println(minTime)
	}

	for _, stop := range env.closeChannels {
		close(stop)
	}
	fmt.Println("end-master")
}
// main wires two workers and the master together over a shared Environment
// and runs the simulation until everything it started has finished.
func main() {
	// wg counts the two responder goroutines; each calls Done once its close
	// channel has been closed.
	var wg sync.WaitGroup
	wg.Add(2)

	env := new(Environment)
	w1 := NewProcessWorker(env)
	w2 := NewProcessWorker(env)
	link := make(chan float64)

	// launched counts the three goroutines started below. Waiting on wg alone
	// would let main return while master is still printing "end-master" or
	// while worker2 is still draining link, cutting the output short.
	var launched sync.WaitGroup
	launched.Add(3)
	go func() {
		defer launched.Done()
		w1.worker1(env, link, &wg)
	}()
	go func() {
		defer launched.Done()
		w2.worker2(env, link, &wg)
	}()
	go func() {
		defer launched.Done()
		master(env, float64(51.61), &wg)
	}()

	wg.Wait()
	launched.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment